Pulsar消息路由深入剖析

news2025/1/15 6:49:37

一、概述

大数据背景下,分区应该是所有组件必备的基本条件,否则面对海量数据时无论是计算还是存储都容易遇到瓶颈。跟其他消息系统一样,Pulsar通过Topic将消息数据进行业务层面划分管理,同时也支持Topic分区,通过将多个分区分布在多台Broker/机器上从而带来性能上的巨大提升以及无限的横向拓展能力。而一旦有了分区之后就会面临一个问题,但一条数据请求时应该将其发往哪个分区?目前Pulsar跟其他消息系统一样支持以下三种路由模式。

  1. 轮询路由
    生产者会按将消息按批为单位轮询发送到不同的分区,这是一种常见的路由策略,具有简单的优势,由于它不需要过多的配置以及考虑但却可以表现不错的性能。如果消息带有key的话会根据key进行哈希运算后再对分区进行取模来决定消息投放的目标分区。
  2. 单分区路由
    单分区路由提供一种更简单的机制,它会将所有消息路由到同一个分区。这种模式类似非分区Topic,如果消息提供key的话将恢复到轮询哈希路由方式
  3. 自定义分区路由
    自定义分区路由支持你通过实现MessageRouter接口来自定义路由逻辑,例如将特定key的消息发到指定的分区等

二、实战

消息路由发生在生产者端,在创建生产者是通过 .messageRoutingMode() 进行指定,下面就分别实战对比下这三种的路由效果

1. 轮询路由

先试试轮询路由的策略,这是最常见也是默认的路由策略,通过 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) 进行指定,然后往里面通过同步方式往分区Topic里面写入数据

        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2")
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                //.messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        for (int i = 0; i < 20000; i++) {
            producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
        }

通过管理页面可以看到数据基本均匀的落在各个分区,从这个结果是能够反向验证数据是符合轮询发送后的效果
在这里插入图片描述

2. 单分区路由

现在试试单分区路由的策略,通过 .messageRoutingMode(MessageRoutingMode.SinglePartition) 进行指定,并往分区Topic里面写入一批数据

        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2")
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        for (int i = 0; i < 20000; i++) {
            producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
        }

通过管理页面可以看到数据都落在第一个分区,说明这也符合官网中对单分区路由的描述。同时经过反复试验多次发现,生产者会随机选择一个分区并将所有数据发送到这个分区。
在这里插入图片描述

3. 自定义路由

在有些业务场景,我们需要将自己的业务逻辑“融入”路由策略,因此像Pulsar、Kafka等消息中间件都是支持用户进行路由规则的自定义的。这里为了好玩,咱们尝试将数据按照 1:2:3:4 等比例分别落在四个分区如何?说干就干,自定义路由也是比较简单的,只需要实现Pulsar MessageRouter接口的choosePartition方法即可,实现逻辑如下

public class SherlockRouter implements MessageRouter {

    Long count = 0L;

    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        count++;
        count = count % 10;
        if (count == 0) return 0;
        if (count < 3) return 1;
        if (count < 6) return 2;
        return 3;
    }
}

通过上面代码可以看到,参数msg就是生产者中国呢发送的消息对象,metadata是这条消息的元数据如租户、命名空间等等,而返回值其实就是这个Topic分区的下标,这里需要注意的是不要超过Topic的分区数,同时一些比较复杂的数据处理逻辑代码尽量不要写在这里影响消息发送性能以及不规范。

写完后通过 .messageRouter() 方法进行指定即可使用

    public static void customRoundSchemaProducer() throws Exception {
        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
                .messageRouter(new SherlockRouter())
                .create();

        for (int i = 0; i < 20000; i++) {
            producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
        }

        producer.close();
        pulsarClient.close();
    }

在管理页面可以看到,数据是按照咱们预期的逻辑 1:2:3:4等比落在分区里面,嘿嘿~
在这里插入图片描述

三、源码分析

1. 接口以及父类

