【RocketMQ】(六)顺序消息实现原理

news2024/11/24 12:43:45

全局有序
在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。

局部有序
假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用MessageListenerOrderly处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序。

接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):

生产者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 创建生产者
            DefaultMQProducer producer = new DefaultMQProducer("生产者组");
            // 启动
            producer.start();
            // 创建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成订单ID
                int orderId = i % 10;
                // 创建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 获取订单ID
                        Integer id = (Integer) arg;
                        // 对消息队列个数取余
                        int index = id % mqs.size();
                        // 根据取余结果选择消息要发送给哪个消息队列
                        return mqs.get(index);
                    }
                }, orderId); // 这里传入了订单ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅主题
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注册消息监听器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 打印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的。

定时任务对消息队列加锁

消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是MessageListenerOrderly类型来判断),如果是顺序消费,会使用ConsumeMessageOrderlyService,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁。

为什么集群模式下需要加锁?
因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。

加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列MessageQueue,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:
(1)根据broker名称查找broker的详细信息;
(2)创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁;
(3)Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列MessageQueue,将其对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;

顺序消息拉取

上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对MessageQueue进行加锁,加锁成功将其对应的ProcessQueue中的locked属性置为true才可以拉取消息。

顺序消息消费

消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:

  1. 获取消息队列MessageQueue的对象锁,每个MessageQueue对应了一把Object对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列MessageQueue加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性

    public class MessageQueueLock {
        private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
    
        public Object fetchLockObject(final MessageQueue mq) {
            // 获取消息队列对应的对象锁,也就是一个Object类型的对象
            Object objLock = this.mqLockTable.get(mq);
            // 如果获取为空
            if (null == objLock) {
                // 创建对象
                objLock = new Object();
                // 加入到Map中
                Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
                if (prevLock != null) {
                    objLock = prevLock;
                }
            }
            return objLock;
        }
    }
    
  2. 上一步获取锁成功之后,会再次校验该MessageQueue对应的ProcessQueue中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁;

  3. 获取ProcessQueue的中的consumeLock消费锁,获取成功之后调用消息监听器的consumeMessage方法开始消费消费;

    public class ProcessQueue {
       // 消息消费锁
       private final Lock consumeLock = new ReentrantLock();
    
       public Lock getConsumeLock() { // 获取消息消费锁
             return consumeLock;
       }
    }
    
  4. 消息消费完毕,释放ProcessQueueconsumeLock消费锁;

  5. 方法执行完毕,释放MessageQueue对应的Object对象锁;

在第1步中就已经获取了MessageQueue对应的Object对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?

猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueueconsumeLock消费锁进行加锁,相当于锁住ProcessQueue,防止正在消费的过程中,ProcessQueue被负载均衡移除。

既然如此,负载均衡的时候为什么不使用MessageQueue对应的Object对象锁进行加锁而要使用ProcessQueue中的consumeLock消费锁?

这里应该是为了减小锁的粒度,因为消费者在MessageQueue对应的Object加锁后,还进行了一系列的判断,校验都成功之后获取ProcessQueue中的consumeLock加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用MessageQueueObject对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中(消息被消费的过程)ProcessQueue不被移除即可。

总结

消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:

  1. 向Broker申请的消息队列锁
    集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中。

  2. 消息队列锁
    对应MessageQueue对应的Object对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对MessageQueue加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性。

  3. 消息消费锁
    对应ProcessQueue中的consumeLock,消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除

对应的相关源码可参考:

【RocketMQ】【源码】顺序消息实现原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1034671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

955. 删列造序 II;1838. 最高频元素的频数;1392. 最长快乐前缀

955. 删列造序 II 核心思想&#xff1a;我们可以按照一行一行排列strs&#xff0c;删除索引序列就代表删除某一列&#xff0c;那么我们如何判断一列是否应该删除呢&#xff0c;我们可以从反方向思考&#xff0c;应该保留那些列呢&#xff1f;从第一列开始&#xff0c;如果它不…

