RocketMQ 顺序消息

news2025/1/11 20:05:53

顺序消息

顺序消息为云消息队列 RocketMQ 版中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。

应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用云消息队列 RocketMQ 版的顺序消息可以有效保证数据传输的顺序性。

典型场景一:撮合交易

交易撮合

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

图 1. 普通消息

普通消息

图 2. 顺序消息

顺序消息

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过云消息队列 RocketMQ 版传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

功能原理

什么是顺序消息

顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

云消息队列 RocketMQ 版顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组。

重要

只有同一消息组的消息才能保证顺序,不同消息组或未设置消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性

云消息队列 RocketMQ 版的消息的顺序性分为两部分,生产顺序性和消费顺序性。

  • 生产顺序性:云消息队列 RocketMQ 版通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

    如需保证消息生产的顺序性,则必须满足以下条件:

    • 同一消息组(MessageGroup)

      消息生产顺序性的范围为消息组,生产者发送消息时可以为每条消息设置消息组,只有同一消息组内的消息可以保证顺序性,不同消息组或未设置消息组的消息之间不保证顺序。

    • 单一生产者

      消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

    • 串行发送

      云消息队列 RocketMQ 版生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至云消息队列 RocketMQ 版后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

    顺序存储逻辑

    • 相同消息组的消息按照先后顺序被存储在同一个队列。

    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

    如上图所示,消息组1和消息组4的消息混合存储在队列1中,云消息队列 RocketMQ 版保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

  • 消费顺序性:云消息队列 RocketMQ 版通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

    如需保证消息消费的顺序性,则必须满足以下条件:

    • 投递顺序

      云消息队列 RocketMQ 版通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。

      重要

      消费者类型为PushConsumer时,云消息队列 RocketMQ 版保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。

    • 有限重试

      云消息队列 RocketMQ 版顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

      对于需要严格保证消费顺序的场景,请务必设置合理的重试次数,避免参数不合理导致消息乱序。

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。但一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

生产顺序

消费顺序

顺序性效果

设置消息组,保证消息顺序发送。

顺序消费

按照消息组粒度,严格保证消息顺序。

同一消息组内的消息的消费顺序和发送顺序完全一致。

设置消息组,保证消息顺序发送。

并发消费

并发消费,尽可能按时间顺序处理。

未设置消息组,消息乱序发送。

顺序消费

按队列存储粒度,严格顺序。

基于云消息队列 RocketMQ 版本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。

未设置消息组,消息乱序发送。

并发消费

并发消费,尽可能按照时间顺序处理。

顺序消息生命周期

生命周期

  • 初始化

    消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费

    消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中

    消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。

    此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,云消息队列 RocketMQ 版会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交

    消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。

    云消息队列 RocketMQ 版默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除

    云消息队列 RocketMQ 版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

重要

  • 消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束。

  • 顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

使用限制

顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

使用示例

和普通消息发送相比,顺序消息发送必须要设置消息组。消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。

以Java语言为例,收发顺序消息的示例代码如下:

完整的消息收发示例代码请参见RocketMQ 5.x系列SDK(推荐)。

        //顺序消息发送。
        MessageBuilder messageBuilder = null;
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
                .setMessageGroup("fifoGroup001")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
        //消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。
        //消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
        MessageListener messageListener = new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView);
                //根据消费结果返回状态。
                return ConsumeResult.SUCCESS;
            }
        };
        //消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
        //需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
        List<MessageView> messageViewList = null;
        try {
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消费处理完成后,需要主动调用ACK提交消费结果。
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
            e.printStackTrace();
        }

获取顺序消息消费重试日志

PushConsumer顺序消费的重试是在消费者客户端进行,服务端无法获取消费重试的详细日志,若消息轨迹中顺序消息的投递结果为失败时,您需要在消费者客户端日志中查看消息重试的最大次数、消费者客户端等信息。

消费者客户端日志查看路径,请参见日志配置。

您可以通过搜索以下关键字在客户端日志中快速定位消费失败的相关内容:

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

使用建议

串行消费,避免批量消费导致乱序

