RocketMQ如何保证消息被有序消费

news2025/1/23 4:56:25

RocketMQ如何保证消息被有序消费

消费者端如何接收有序消息

队列消费的两种模式

并发消费模式

当同一类消息被送入不同队列,且这些消息在处理上并不需要按时序消费时,可以考虑使用并发消费模式。

并发消费模式生产者会将消息轮询发送到不同的队列当中,这些队列会和消费者实例建立多个连接(线程),将消息并发送入到不同的消费者,因为消费者处理速度有快慢,所以并不能保证物流数据会按1~9的顺序依次消费。

并发消费模式处理效率很高,但无法保证有序性。

在这里插入图片描述

有序消费模式

有序消息是指生产者在产生数据的时候,根据Hash规则指定让消息放入哪个队列,在消费者消费时会保证不同消费者针对每一个队列只有唯一的连接(线程)用于消费指定队列。

有序消费模式可以保证消息按队列FIFO顺序依次被消费,但因此失去并发性能,有序消费模式只有在业务要求必须按顺序消费的场景下才允许使用。

在这里插入图片描述

RocketMQ如何实现有序消息

要实现RocketMQ有序消息需要两点调整:

  • 生产者端要求按id等唯一标识分配消息队列
  • 消费者端采用专用的监听器保证对队列的单线程应用

下面咱们来看一下代码:

生产者端

SequenceMessageProvider核心代码是在向Broker发送消息时附加MessageQueueSelector对象,在实现select方法时指定存放到哪个队列中。

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
public class SequenceMessageProvider {
    public static void main(String[] args) {
        // 前置准备代码
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.start();
            // 模拟10笔订单
            for (Integer orderId = 1; orderId <= 10; orderId++) {
                // 每笔订单要发3条消息:(1)创建订单 (2)订单库存扣减 (3)增加积分
                for (int i = 0; i < 3; i++) {
                    String data = "";
                    switch (i % 3) {
                        case 0:
                            data = orderId + "号创建订单";
                            break;
                        case 1:
                            data = orderId + "号订单减少库存";
                            break;
                        case 2:
                            data = orderId + "号订单增加积分";
                            break;
                    }
                    // 创建消息对象 topic="order",tags="order",key=orderId
                    Message message = new Message("order", "order", orderId.toString(), data.getBytes(StandardCharsets.UTF_8));
                    // 发送消息,实现MessageQueueSelector接口
                    SendResult result = producer.send(message, new MessageQueueSelector() {
                        // select方法决定向broker哪一个队列发送消息
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
                            int orderId = Integer.parseInt(msg.getKeys());
                            int index = orderId % list.size();
                            MessageQueue messageQueue = list.get(index);
                            log.info("id:{},data:{},queue:{}", orderId, new String(msg.getBody()), messageQueue);
                            return messageQueue;
                        }
                    }, null);
                }
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            try {
                producer.shutdown();
                log.warn("连接已关闭");
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    }
}

消费者端

消费者端最大的变化是registerMessageListener监听器要实例化MessageListenerOrdery对象,用于为每一个队列分配唯一的连接(线程)进行消费。

每一批消息从Broker投递给消费者都会触发consumeMessage()方法实现对消息的消费。

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

@Slf4j
public class SequenceMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 声明并初始化一个consumer
        // 需要一个consumer group名字作为构造方法的参数
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        // 同样也要设置NamesrvAddr地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置consumer所订阅的Top 和 Tag,*代表全部的Tag
        consumer.subscribe("order", "*");
        // 注册消息监听者,消费者端要增加MessageListenerOrderly监听器,用于实现有序队列
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
                // 遍历输出
                list.forEach(msg -> {
                    log.info("{},{},{}", msg.getKeys(), new String(msg.getBody()), context.getMessageQueue());
                });
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

如何实现消息全局顺序消费?

只需要再生产者固定将所有消息发往到0号队列即可保证全局有序,这也意味着全局采用单线程消费,执行效率极差。

    @Override
    public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
        MessageQueue messageQueue = list.get(0);
        return messageQueue;
    }

有序消费有什么使用限制吗?