WebGIS面试题(浙江中海达)

1、Cesium中有几种拾取坐标的方式&#xff0c;分别介绍 Cesium是一个用于创建3D地球和地理空间应用的JavaScript库。在Cesium中&#xff0c;你可以使用不同的方式来拾取坐标&#xff0c;以便与地球或地图上的对象进行交互。以下是Cesium中几种常见的拾取坐标的方式&#xff1a…

jdk21(最新版) download 配置(linux window mac)

download 直达链接 jdk21,17 # wget https://download.oracle.com/java/20/latest/jdk-21_linux-x64_bin.deb # 选择你需要的包类似格式替换包的名称就可以实现终端下载jdk下载登录/oracle账号 下载jdk有可能存在要求登录帐号的情况 # 好心人的帐号 账号&#xff1a; 599…

C语言内存函数的使用、剖析及模拟实现

目录 一、内存拷贝函数——memcpy 1.函数声明&#xff1a; 注意&#xff1a; 2.函数使用用例&#xff1a; 3.memcpy函数的模拟实现&#xff1a; 二、内存拷贝函数2——memmove 1.函数声明&#xff1a; 2.memmove函数的模拟实现 三、内存比较函数——memcmp 1.函数声明…

01_docker镜像管理:80分钟一口气学完docker+k8s!带你掌握docker+k8s所有核心知识点,全程干货,无废话!

docker镜像的实际使用学习 开发过程中&#xff0c;需要安装很多三方工具&#xff0c;比如etcd、kafka、mysql、nginx等等 1、下载安装Docker工具。 2、获取该软件的Docker镜像&#xff08;基本上&#xff0c;都能搜索到核实的镜像&#xff09;&#xff0c;下载镜像nginx镜像…

浏览器基本原理

1、浏览器内部组成 我们看到浏览器主要包括&#xff1a; 1个浏览器主进程&#xff1a; 主要负责界面显示&#xff0c;用户交互&#xff0c;子进程管理多个渲染进程&#xff1a;一般浏览器会为每个Tab标签窗口创建一个渲染进程&#xff0c;主要负责将html&#xff0c;css&#…

经典网络(一) AlexNet逐层解析 | 代码、可视化、参数查看!

文章目录 1 回顾2 AlexNet的重要性3 AlexNet解析3.1 结构3.1.1 CONV13.1.2 Max Pool13.1.3 NORM13.1.4 CONV23.1.5 Max Pool23.1.6 CONV3 CONV43.1.7 CONV53.1.8 Max Pool33.1.9 FC1 FC2 FC3 3.2 AlexNet使用到的技巧3.3 可视化3.4 代码实现模拟3.4.1 查看每一层输入输出3.4.2 …

UE 虚幻引擎 利用LOD,Nanite技术优化场景性能

目录 0 引言1 LOD1.1 LOD定义1.2 UE5中的LOD技术1.3 HLOD&#xff08;Hierarchical Level of Detail&#xff09; 2 Nanite2.1 UE5的Nanite技术2.2 Nanite介绍2.2.1 Nanite的优势2.2.2 Nanite网格体与传统静态网格体的不同2.2.3 Nanite支持的类型2.2.4 在地形中使用Nanite 0 引…

KT142C语音芯片flash型用户如何更新固件的说明_V2

目录 一、简介 2.1 让芯片进入PC模式 2.2 双击提供的exe程序即可 一、简介 正常的情况下&#xff0c;用户肯定是不需要更新固件的&#xff0c;因为芯片出厂默认就烧录了对应的程序固件&#xff0c;但是有客户可能需要小修小改&#xff0c;或者订制一下某些功能&#xff0c…

寻找环形链表的入环点

之前我们在判断一个链表是否为环&#xff0c; 是运用快慢指针的方法&#xff0c;且只能是慢指针走一步&#xff0c;快指针两步&#xff1b; 那么如何求带环链表的入环点的 思路一&#xff1a;数学方法&#xff08;找出带环链表各个特点量的关系&#xff09; 代码&#xff1a;…

