【RocketMQ】顺序消费消息实现原理分析

news2024/9/21 16:48:18

一、顺序消息概述

1.1、什么是顺序消息

顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

1.2、顺序消息的类型

全局顺序消息

对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费(单生产者单线程,单消费者单线程)

  • 适用场景

  • 适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

分区顺序消息

对于指定的一个Topic,所有消息根据Sharding Key进行划分到不同队列中,同一个队列内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一队列内同一Sharding Key的消息保证顺序,不同队列之间的消息顺序不做要求。

  • 适用场景

  • 适用于性能要求高,以Sharding Key作为划分字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。

1.3、顺序消息的使用场景

顺序消息通常用于业务上有先后顺序要求的场景,比如订单创建、支付、发货等操作,必须按照一定的顺序执行,否则会导致错误的结果。RocketMQ提供了顺序消息的特性,可以满足这种业务场景的需求。

举个例子,某电商网站有一个订单创建系统,用户下单后需要按照如下顺序依次处理:

  1. 创建订单;

  2. 扣减库存;

  3. 发送短信通知用户。

如果这些操作不按照正确的顺序执行,就会出现一些问题。比如库存已经被扣减了,但是订单还没创建成功,这样可能会导致订单创建失败。RocketMQ的顺序消息可以保证这些操作按照正确的顺序执行。

二、顺序消息的实现原理

2.1、顺序消息实现案例

接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):

生产者
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 创建生产者
            DefaultMQProducer producer = new DefaultMQProducer("生产者组");
            // 启动
            producer.start();
            // 创建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成订单ID
                int orderId = i % 10;
                // 创建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 获取订单ID
                        Integer id = (Integer) arg;
                        // 对消息队列个数取余
                        int index = id % mqs.size();
                        // 根据取余结果选择消息要发送给哪个消息队列
                        return mqs.get(index);
                    }
                }, orderId); // 这里传入了订单ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
消费者
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅主题
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注册消息监听器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 打印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的。

2.2、定时任务对消息队列加锁

消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是MessageListenerOrderly类型来判断),如果是顺序消费,会使用ConsumeMessageOrderlyService,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁。

为什么集群模式下需要加锁?

  • 因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。

加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列MessageQueue,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:

  1. 根据broker名称查找broker的详细信息;

  2. 创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁;

  3. Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列MessageQueue,将其对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;

2.3、顺序消息拉取

上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对MessageQueue进行加锁,加锁成功将其对应的ProcessQueue中的locked属性置为true才可以拉取消息。

2.4、顺序消息消费

消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:

  • 获取消息队列MessageQueue的对象锁,每个MessageQueue对应了一把Object对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列MessageQueue加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性;
public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        // 获取消息队列对应的对象锁,也就是一个Object类型的对象
        Object objLock = this.mqLockTable.get(mq);
        // 如果获取为空
        if (null == objLock) {
            // 创建对象
            objLock = new Object();
            // 加入到Map中
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }
        return objLock;
    }
}
  • 上一步获取锁成功之后,会再次校验该MessageQueue对应的ProcessQueue中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁;
  • 获取ProcessQueue的中的consumeLock消费锁,获取成功之后调用消息监听器的consumeMessage方法开始消费消费;
public class ProcessQueue {
   // 消息消费锁
   private final Lock consumeLock = new ReentrantLock();

   public Lock getConsumeLock() { // 获取消息消费锁
         return consumeLock;
   }
}
  • 消息消费完毕,释放ProcessQueueconsumeLock消费锁;

  • 方法执行完毕,释放MessageQueue对应的Object对象锁;

在第1步中就已经获取了MessageQueue对应的Object对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?

猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueueconsumeLock消费锁进行加锁,相当于锁住ProcessQueue,防止正在消费的过程中,ProcessQueue被负载均衡移除。

既然如此,负载均衡的时候为什么不使用MessageQueue对应的Object对象锁进行加锁而要使用ProcessQueue中的consumeLock消费锁?

这里应该是为了减小锁的粒度,因为消费者在MessageQueue对应的Object加锁后,还进行了一系列的判断,校验都成功之后获取ProcessQueue中的consumeLock加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用MessageQueueObject对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中(消息被消费的过程)ProcessQueue不被移除即可。