有序消费模式只支持集群模式(CLUSTERING),不支持广播模式(BROADCASTING),采用广播模式会无法接收到数据。

        // 设置为集群模式
        consumer.setMessageModel(MessageModel.CLUSTERING); //支持有序消息,默认模式
        consumer.setMessageModel(MessageModel.BROADCASTING); //不支持有序消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV官方教程中文版 —— 直方图的计算,绘制与分析

OpenCV官方教程中文版 —— 直方图的计算&#xff0c;绘制与分析 前言一、原理1.统计直方图2. 绘制直方图3. 使用掩模 前言 • 使用 OpenCV 或 Numpy 函数计算直方图 • 使用 Opencv 或者 Matplotlib 函数绘制直方图 • 将要学习的函数有&#xff1a;cv2.calcHist()&#xf…

[100天算法】-不同路径(day 37)

题目描述 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为“Start” &#xff09;。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为“Finish”&#xff09;。问总共有多少条不同的路径&#xff1f;例如…

MATLAB | 两种上色方式的旭日图绘制

嘿&#xff0c;这次真的是好久不见了&#xff0c;好不容易才有点空写点文章&#xff0c;这段时间忙到后台回复都有点来不及看&#xff0c;很抱歉有一部分后台留言刚看到就已经超过时限没法回复了&#xff0c;不过根据大家的留言&#xff0c;需求主要集中在希望出一期旭日图的教…

12、Python -- if 分支 的讲解和使用

目录 程序结构顺序结构分支结构分支结构注意点不要忘记冒号 if条件的类型if条件的逻辑错误if表达式pass语句 程序流程 分支结构 分支结构的注意点 if条件的类型 if语句的逻辑错误 if表达式 程序结构 Python同样提供了现代编程语言都支持的三种流程 顺序结构 分支结构 循环结构…

CPU核检测

import psutil num_cpus psutil.cpu_count(logicalFalse) print(num_cpus) num_cpus psutil.cpu_count(logicalTrue) print(num_cpus)结果如下 可以看到这个结果是不同的

fiddler导出录制脚本并导出jmter脚本文件

1、fiddler导出录制脚本 可以通过save保存录制的脚本&#xff1a; 也可以选中这些链接&#xff0c;点击右键-->save-->select sessions -->选择要导入的文件 2、导入录制的脚本 再fiddler-->file--->load ARCHIVERS--->选择刚导出的.saz文件&#xff0c;正…

01-初识VUE3

01.初识VUE3 1.创建VUE3项目 1).使用 vue-cli 创建 ## 查看vue/cli版本&#xff0c;确保vue/cli版本在4.5.0以上 vue --version ## 安装或者升级你的vue/cli npm install -g vue/cli ## 创建 vue create vue_test ## 启动 cd vue_test npm run serve2).使用 vite 创建 ## 创…

吃豆人C语言开发—Day2 需求分析 流程图 原型图

目录 需求分析 流程图 原型图 主菜单&#xff1a; 设置界面&#xff1a; 地图选择&#xff1a; 游戏界面&#xff1a; 收集完成提示&#xff1a; 游戏胜利界面&#xff1a; 游戏失败界面 死亡提示&#xff1a; 这个项目是我和朋友们一起开发的&#xff0c;在此声明一下…

【C#】委托与事件

目录 一、委托 1.什么是委托 2.委托类型的声明与初始化 3.委托类型引用方法的调用 4.使用委托类型作为方法的参数 5.Action委托 6.Func委托 7.通用类型冒泡排序 8.多播委托 二、Lambda表达式 1.匿名方法 2.Lambda表达式表示匿名方法 三、事件 1.什么是事件 2.事件…

Django viewsets 视图集与 router 路由实现评论接口开发

正常来说遵循restful风格编写接口&#xff0c;定义一个类包含了 get post delete put 四种请求方式&#xff0c;这四种请求方式是不能重复的 例如:获取单条记录和多条记录使用的方式都是get&#xff0c;如果两个都要实现的话那么得定义两个类&#xff0c;因为在同一个类中不能有…

【vue3 】 创建项目vscode 提示无法找到模块