消息消费建议串行处理,避免一次消费多条消息,否则可能出现乱序情况。

例如:发送顺序为1->2->3->4,消费时批量消费,消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。

消息组尽可能打散,避免集中导致热点

云消息队列 RocketMQ 版保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序。

因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1892756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode热题100刷题5:189. 轮转数组、238. 除自身以外数组的乘积、41. 缺失的第一个正数

189. 轮转数组 两次翻转&#xff0c;借助swap实现reverse class Solution { public:void reverse(vector<int>& nums, int left, int right) {int ileft, j right-1;while(i<j) {swap(nums[i],nums[j]);i;j--;}}void rotate(vector<int>& nums, int…

1105 链表合并

solution P1075的简单变形 #include<iostream> #include<vector> #include<algorithm> using namespace std; struct node{int data, next; }list[100000]; int main(){int first1, first2, n, addr;vector<int> l1, l2, ans;scanf("%d%d%d&quo…

[软件安装]linux下安装steam

1、下载安装包到linux系统 SteamTools 发行版 - Gitee.com 2、选择对应的版本 3、解压安装包steam &#xff08;1&#xff09;在opt路径下新建一个文件夹 sudo mkdir steam &#xff08;2&#xff09;进入压缩包路径下&#xff0c;打开终端&#xff0c;执行以下代码进行解压…

使用Fiddler的ImageView轻松获取抓包中的图片详情信息以及一些图片优化建议

使用过Fiddler中的同学是否遇到过下面的问题&#xff1a; 抓包的请求为获取图片信息时&#xff0c;该如何判断图片显示的是什么内容呢&#xff1f;图片是否需要优化来提升前端展示性能呢&#xff1f;常用的图片优化方案有哪些&#xff1f; 本文就带大家搞定上面的这些问题&am…

【VUE3】uniapp + vite中 uni.scss 使用 /deep/ 不生效(踩坑记录三)

vite 中使用 /deep/ 进行样式穿透报错 原因&#xff1a;vite 中不支持&#xff0c;换成 ::v-deep 或:deep即可

echarts阶段仪表图

echarts阶段仪表图 – 效率图 1、先上效果展示 2、完整源码奉上 Vue2 echarts 5 <template><div ref"gaugeChart" style"width: 100%; height: 100%"></div> </template><script> import * as echarts from "echar…

美工画师必看!AI绘画Stable Diffusion 一键生成 B 端图标教程,轻松制作商业可用的设计图标,从此告别加班!(附安装包)

大家好&#xff0c;我是画画的小强 在日常工作中&#xff0c;设计师在应对运营和UI设计的B端图标时&#xff0c;常常面临大量的构思、制作和渲染等工作&#xff0c;耗时耗力。我们可以利用Stable Diffusion(以下简称SD)结合AI的方式&#xff0c;帮助设计师优化图标的设计流程&…

LLM4Decompile——专门用于反编译的大规模语言模型

概述 论文地址&#xff1a;https://arxiv.org/abs/2403.05286 反编译是一种将已编译的机器语言或字节码转换回原始高级编程语言的技术。该技术用于分析软件的内部工作原理&#xff0c;尤其是在没有源代码的情况下&#xff1b;Ghidra 和 IDA Pro 等专用工具已经开发出来&#…

PyCharm社区版Cython支持

自己在文件类型中加一个&#xff0c;名称叫【pythonC】 &#xff0c;文件名模式这一栏要加*.pyx的后缀&#xff0c;之后双击【pythonC】编辑这个文件类型 这里1、2、3、4配置如下 # 1 " # &*, - / : ; <>[ ] { }# 2 False None True and as assert break cdef …

vue 启动项目报错Syntax Error: Error: PostCSS received undefined instead of CSS string

启动vue项目然后报错如下图 这个是跟node版本有关系 因为要开发不同的项目使用不同的node版本&#xff0c;所以就用nvm切换&#xff0c;所以导致了node-sass编译问题 执行这个命令就可以 npm install node-sass or npm rebuild node-sass node-sass对于node高版本和低版本切…

GIT - 一条命令把项目更新到远程仓库