Pulsar中所有路由规则都是基于MessageRouter接口进行实现的,这个接口主要提供了choosePartition方法,只要重写这个方法即可自定义任意自己预期的逻辑

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MessageRouter extends Serializable {

    /**
     *
     * @param msg
     *            Message object
     * @return The index of the partition to use for the message
     * @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.
     */
    @Deprecated
    default int choosePartition(Message<?> msg) {
        throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
    }

    /**
     * Choose a partition based on msg and the topic metadata.
     *
     * @param msg message to route
     * @param metadata topic metadata
     * @return the partition to route the message.
     * @since 1.22.0
     */
    default int choosePartition(Message<?> msg, TopicMetadata metadata) {
        return choosePartition(msg);
    }

}

MessageRouterBase是路由策略的抽象类,主要定义了消息有key时的哈希算法,像上面提的轮询路由和单分区路由继承了这个抽象类。JavaStringHash和Murmur3Hash32两个都是Pulsar提供的哈希算法的实现类,两者的差异后面再单独进行分析

public abstract class MessageRouterBase implements MessageRouter {
    private static final long serialVersionUID = 1L;

    protected final Hash hash;

    MessageRouterBase(HashingScheme hashingScheme) {
        switch (hashingScheme) {
        case JavaStringHash:
            this.hash = JavaStringHash.getInstance();
            break;
        case Murmur3_32Hash:
        default:
            this.hash = Murmur3Hash32.getInstance();
        }
    }
}

2. 轮询路由的实现

主要看choosePartition 方法的逻辑,首先如果消息带有key则针对key进行哈希然后取模,这样可以保证相同key的消息落在同一个分区。然后就是判断消息是否按批次进行发送的,如果是单条消息发送的话则通过一个累加计数器进行轮询分区,即可达到消息按照分区顺序逐个发送的效果;如果是按批次发送的话,则是根据时间戳进行取模,这样达到的效果就是每批数据都会随机发送到某一个分区

public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {

    @SuppressWarnings("unused")
    private volatile int partitionIndex = 0;

    private final int startPtnIdx;
    private final boolean isBatchingEnabled;
    private final long partitionSwitchMs;

		....

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
        // If the message has a key, it supersedes the round robin routing policy
        if (msg.hasKey()) {
            return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
        }

        if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
            long currentMs = clock.millis();
            return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
        } else {
            return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
        }
    }

}

3. 单分区路由

可以看到单分区的逻辑是比较简单且清晰的,如果有key就进行哈希取模,否则就发送到partitionIndex这个成员变量指定的分区去,那么这个partitionIndex指定的是哪个分区呢?通过代码能看到是从构造函数里面传进来的,因此跟踪下代码看看

public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
		private final int partitionIndex;
  
		public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {
        super(hashingScheme);
        this.partitionIndex = partitionIndex;
    }
  
    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        // If the message has a key, it supersedes the single partition routing policy
        if (msg.hasKey()) {
            return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());
        }
        return partitionIndex;
    }

}

通过跟踪可以看到是在PartitionedProducerImpl类的getMessageRouter方法中进行SinglePartitionMessageRouterImpl类的初始化,同时是通过ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()) 来生成一个小于分区数的随机数,因此单分区路由的分区是随机指定的一个,这个结果跟咱们实战中测试的效果是吻合的。除此之外,咱们还看到 getMessageRouter方法中会根据咱们在创建生产者时 .messageRoutingMode 方法指定的路由模式来创建对应的路由实现类,在这里可以明确的看到没有指定的话默认就是采用的轮询路由规则

private MessageRouter getMessageRouter() {
        MessageRouter messageRouter;

        MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();

        switch (messageRouteMode) {
            case CustomPartition:
                messageRouter = Objects.requireNonNull(conf.getCustomMessageRouter());
                break;
            case SinglePartition:
                messageRouter = new SinglePartitionMessageRouterImpl(
                        ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme());
                break;
            case RoundRobinPartition:
            default:
                messageRouter = new RoundRobinPartitionMessageRouterImpl(
                        conf.getHashingScheme(),
                        ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),
                        conf.isBatchingEnabled(),
                        TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));
        }

        return messageRouter;
    }

四、总结