Linux设备驱动之Camera驱动

Linux设备驱动之Camera驱动 Camera&#xff0c;相机&#xff0c;平常手机使用较多&#xff0c;但是手机的相机怎么进行拍照的&#xff0c;硬件和软件&#xff0c;都是如何配合拍摄到图像的&#xff0c;下面大家一起来了解一下。 基础知识 在介绍具体Camera框架前&#xff0c…

图像复原与重建,解决噪声的几种空间域复原方法(数字图像处理概念 P4)

文章目录 图像复原模型噪声模型只存在噪声的空间域复原 图像复原模型 噪声模型 只存在噪声的空间域复原

字节一面:你能手撕节流防抖吗?

前言 最近博主在字节面试中遇到这样一个面试题&#xff0c;这个问题也是前端面试的高频问题&#xff0c;节流防抖是前端性能优化一个很重要的手段&#xff0c;所以作为一个前端工程师必须要深入掌握这个知识点&#xff0c;博主在这给大家细细道来。 &#x1f680; 作者简介&…

01 TextRNN FastText TextCNN-04-训练要点,实验过程

TextRNN & FastText & TextCNN-03-模型总览&#xff0c;后 训练要点 RNN训练 得出来的y&#xff08;m&#xff09;&#xff08;预测标签&#xff09;是每一个分类的概率&#xff0c;比如是一个五分类&#xff0c;化成5个格子&#xff0c;每一个格子是概率&#xff0c…

java生成PDF的Util

java使用itext生成pdf-CSDN博客 接上文 支持表格绘制表格 支持表格中的文本 字体加粗、字体上色、单元格背景上色&#xff0c; 支持拼接文本 支持单行文本 多种背景颜色、字体上色 支持自定义水印 废话不说先上效果图 工具类代码 package com.zxw.文件.PDF.util;import …

建立一张表: 表里面有多个字段,每一个字段对应一种数据类

首先mysql -uroot -p 进入MySQL 选择一个数据库并使用 在该数据库内创建表格 create table homework_tb( id int(11) comment 编号, company_name char(6) comment 公司名称, introduce varchar(100) comment 介绍, content1 tinytext comment 内容1, co…

ad18学习笔记十一:显示和隐藏网络、铺铜

如何显示和隐藏网络&#xff1f; Altium Designer--如何快速查看PCB网络布线_ad原理图查看某一网络的走线_辉_0527的博客-CSDN博客 AD19(Altium Designer)如何显示和隐藏网络 如何显示和隐藏铺铜&#xff1f; Altium Designer 20在PCB中显示或隐藏每层铺铜-百度经验 AD打开与…

React【Context_作用、函数组件订阅Context、Fragments 、错误边界_概念 、错误边界_应用、Refs DOM】(四)

目录 Context_作用 函数组件订阅Context Fragments 错误边界_概念 错误边界_应用 Refs & DOM Context_作用 React组件中数据是通过 props 属性自上而下&#xff08;由父及子&#xff09;进行传递的&#xff0c;但是有的时候中间的一些组件可能并不需要props的值。 //A…

深度学习自学笔记一:神经网络和深度学习

神经网络是一种模拟人脑神经元之间相互连接的计算模型&#xff0c;它由多个节点&#xff08;或称为神经元&#xff09;组成&#xff0c;并通过调整节点之间的连接权重来学习和处理数据。深度学习则是指利用深层次的神经网络进行学习和建模的机器学习方法。 假设有一个数据集&a…

电阻的读数

常见电阻的阻值一般有色环电阻和贴片电阻 &#xff0c;下面介绍两种电阻的阻值读法。 1、色标法&#xff1a; 技巧&#xff1a;四环电阻的的精度一般为银色和金色&#xff0c;如果一眼能可看到这两种颜色可以判断为第4环的精度读数 可见棕色为第1环&#xff0c;黑色第2环&…