一、概述
大数据背景下,分区应该是所有组件必备的基本条件,否则面对海量数据时无论是计算还是存储都容易遇到瓶颈。跟其他消息系统一样,Pulsar通过Topic将消息数据进行业务层面划分管理,同时也支持Topic分区,通过将多个分区分布在多台Broker/机器上从而带来性能上的巨大提升以及无限的横向拓展能力。而一旦有了分区之后就会面临一个问题,但一条数据请求时应该将其发往哪个分区?目前Pulsar跟其他消息系统一样支持以下三种路由模式。
- 轮询路由
生产者会按将消息按批为单位轮询发送到不同的分区,这是一种常见的路由策略,具有简单的优势,由于它不需要过多的配置以及考虑但却可以表现不错的性能。如果消息带有key的话会根据key进行哈希运算后再对分区进行取模来决定消息投放的目标分区。 - 单分区路由
单分区路由提供一种更简单的机制,它会将所有消息路由到同一个分区。这种模式类似非分区Topic,如果消息提供key的话将恢复到轮询哈希路由方式 - 自定义分区路由
自定义分区路由支持你通过实现MessageRouter接口来自定义路由逻辑,例如将特定key的消息发到指定的分区等
二、实战
消息路由发生在生产者端,在创建生产者是通过 .messageRoutingMode()
进行指定,下面就分别实战对比下这三种的路由效果
1. 轮询路由
先试试轮询路由的策略,这是最常见也是默认的路由策略,通过 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
进行指定,然后往里面通过同步方式往分区Topic里面写入数据
String serverUrl = "http://localhost:8080";
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serverUrl).build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
//.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 20000; i++) {
producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
}
通过管理页面可以看到数据基本均匀的落在各个分区,从这个结果是能够反向验证数据是符合轮询发送后的效果
2. 单分区路由
现在试试单分区路由的策略,通过 .messageRoutingMode(MessageRoutingMode.SinglePartition)
进行指定,并往分区Topic里面写入一批数据
String serverUrl = "http://localhost:8080";
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serverUrl).build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 20000; i++) {
producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
}
通过管理页面可以看到数据都落在第一个分区,说明这也符合官网中对单分区路由的描述。同时经过反复试验多次发现,生产者会随机选择一个分区并将所有数据发送到这个分区。
3. 自定义路由
在有些业务场景,我们需要将自己的业务逻辑“融入”路由策略,因此像Pulsar、Kafka等消息中间件都是支持用户进行路由规则的自定义的。这里为了好玩,咱们尝试将数据按照 1:2:3:4 等比例分别落在四个分区如何?说干就干,自定义路由也是比较简单的,只需要实现Pulsar MessageRouter接口的choosePartition方法即可,实现逻辑如下
public class SherlockRouter implements MessageRouter {
Long count = 0L;
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
count++;
count = count % 10;
if (count == 0) return 0;
if (count < 3) return 1;
if (count < 6) return 2;
return 3;
}
}
通过上面代码可以看到,参数msg就是生产者中国呢发送的消息对象,metadata是这条消息的元数据如租户、命名空间等等,而返回值其实就是这个Topic分区的下标,这里需要注意的是不要超过Topic的分区数,同时一些比较复杂的数据处理逻辑代码尽量不要写在这里影响消息发送性能以及不规范。
写完后通过 .messageRouter()
方法进行指定即可使用
public static void customRoundSchemaProducer() throws Exception {
String serverUrl = "http://localhost:8080";
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serverUrl).build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
.messageRouter(new SherlockRouter())
.create();
for (int i = 0; i < 20000; i++) {
producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
}
producer.close();
pulsarClient.close();
}
在管理页面可以看到,数据是按照咱们预期的逻辑 1:2:3:4等比落在分区里面,嘿嘿~
三、源码分析
1. 接口以及父类
Pulsar中所有路由规则都是基于MessageRouter接口进行实现的,这个接口主要提供了choosePartition方法,只要重写这个方法即可自定义任意自己预期的逻辑
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MessageRouter extends Serializable {
/**
*
* @param msg
* Message object
* @return The index of the partition to use for the message
* @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.
*/
@Deprecated
default int choosePartition(Message<?> msg) {
throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
}
/**
* Choose a partition based on msg and the topic metadata.
*
* @param msg message to route
* @param metadata topic metadata
* @return the partition to route the message.
* @since 1.22.0
*/
default int choosePartition(Message<?> msg, TopicMetadata metadata) {
return choosePartition(msg);
}
}
MessageRouterBase是路由策略的抽象类,主要定义了消息有key时的哈希算法,像上面提的轮询路由和单分区路由继承了这个抽象类。JavaStringHash和Murmur3Hash32两个都是Pulsar提供的哈希算法的实现类,两者的差异后面再单独进行分析
public abstract class MessageRouterBase implements MessageRouter {
private static final long serialVersionUID = 1L;
protected final Hash hash;
MessageRouterBase(HashingScheme hashingScheme) {
switch (hashingScheme) {
case JavaStringHash:
this.hash = JavaStringHash.getInstance();
break;
case Murmur3_32Hash:
default:
this.hash = Murmur3Hash32.getInstance();
}
}
}
2. 轮询路由的实现
主要看choosePartition 方法的逻辑,首先如果消息带有key则针对key进行哈希然后取模,这样可以保证相同key的消息落在同一个分区。然后就是判断消息是否按批次进行发送的,如果是单条消息发送的话则通过一个累加计数器进行轮询分区,即可达到消息按照分区顺序逐个发送的效果;如果是按批次发送的话,则是根据时间戳进行取模,这样达到的效果就是每批数据都会随机发送到某一个分区
public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
@SuppressWarnings("unused")
private volatile int partitionIndex = 0;
private final int startPtnIdx;
private final boolean isBatchingEnabled;
private final long partitionSwitchMs;
....
@Override
public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}
}
}
3. 单分区路由
可以看到单分区的逻辑是比较简单且清晰的,如果有key就进行哈希取模,否则就发送到partitionIndex这个成员变量指定的分区去,那么这个partitionIndex指定的是哪个分区呢?通过代码能看到是从构造函数里面传进来的,因此跟踪下代码看看
public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
private final int partitionIndex;
public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {
super(hashingScheme);
this.partitionIndex = partitionIndex;
}
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
// If the message has a key, it supersedes the single partition routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());
}
return partitionIndex;
}
}
通过跟踪可以看到是在PartitionedProducerImpl类的getMessageRouter方法中进行SinglePartitionMessageRouterImpl类的初始化,同时是通过ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions())
来生成一个小于分区数的随机数,因此单分区路由的分区是随机指定的一个,这个结果跟咱们实战中测试的效果是吻合的。除此之外,咱们还看到 getMessageRouter方法中会根据咱们在创建生产者时 .messageRoutingMode
方法指定的路由模式来创建对应的路由实现类,在这里可以明确的看到没有指定的话默认就是采用的轮询路由规则
private MessageRouter getMessageRouter() {
MessageRouter messageRouter;
MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
switch (messageRouteMode) {
case CustomPartition:
messageRouter = Objects.requireNonNull(conf.getCustomMessageRouter());
break;
case SinglePartition:
messageRouter = new SinglePartitionMessageRouterImpl(
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme());
break;
case RoundRobinPartition:
default:
messageRouter = new RoundRobinPartitionMessageRouterImpl(
conf.getHashingScheme(),
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),
conf.isBatchingEnabled(),
TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));
}
return messageRouter;
}
四、总结
通过以上内容相信你对Pulsar的路由规则有一定的了解了,如果想进一步了解可以尝试按照自己喜好实现下路由规则并观测是否按照预期运行,同时也可以跟踪Pulsar的源码看看实现是否符合预期。如果想彻底掌握Pulsar,最好自己跟踪下Pulsar的一些核心逻辑,这样不仅了解其底层是如何运作的,也能加深你对一些设计/特性的印象。