通过以上内容相信你对Pulsar的路由规则有一定的了解了,如果想进一步了解可以尝试按照自己喜好实现下路由规则并观测是否按照预期运行,同时也可以跟踪Pulsar的源码看看实现是否符合预期。如果想彻底掌握Pulsar,最好自己跟踪下Pulsar的一些核心逻辑,这样不仅了解其底层是如何运作的,也能加深你对一些设计/特性的印象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1513863.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

挑战杯 多目标跟踪算法 实时检测 - opencv 深度学习 机器视觉

文章目录 0 前言2 先上成果3 多目标跟踪的两种方法3.1 方法13.2 方法2 4 Tracking By Detecting的跟踪过程4.1 存在的问题4.2 基于轨迹预测的跟踪方式 5 训练代码6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习多目标跟踪 …

WRF模型安装教程(ububtu系统)-- III.WRF和WPS模型的安装

六、WRF模型的安装 # 进入Build_WRF文件夹 cd Build_WRF # 下载WRFV3.9.1 wget https://www2.mmm.ucar.edu/wrf/src/WRFV4.0.TAR.gz # 解压WRF安装包并进入 tar -zxvf WRFV4.0.TAR.gz cd WRF # 安装WRF ./configure 出现如下选项&#xff1a; 选择34&#xff0c; 这里是让你选…

React进阶(Redux,RTK,dispatch,devtools)

1、初识Redux 是React最常用的集中状态管理工具&#xff0c;类似于Vue中的Pinia(Vuex)&#xff0c;可以独立于框架运行 作用&#xff1a;通过集中管理的方式管理应用的状态 案例-实现一个计数器 实现步骤&#xff1a; Redux管理数据的流程&#xff1a; state:一个对象&…

nRF52832——唯一 ID 与加密解密

nRF52832——唯一 ID 与加密解密 唯一 ID 概念唯一 ID 作用读取唯一 ID 唯一 ID 用于加密TEA 加密算法唯一 ID 的加密和解密 唯一 ID 概念 唯一 ID 作用 nRF52xx 微控制器提供一组 64 位的唯一 ID 号&#xff0c;这个唯一身份标识所提供的 ID 值对任意一个 nRF52xx 微控制器&…

Java项目:51 springboot基于springboot的社区团购系统设计012

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本基于Spring Boot的社区团购系统主要满足两种用户的需求&#xff0c;这两种用户分别为管理员和用户&#xff0c;下面将对这两种用户分别实现的…

电磁兼容EMC:一文读懂电气放电管选型设计

目录 1 GDT外观结构 2 GDT 常见品牌 3 GDT命名规则 4 GDT工作原理 5 GDT基本特点 6 GDT典型应用 7 GDT电气参数说明 7.1 DC Spark-over Voltage 直流火花放电电压&#xff08;直流击穿电压&#xff09; 7.2 Maximum Impulse Spark-over Voltage 最大冲击火花放电电压&…

Ribbon简单使用

Ribbon是Netflix发布的云中间层服务开源项目&#xff0c;其主要功能是提供客户端实现负载均衡算法。Ribbon客户端组件提供一系列完善的配置项如连接超时&#xff0c;重试等。简单的说&#xff0c;Ribbon是一个客户端负载均衡器&#xff0c;我们可以在配置文件中Load Balancer后…

【DataWhale学习】用免费GPU线上跑StableDiffusion项目实践

用免费GPU线上跑SD项目实践 ​ DataWhale组织了一个线上白嫖GPU跑chatGLM与SD的项目活动&#xff0c;我很感兴趣就参加啦。之前就对chatGLM有所耳闻&#xff0c;是去年清华联合发布的开源大语言模型&#xff0c;可以用来打造个人知识库什么的&#xff0c;一直没有尝试。而SD我…

【数据结构】顺序表的实现

文章目录 **线性表(linear)&#xff1a;****顺序表****下列是需要实现的接口(Seqlist.h)****顺序表的初始化****顺序表的插入数据****顺序表的扩容(为插入数据提供保障)****顺序表的尾插****顺序表的头插****顺序表的删除数据****顺序表的尾删****顺序表的头删****查找指定位置…

下载API文档