前言 阅读本文大概需要1分钟 说明 更新项目到远程仓库只需要执行一条命令&#xff0c;相当的简便 步骤 第一步 编辑配置文件 vim ~/.bash_profile第二步 写入配置文件 gsh() {local msg"${1:-ADD COMMIT PUSH}"git add . && git commit -m "$m…

垂直领域大模型的机遇与挑战:从构建到应用

在人工智能技术的浪潮中,大模型以其强大的数据处理和学习能力,成为推动科技进步的重要力量。然而,这种跨领域应用的过程并非一帆风顺,既面临挑战也蕴含机遇。本文从复旦大学的研究工作出发,详细分析大模型的机遇与挑战。 背景 GPT4技术报告指出,GPT4仍处于通用人工智…

软考高级真的已经烂大街了吗?

软考高级是否“烂大街”是一个相对的观点&#xff0c;这取决于不同的视角和使用场景。虽然有人认为软考高级证书的普及度提高&#xff0c;可能给人一种“烂大街”的感觉&#xff0c;但实际上它仍然具有一定的价值和特定用途。 证书的普遍性与价值 普遍性增加&#xff1a;随着更…

Springboot3本地编译exe文件(实现快速启动仅需200ms)

1. 准备好grallvm版本的JDK jdk17以上 &#xff08;springboot3最低支持jdk17&#xff09; grallvm-jdk17 Download GraalVM 下载界面 2. 配置maven 3.9.x 及以上 maven 3.9.8 Maven – Download Apache Maven 3.创建SpringBoot项目 3.1 项目所需依赖 记得选择这俩个进…

上新:NFTScan 正式上线 Bitcoin-Runes 浏览器!

近日&#xff0c;NFTScan 团队正式对外发布了 Bitcoin-Runes 浏览器&#xff0c;将为 Runes 生态的 NFT 开发者和用户提供简洁高效的 NFT 数据搜索查询服务。Runes 协议的主要目的是定义一种在比特币网络上进行符号化资产交换的方式。它使用 Rune 作为符号化资产的单位&#xf…

量化实例分析初探

一、量化介绍 大型语言模型通常具有数十亿乃至上百亿参数&#xff0c;导致存储和计算成本极高&#xff0c;大多数下游用户难以进行微调。为了便于进一步部署&#xff0c;大模型的模型压缩成为关键的解决方案。 模型压缩目标&#xff1a;减少模型大小&#xff0c;加快训练速度…

dell服务器RAID5磁盘阵列出现故障的解决过程二——热备盘制作与坏盘替换过程

目录 背景方案概念全局热备&#xff08;Global Hot Spare&#xff09;&#xff1a;独立热备&#xff08;Dedicated Hot Spare&#xff09;&#xff1a; 过程8号制作成热备清除配置制作独立热备热备顶替坏盘直接rebuild 更换2号盘2号热备 注意注意事项foreign状态要先清除配置 背…

Fiddler关于Repaly的细节您了解吗?如何重复执行请求?

最近深入的使用了一下Fiddler的Repaly功能&#xff0c;没想到有这么多的细节&#xff0c;不仅可以设置Repaly的次数&#xff0c;还可以无条件地重发选中的请求&#xff0c;而不考虑之前的请求条件或缓存状态。在这里与各位小伙伴分享一下&#xff0c;希望能够帮到大家&#xff…

Unity射击游戏开发教程:(29)躲避敌人的子弹射击

在这篇文章中,我将介绍如何创建一个可以使玩家火力无效的敌人。创建的行为如下...... 当玩家向敌人开火时,敌人会向左或向右移动。向左或向右的移动是随机选择的,并在一段时间后停止敌人的移动。如果敌人移出屏幕,它就会绕到另一边。将一个精灵拖到画布上,将其缩小以匹配游…

Vue实现金钱输入框组件自动带千位逗号

新建PriceInput.vue <template><div id"bord"><el-inputv-model"inputValue"v-bind"$attrs":maxlength"maxlength"input"handleInput"focus"handleFocus"blur"handleBlur"change"h…