使用命令创建 vue3 创建新应用 npm create vuelatest会看到一些可选功能的询问&#xff1f; √ 请输入项目名称&#xff1a; … vue-project √ 是否使用 TypeScript 语法&#xff1f; … 否 / 是 √ 是否启用 JSX 支持&#xff1f; … 否 / 是 √ 是否引入 Vue Router 进行单…

useReducer的使用以及与useState、useImmerReducer的对比使用

前言 对于拥有许多状态更新逻辑的组件来说&#xff0c;过于分散的事件处理程序可能会影响代码的可读性。这种情况&#xff0c;可以将组件的所有状态更新逻辑整合到一个外部函数中&#xff0c;这个函数就是reducer。 使用 useReducer(reducer, initialArg, init?) 参数 redu…

电力通信与泛在电力物联网技术的应用与发展-安科瑞黄安南

摘要&#xff1a;随着我国社会经济的快速发展&#xff0c;我国科技实力得到了非常大的提升&#xff0c;当前互联网通信技术在社会中得到了广泛的应用。随着电力通信技术的快速发展与更新&#xff0c;泛在电力物联网建设成为电力通讯发展的重要方向。本文已泛在电力物联网系统为…

化工园区数字孪生可视化管控平台,赋予园区安全环保智慧发展

化工行业作为国民经济的支柱和工业发展的引擎&#xff0c;对安全生产、环保节能、应急管控有着很高的要求。目前国内外化工园区面临安全和环保两大压力。为有效解决这两大难题&#xff0c;巨蟹数科综合运用物联网、数字孪生等新一代信息技术&#xff0c;建设了数字孪生园区智慧…

Docker下安装MSSQL并使用Navicat远程连接(备忘录)

Docker下安装MSSQL并使用Navicat远程连接 一. Docker下安装MSSQL备忘录一、安装SQL Server1、从 Microsoft 容器注册表中请求 SQL Server 2022 (16.x) Linux 容器映像:注意:2、运行这个cu5的版本下表对前一个 docker run 示例中的参数进行了说明:3、看这个MSSQL运行没有?用…

JAVA基础——编译器报告的错误信息总结

1. Invalid character&#xff08;无效字符&#xff09; 中英文符号错误 2. 数组的常见异常 数组越界异常ArrayIndexOutOfBoundsException: 在使用索引访问数组的元素时超出了数组的索引范围0~length-1。 空指针异常NullPointerException&#xff1a; 在使用变量引用一个数…

CSS - 常用属性和布局方式

目录 前言 一、常用属性 1.1、字体相关 1.2、文本相关 1.3、背景相关 1.3.1、背景颜色 1.3.2、背景图片 1.4、圆角边框 二、常用布局相关 2.1、display 2.2、盒子模型 2.2.1、基本概念 2.2.2、border 边框 2.2.3、padding 内边距 2.2.4、margin 外边距 2.3、弹…

【Android】MQTT

目录 MQTT 协议简介应用场景优点缺点 部署服务端下载安装包启动服务器 搭建客户端下载SDK添加依赖配置MQTT服务和权限建立连接订阅主题发布消息取消订阅断开连接 MQTT客户端工具最终效果实现传感器数据采集与监测功能思路 MQTT 协议 简介 MQTT&#xff08;Message Queuing Te…

【目标跟踪】多目标跟踪测距

文章目录 前言python代码&#xff08;带注释&#xff09;main.pysort.pykalman.pydistance.py 结语 前言 先放效果图。目标框内左上角&#xff0c;显示的是目标距离相机的纵向距离。目标横向距离、速度已求出&#xff0c;没在图片展示。这里不仅仅实现对目标检测框的跟踪&#…

【Django restframework】django跨域问题,解决PUT/PATCH/DELETE用ajax请求无法提交数据的问题

【Django restframework】django跨域问题&#xff0c;解决PUT/PATCH/DELETE用ajax请求无法提交数据的问题 1 问题描述&#xff1a; 我用restframework(ModelSerializerGenericApiView)开发了一组符合RestFul接口标准的接口&#xff0c;这意味着它将支持客户端发来的GET、POST、…