在线看&#xff1a;Overview (Java SE 17 & JDK 17) 离线下载&#xff1a;Java Development Kit 17 Documentation

【Sql】MVCC有关问题,以及锁,日志和主从复制原理

目录 MVCC 解决什么问题? 实现原理 隐式字段 undo log Read View(读视图) InnoDB 对 MVCC 的实现 锁 分类 锁升级&#xff1f; InnoDB 的行锁&#xff1f; 死锁避免&#xff1f; 乐观锁和悲观锁 日志 主从复制原理 主从复制的作用 MySQL主从复制解决的问题 涉…

模型量化(二)—— 训练后量化PTQ(全代码)

训练后量化&#xff08;Post-training Quantization&#xff0c;PTQ&#xff09;是一种常见的模型量化技术&#xff0c;它在模型训练完成之后应用&#xff0c;旨在减少模型的大小和提高推理速度&#xff0c;同时尽量保持模型的性能。训练后量化对于部署到资源受限的设备上&…

【阿里云系列】-利用yaml文件部署NacosXxl-job到ACK

背景介绍 随着容器化的技术成熟落地&#xff0c;拥抱各种成熟的容器化集群平台是加速我们落地的必然之路&#xff0c;目前国内以阿里云、华为云、腾讯云为平台的供应商为主&#xff0c;国外则以AWS&#xff0c;Azure为主&#xff0c;让我们借助平台已有的优势进行快速落地提高…

指针【理论知识速成】(3)

一.指针的使用和传值调用&#xff1a; 在了解指针的传址调用前&#xff0c;先来额外了解一下 “传值调用” 1.传值调用&#xff1a; 对于来看这个帖子的你相信代码展示胜过千言万语 #include <stdio.h> #include<assert.h> int convert(int a, int b) {int c 0…

log4j2.xml介绍和使用

log4j2.xml是什么 log4j2.xml 是用于配置 Apache Log4j 2 的 XML 格式配置文件。Log4j 2 是一个用于 Java 应用的流行日志框架&#xff0c;提供灵活的日志管理和配置。在 log4j2.xml 文件中&#xff0c;可以配置日志记录的格式、级别、目的地等。 下面是一些主要节点和属性的…

内容管理平台原来对企业这么重要,看完收藏!

“内容为王”&#xff0c;这是当今数字化时代的一个重要真理。不论是创业新贵、还是行业巨头&#xff0c;纷纷开始深入理解和应用内容管理平台&#xff08;Content Management System&#xff0c;简称CMS&#xff09;&#xff0c;以便更好的管理其大量的内容和信息。 那么&…

网络安全从业人员何去何从

从2024年1月1日开始到今天&#xff0c;基本没有真正放下自己休息过一天。可能很多人会说是卷&#xff0c;其实真正的原因是压力。不仅仅是生活压力还有行业压力。 今年这个行业让很多人开始感到了迷茫&#xff0c;不仅是股市的低迷&#xff0c;更多的来自于各大公司不断的因为…

什么是架构?架构设计原则是哪些?什么是设计模式?设计模式有哪些?

什么是架构?架构设计原则是哪些?什么是设计模式?设计模式有哪些? 架构的本质 架构本身是一种抽象的、来自建筑学的体系结构,其在企业及IT系统中被广泛应用。 架构的本质是对事物复杂性的管理,是对一个企业、一个公司、一个系统复杂的内部关系进行结构化、体系化的抽象,…

Stable-Diffusion的WebUI部署实战

1、环境准备及安装 1.1、linux环境 # 首先&#xff0c;已经预先安装好了anaconda&#xff0c;在这里新建一个环境 conda create -n sdwebui python3.10 # 安装完毕后&#xff0c;激活该环境 conda activate sdwebui# 安装 # 下载stable-diffusion-webui代码 apt install wget…

String 底层是如何实现的?

1、典型回答 String 底层是基于数组实现的&#xff0c;并且数组使用了 final 修饰&#xff0c;不同版本中的数组类型也是不同的&#xff1a; JDK9 之前&#xff08;不含JDK9&#xff09; String 类是使用 char[ ]&#xff08;字符数组&#xff09;实现的但 JDK9 之后&#xf…