2.5、总结

消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:

  • 向Broker申请的消息队列 集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中。

  • 消息队列 对应MessageQueue对应的Object对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对MessageQueue加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性。

  • 消息消费锁 对应ProcessQueue中的consumeLock,消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除

三、参考

  • 【RocketMQ】【源码】顺序消息实现原理

  • RocketMQ顺序消息机制源码分析~

  • RocketMQ 顺序消费机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1311958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【网络安全】网络防护之旅 - 对称密码加密算法的实现

&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《网络安全之道 | 数字征程》⏰墨香寄清辞&#xff1a;千里传信如电光&#xff0c;密码奥妙似仙方。 挑战黑暗剑拔弩张&#xff0c;网络战场誓守长。 目录 &#x1f608;1. 初识网络安…

性能提升100%、存储节约50%!猕猴桃游戏搭载OceanBase开启云端手游新篇章

近日&#xff0c;武汉灵动在线科技有限公司&#xff08;以下简称“灵动在线”&#xff09;与 OceanBase 达成合作&#xff0c;旗下品牌猕猴桃游戏的“游戏用户中心&#xff08;微信小程序&#xff09;”和“BI 分析报表业务系统“两大关键业务系统全面接入 OB Cloud 云数据库&a…

中国企业领袖年会在京举行,天雨设计左明龙应邀出席参加

12月9-11日&#xff0c;第二十一届中国企业领袖年会在中国大饭店隆重举行。本届领袖年会以“致敬长期主义”为主题&#xff0c;逾百名世界500强、中国500强和行业领军企业家受邀汇聚一堂&#xff0c;共话中国经济未来。北京天雨盛世文化传媒有限公司&#xff08;以下简称“天雨…

Ubuntu如何安装KVM

环境&#xff1a; 联想E14笔记本 Ubuntu20.04 问题描述&#xff1a; Ubuntu如何安装KVM 解决方案&#xff1a; 1.验证CPU是否支持硬件虚拟化 rootst-ThinkPad-E14:~# grep -Eoc (vmx|svm) /proc/cpuinfo 162.检查 VT 是否在 BIOS 中启用 安装 apt install cpu-checker …

软件设计师——软件工程(二)

&#x1f4d1;前言 本文主要是【软件工程】——软件设计师——软件工程的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304…

利用svm进行模型训练

一、步骤 1、将文本数据转换为特征向量 &#xff1a; tf-idf 2、使用这些特征向量训练SVM模型 二、代码 from sklearn.model_selection import train_test_split from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.svm import SVC from sklearn.m…

如何有效利用餐厅预约小程序推广餐厅品牌

随着餐饮行业竞争的加剧&#xff0c;餐厅订座预约成为了吸引顾客的一种重要方式。而微信小程序作为移动互联网的重要入口之一&#xff0c;为餐厅提供了一个方便快捷的预约平台。本文将介绍如何使用乔拓云平台等第三方小程序制作平台来开发餐厅订座预约微信小程序。 首先&#x…

jmeter,跨线程调用cookie

结构目录 一、线程组1 1、创建登录的【HTTP请求】&#xff0c;并配置接口所需参数&#xff1b; 2、创建【正则表达式提取器】&#xff0c;用正则表达式提取cookie字段&#xff1b; 3、创建【调试取样器】&#xff0c;便于观察第2步提取出的数据&#xff1b; 4、创建【BeanSh…

nodejs微信小程序+python+PHP的驾照理论模拟考试系统-计算机毕业设计推荐

从角色上分为用户和管理员两部分&#xff0c;用户功能主要是在前台&#xff0c;主要内容首页&#xff0c;注册登录&#xff0c; 模拟考试&#xff0c;论坛&#xff0c;公告信息 &#xff0c;个人中心&#xff0c;考试记录&#xff0c;错图记录等功能&#xff0c;后台部分主要给…

【Spark精讲】Spark Shuffle详解

目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制…

群晖(Synology)云备份的方案是什么

