五、RocketMQ发送顺序消息

news2025/1/24 4:56:59

顺序消息的应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

例如需要保证一个订单的生成、付款和发货,这三件事情是被顺序执行的。

如何消息的顺序性

RocketMQ消息的顺序性分为两部分:生产顺序性和消费顺序性,只有同时满足了生产顺序性和消费顺序性才能达到消息整体的有序性

生产的顺序性

要保证发送消息的顺序性,就必须保证消息以下条件

  • 单一生产者:顺序消息必须由单一生产者产生,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

总结:单一生产者需要制定消息的顺序性,并且需要将顺序消息根据分区键发送到一个队列上,在发送时,需要使用串行发送

@Test
public void sendOrder() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);
    producer.start();

    // 分区key
    int orderId = 1;
    for (int i = 0; i < 1000; i++) {
        Message message = new Message(RocketMQConfig.TEST_TOPIC, ("顺序" + i).getBytes(Charset.defaultCharset()));
        // 发送顺序消息,需要传递分区键
        SendResult sendResult1 = producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 分区键
                int orderId = (int) arg;
                // 根据分区键取模
                int index = orderId % mqs.size();
                // 返回要发送到哪个队列中去
                return mqs.get(index);
            }
        }, orderId);
        System.out.println(sendResult1.getSendStatus());
    }
    producer.shutdown();
}

消息的顺序性

消费者在push模式下,有两种消息方式

  • MessageListenerOrderly会启动多个线程处理消息,但是会加锁,实际上会转变为串行,进行实现消息的顺序性

    顺序消费的结果

在这里插入图片描述

  • MessageListenerConcurrently会启动多个线程处理消费者,并且不保证加锁,不保证消息的顺序性

    非顺序消费的结果

在这里插入图片描述

因此为了保证消息的顺序性,需要使用MessageListenerOrderly来处理消息

@Test
public void consumerOrder() throws Exception{
    // 1.创建消费端,指明消费者属于哪个组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group_order");

    // 2.注册NameServer地址
    consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);

    // 3.订阅topic,并且可以根据标签进行定向消费
    consumer.subscribe(RocketMQConfig.TEST_TOPIC, "*");

    // 4.注册监听器,broker推送消息后,处理顺序消息
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.println(Thread.currentThread().getName() + "收到的msg大小:" + msgs.size());
            for (MessageExt msg : msgs) {
                String content = new String(msg.getBody());
                System.out.println(Thread.currentThread().getName() + "收到的消息体:" + content);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

    // 5.启动消费端
    consumer.start();

    // 防止主线程退出
    Thread.sleep(Integer.MAX_VALUE);
}

注:在验证顺序消息时,当这两种消费模式在启动情况下,是都可以顺序消费的。

只有先发送消息,在启动消费,并发消费才会出现乱序的情况

猜测:

  1. 单生产者进行发送消息,并发量不高
  2. 发送的数据量太低

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1094631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C# OpenVINO Det 物体检测

效果 耗时 elephant:89% Preprocess: 0.00ms Infer: 47.21ms Postprocess: 11.63ms Total: 58.84ms 项目 代码 using OpenCvSharp; using Sdcb.OpenVINO; using Sdcb.OpenVINO.Natives; using System; using System.Diagnostics; using System.Drawing; using System.Text; …

淘宝店铺订单详情接口,淘宝店铺订单列表数据,淘宝店铺订单物流接口,淘宝店铺订单线下发货接口-

淘宝店铺订单详情接口可以理解为淘宝开放平台&#xff08;Taobao Open Platform&#xff09;提供的API接口1。 该接口可以获取淘宝店铺的订单详情&#xff0c;包括订单号、买家昵称、收货人姓名、收货地址、邮编、电话、商品描述、价格等信息。开发者可以在淘宝开放平台注册一…

使用winUSB进行USB开发

什么是winUSB WinUSB是Windows操作系统提供的一种通用USB驱动程序&#xff0c;用于简化USB设备的开发和使用。它是一个用户模式驱动程序&#xff0c;可以在Windows XP及更高版本的操作系统上使用。WinUSB提供了一组API和工具&#xff0c;使开发人员能够与USB设备进行通信&…

C++11可变参数模板

文章目录 1.可变参数模板的介绍1.1C语言中的可变参数1.2C98/C11的类模板和函数模板1.3可变参数的函数模板1.4展开参数包递归展开初始化列表展开 2.可变参数模板的应用 1.可变参数模板的介绍 1.1C语言中的可变参数 1.2C98/C11的类模板和函数模板 C98/03&#xff0c;类模版和函…

Numpy(一)简介与基本使用

Numpy&#xff08;一&#xff09;简介与基本使用 一、Numpy简介 1.Numpy是一个运行速度非常快的数学库&#xff0c;主要用于数组计算 2.包含&#xff1a;N维数组对象ndarray、广播功能函数、整合C/C/Fortran代码工具、包含线性代数/傅里叶变换/随机数生成等功能 3.优点&…

ChatGPT快速入门

ChatGPT快速入门 一、什么是ChatGPT二、ChatGPT底层逻辑2.1 实现原理2.2 IO流程 三、ChatGPT应用场景3.1 知心好友3.2 文案助理3.3 创意助理3.4 角色扮演 一、什么是ChatGPT ChatGPT指的是基于GPT&#xff08;Generative Pre-trained Transformer&#xff09;模型的对话生成系…

python字典、列表排序,从简单到复杂

因工作原因&#xff0c;需要频繁对python字典(dict)、列表(list)等进行各种各样的排序&#xff0c;发现网上这块的资料又多又杂&#xff0c;尤其涉及到lambda的&#xff0c;让人觉得难以理解看不下去&#xff0c;因此写了这篇文章&#xff0c;从简单到复杂&#xff0c;配合例子…

【Eclipse】安装教程

首先打开官网&#xff1a;Eclipse Downloads | The Eclipse Foundation 这里我选择了下载最新版本 下载好后&#xff0c;双击下载的文件 &#xff0c;即可进入到安装页面 点击第一个进行安装&#xff1a; 最后安装成功&#xff01; 补充&#xff1a; 【Eclipse】安装JAVA EE插…

Jmter接口网站压力测试工具使用记录

1.首先下载Jmeter 官方地址&#xff1a;Apache JMeter - Apache JMeter™ 回到顶部 2.安装Jmeter 把下载的文件进行解压&#xff0c;产生如下目录&#xff1a; 打开bin文件夹下的jmeter.bat文件及进入程序的主界面窗体jmeter.log是日志文件。 主意&#xff1a;需要配置java环…

河北专升本(C语言)

目录 一&#xff1a;C语言的构成特点 二: 数据类型 三: 常量、变量、运算符及表达式 &#xff08;一&#xff09;标识符 &#xff08;二&#xff09;常量 &#xff08;三&#xff09;变量&#xff1a;其值可以改变的量 &#xff08;四&#xff09;各种类型数据混合运算 &…

打印编译程序固件的环境信息

在编译程序固件的时候&#xff0c;我们常常会有一个软件版本号&#xff0c;用来区分不同的版本&#xff0c;有时候又没变化软件版本号&#xff0c;重新发两个debug版本给测试验证&#xff0c;那我们就需要在程序中埋入一些环境信息&#xff0c;下面有两种方法来加入一下简单的信…

APP 备案公钥、签名 MD5、SHA-1、SHA-256获取方法。

公钥和 MD5 值可以通过安卓开发工具、Keytool、Jadx-GUI 等多种工具获取&#xff0c;最简单的就是以 jadx-gui 为例。 1.下载 jadx-gui 工具 &#xff0c;点击此处 下载 jadx-gui 工具。 2.下载完成后&#xff0c;解压压缩包&#xff0c;双击 jadx-gui-1.4.7.exe 运行。 3.…

Neo4j入门基础:CREATE,DELETE,SET,REMOVE(看不懂我直接吃...)

1. 创建节点 1.1 创建一个节点 create (s:student1)创建一个标签为student1的节点 1.2 创建多个节点 create (s2:student2),(s3:student3)同时创建两个节点&#xff0c;标签分别为&#xff1a;student2&#xff0c;student3 1.3 创建节点并附带&#xff08;多个&#xff0…

MAC上设置IDEA如何一个窗口打开多个项目,多个tab

1、IDEA一个窗口打开多个项目 如果你打开了多个项目、每次切换都要半天&#xff0c;想让项目都汇聚到top栏 点击 Window - Merge All Project Windows 即可 但是这样比较挫&#xff0c;每次打开新的项目都还是会重新打开一个IDEA窗口 so&#xff0c;如何设置项目在同一个窗口…

树模型(一)孤立森林

孤立森林&#xff08;Isolation Forest&#xff09;算法是西瓜书作者周志华老师的团队研究开发的算法&#xff0c;一般用于结构化数据的异常检测。 异常的定义 针对于不同类型的异常&#xff0c;要用不同的算法来进行检测&#xff0c;而孤立森林算法主要针对的是连续型结构化…

Junit单元测试之Maven项目集成Jacoco,查看覆盖率报告

关于单元测试以及本文Calculate类等内容&#xff0c;请见前述文章Junit单元测试_Joy T的博客-CSDN博客 要学Jacoco&#xff0c;首先要知道测试覆盖率是什么&#xff01; 测试覆盖率 测试覆盖率表示的是测试用例所能触及&#xff08;或“覆盖”&#xff09;的代码百分比。换句…

2023年起重信号司索工(建筑特殊工种)证考试题库及起重信号司索工(建筑特殊工种)试题解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年起重信号司索工(建筑特殊工种)证考试题库及起重信号司索工(建筑特殊工种)试题解析是安全生产模拟考试一点通结合&#xff08;安监局&#xff09;特种作业人员操作证考试大纲和&#xff08;质检局&#xff09;特…

kafka安装和使用的入门教程

这篇文章简单介绍如何在ubuntu上安装kafka&#xff0c;并使用kafka完成消息的发送和接收。 一、安装kafka 访问kafka官网Apache Kafka&#xff0c;然后点击快速开始 紧接着&#xff0c;点击Download 最后点击下载链接下载安装包 二、启动kafka 经过上一步下载完成后&#xff…

新网站如何快速接入百度推荐

百度站长工具 在百度站长工具中点击“自动推送”&#xff0c;如下截图: 推送代码放网站的底部 将百度平台提交的主动推送代码放在自己的网站的底部模板中&#xff1b; <script> (function(){ var bp document.createElement(script); var curProtocol window.locati…

基于算术优化优化的BP神经网络(分类应用) - 附代码

基于算术优化优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于算术优化优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.算术优化优化BP神经网络3.1 BP神经网络参数设置3.2 算术优化算法应用 4.测试结果…