群晖云备份方案就是在本地的 NAS 如果出现问题&#xff0c;或者必须需要重做整列的时候&#xff0c;保证数据不丢失。 当然&#xff0c;这些是针对有价值的数据&#xff0c;如果只是电影或者不是自己的拍摄素材文件&#xff0c;其实可以不使用云备份方案&#xff0c;因为毕竟云…

Unity Mono加密解决方案

Unity Mono 是 Unity 引擎默认的脚本运行时环境&#xff0c;在游戏开发中扮演着重要的角色。Mono 由跨平台的开源 .NET 框架实现&#xff0c;它允许开发者使用 C# 等编程语言编写游戏逻辑。凭借简单易用的开发环境和高效的脚本编译速度&#xff0c;得到了众多游戏的青睐。 在 …

打工人副业变现秘籍,某多/某手变现底层引擎-Stable Diffusion 模特假人换服装、换背景、换真人

给固定人物换背景或者换服装,需要用到一个Stable Diffusion扩展插件,就是sd-webui-segment-anything。 sd-webui-segment-anything 不仅可以做到抠图的效果,也能实现之多蒙版的效果。 什么是蒙版 图片蒙版是一种用于调节图像修改程度以及进行局部调整的工具。它通常分为四种…

Elasitcsearch--解决CPU使用率飙升

原文网址&#xff1a;Elasitcsearch--解决CPU使用率飙升_IT利刃出鞘的博客-CSDN博客 简介 本文介绍如何解决ES导致的CPU使用率飙升的问题。 问题描述 线上环境 Elasticsearch CPU 使用率飙升常见问题如下&#xff1a; Elasticsearch 使用线程池来管理并发操作的 CPU 资源。…

分布式块存储 ZBS 的自主研发之旅|元数据管理

重点内容 元数据管理十分重要&#xff0c;犹如整个存储系统的“大黄页”&#xff0c;如果元数据操作出现性能瓶颈&#xff0c;将严重影响存储系统的整体性能。如何提升元数据处理速度与高可用是元数据管理的挑战之一。SmartX 分布式存储 ZBS 采用 Log Replication 的机制&…

Processon的使用以及流程图的绘制

目录 一、ProcessOn 1.2 官方网站 门诊流程图 会议OA流程图 药库采购入库流程图 ​住院流程图 二、Axure自定义元件库 2.1 新建元件库 2.2 自定义元件 2.3 添加元件库 一、ProcessOn ProcessOn是一款在线的流程图、思维导图、组织结构图、网络拓扑图等多种图表类型…

【linux】SSH终端Putty配置:上传/下载、显示中文字体、自动登录

文章目录 写在前面putty上传/下载1. 下载2. 解压和配置3. 使用sz/rz3.1 下载文件:sz3.2 上传文件:rz 显示中文字体1. 下载合适的字体2. 解压和安装3. putty配置 putty自动登录1. putty配置2. putty快捷方式配置3. 使用putty 写在后面 写在前面 一篇博客介绍了12种SSH终端工具的…

简说vue-router原理

vue-router原理 hash模式 实现原理 改变描点监听描点变化 history模式 实现原理 改变url监听url变化 abstracthash 和 history 模式有什么区别&#xff1f; url 不一样原理不同 其他总结扩展 history 出现404错误 vue-router原理 vue-router是vue项目的重要组成部分&#x…

对局域网络中应用了网络变压器 POE供电功能的供电端设备间的连接方法

Hqst华轩盛(石门盈盛)电子导读&#xff1a;一起来了解局域网络中应用了网络变压器 POE供电功能的设备间的来连接方法 POE标准为使用以太网的传输电缆输送直流电到POE兼容的设备定义了两种连接方法: 第一,中间跨接法 一种称作"中间跨接法"( Mid -Span ),使用独立的PoE…

WordPress主题Lolimeow v8.0.1二次元风格支持erphpdown付费下载

WordPress国人原创动漫主题lolimeow免费下载 lolimeow是一款WordPress国人原创主题&#xff0c;风格属于二次元、动漫、可爱萝莉风&#xff0c;带有后台设置&#xff0c;支持会员中心。该主题为免费主题。 1.侧栏/无侧栏切换&#xff01; 2.会员中心&#xff08;配套Erphpdown…