【RocketMQ】RocketMQ实例--顺序消息

news2025/1/11 7:04:07

1、应用场景

 一、以证券股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

二、以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

2、功能原理

 2.1 什么是顺序消息

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

2.2 如何保证消息的顺序性

1、生产顺序性

如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

  • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。

  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

 2、消费顺序性

如需保证消息消费的顺序性,则必须满足以下条件:

  • 投递顺序

    Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。

  • 有限重试

    Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

    对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序

2.3 生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除

3、使用限制

 顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

4、代码实例

4.1 生产消息

 

/**
 * 顺序消息生产
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        //订单列表
        List<OrderStep> orderStepList = new Producer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);

        for (int i = 0; i < 10; i++) {
            //加个时间前缀
            String body = dateStr + " Hello RocketMQ " + orderStepList.get(i);
            Message message = new Message("TopicTest",
                    tags[i % tags.length],
                    "KEYS" + i,
                    body.getBytes());

            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                    Long id = (Long) arg; //根据id选择发送订单
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
                //订单id
            }, orderStepList.get(i).getOrderId());

            System.out.printf("SendResult status:%s, queueId:%d, body:%s%n",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body);
        }
        producer.shutdown();
    }

    /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("提交");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("提交");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("提交");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }

}

4.2 消费消息

 

/**
 * 顺序消息消费,带事务方式(应用控制在 offset 什么时候提交)
 */
public class ConsumerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println("consumeThread = " + Thread.currentThread().getName() +
                            "queueId = " + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    //模拟业务逻辑处理中
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/110882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3568烧录系统

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、烧录工具二、烧录步骤单独烧录烧录整个固件总结前言 前面我们已经学会了编译系统&#xff0c;包括一键编译和单独编译&#xff0c;其中单独编译适合我们做驱…

ENSP 路由器到本地(现实)PC的FTP实验

前言&#xff1a; 在一个风和日丽的下午&#xff0c;我同事突然问我ENSP和本地PC怎么上传和下载文件&#xff1f;我本以为这个很简单&#xff0c;然后我开始了面向百度编程&#xff0c;但是网上的大多数都是ENSP里面的路由器、服务器和PC间的FTP实验&#xff0c;而不是到本地&…

嵌入式经典通信总线协议:SPI协议

目录 一、spi简介 二、SPI特性 三、spi四种工作方式 四、实现代码 1.选择开发板 2.选择SPI 3.设置硬件片选使能和通讯方式&#xff0c;其他根据需要选择 4. 生成代码 一、spi简介 SPI 是英语Serial Peripheral interface的缩写&#xff0c;顾名思义就是串行外围设备接口…

GIT:如何删除仓库中的.idea .DS_Store target文件/文件夹并设置下次不上传

0. 引言 我们常常会有在git仓库初始化时&#xff0c;忘记设置.gitignore文件导致一些非工程文件上传到仓库中了&#xff0c;导致整个仓库的不美观&#xff0c;甚至影响其他开发同事配置代码。这时候我们就需要删除这些指定文件&#xff0c;那么如何操作呢&#xff0c;这一章我…

php学生成绩管理系统,在线录入、统计学生成绩,多种图表展示对比学生成绩

教学质量是学校教学的生命线&#xff0c;只有能够客观分析自己教学成败得失的教师才是一个合格的老师。这是一款注重优化成绩采集方法、丰富成绩分析维度的小学成绩统计系统&#xff0c;力争做到符合教师工作习惯、使用方法简单、数据分析多样、分析结果科学&#xff0c;为教师…

垃圾回收机制之v8引擎

v8的内存分配 &#xff08;栈&#xff08;执行环境&#xff09;跟堆&#xff09; 堆内存负责垃圾回收机制&#xff0c;只有新生代和老生代两部分 新生代&#xff1a;对等分的&#xff08;严格&#xff09; 老生代&#xff1a; 都是由新生代转变的&#xff08;连续的空间&…

Vue 实现 html 表格 (grid) 单元格编辑功能 2

第一版表格编辑实现是刚学VUE时硬凑出来 点击详见 经过网上的不断学习&#xff0c;代码精简功能增强&#xff0c;克服了上一个版本的两个bug。 欢迎没有下载积分的朋友欢迎复制转载。 主要功能&#xff1a; 由于取消了 vue 循环的 key 更新&#xff0c;故单元格不需要点击两…

Redis常见面试题(六)

目录 1、Redis支持的Java客户端有哪些? 2、Redisson是什么框架? 3、Redis和Redisson有什么关系? 4、Jedis和Redisson对比有什么优缺点? 5、Redis为什么不提供Windows版本? 6、Redis怎么在Windows下使用? 7、Redis如何设置密码访问? 8、Redis如何分析慢查询操作?…

前端线上问题如何调试

记录Vue开发过程中遇到的问题&#xff0c;测试环境以及本地显示都没有问题&#xff0c;但是一上线就出现问题&#xff0c;于是对于这个问题进行排查&#xff0c;在此记录排查问题的步骤以及方法&#xff0c;希望对大家有帮助。 错误信息&#xff1a;Uncaught TypeError: Canno…

Redis常见面试题(七)

目录 1、什么是缓存预热? 2、什么是缓存热备? 3、什么是缓存雪崩? 4、如何解决缓存雪崩? 5、什么是缓存穿透? 6、如何解决缓存穿透? 7、什么是缓存击穿? 8、如何解决缓存击穿? 9、什么是缓存抖动? 10、如何解决缓存抖动? 11、什么是缓存无底洞? 12、如何…

如何让一个 C 语言项目调用另一个 C++ 项目中某些类所提供的接口?

目前問題是這樣的&#xff1a;有兩個項目 一個項目是用 C 寫的 裏面提供了一個輸入輸出接口 後來從外面弄來了另外一個項目 用 C 寫的 現在需要將 C 項目中所使用的原有接口替換為使用我們的 C 項目中提供的接口 求問能夠實現否&#xff1f; 在项目开发过程中&#xff0c;我们底…

XGBoost总结

1.算法原理 XGBoost是boosting算法的其中一种。Boosting算法的思想是将许多弱分类器集成在一起形成一个强分类器。因为XGBoost是一种提升树模型&#xff0c;该算法思想就是不断地添加树&#xff0c;不断地进行特征分裂来生长一棵树&#xff0c;每次添加一个树&#xff0c;其实…

CSS3之3D转换

文章目录一、3D移动translate3d二、perspective&#xff08;透视&#xff09;三、translateZ四、rotateX-rotateY-rotateZ五、rotate3d(x,y,z,deg)六、3D呈现transfrom-style七、旋转木马案例一、3D移动translate3d 3D移动在2D移动的基础上多加了一个可以移动的方向&#xff0…

Python 函数用法和底层分析

【无限嚣张&#xff08;菜菜&#xff09;】&#xff1a;hello您好&#xff0c;我是菜菜&#xff0c;很高兴您能来访我的博客&#xff0c;我是一名爱好编程学习研究的菜菜&#xff0c;每天分享自己的学习&#xff0c;想法&#xff0c;博客来源与自己的学习项目以及编程中遇到问题…

Android编译ZLMediaKit之实现NVR功能问题点记录

NVR功能 NVR&#xff0c;全称Network Video Recorder&#xff0c;即网络视频录像机&#xff0c;是网络视频监控系统的存储转发部分&#xff0c;NVR与视频编码器或网络摄像机协同工作&#xff0c;完成视频的录像、存储及转发功能。 一、git clone项目 git clone --recursive …

ES 的存储原理

目录 一、ES是什么 二、ES基本结构 2.1、结构图 2.2、基本概念 2.3、与关系数据库概念的类比 2.4、数据如何读写 2.5 容灾能力 三、ES的文件存储结构 每个分片的事务日志&#xff08;Transaction Log&#xff09; Index文件夹内文件含义(lucene文件夹) 四、存储步骤…

计算机必备小知识【数据库字段、估算内存】

计算机必备小知识【数据库、内存】 1 mysql数存储类型&#xff08;database&#xff09; 1.1 char与varcha区别 char的存储空间是固定长度&#xff1b;varchar是可变长varchar会比char多1至2个字节来存放数据的长度 1.2 varchar存储 ①varchar能存多少汉字、数字呢&#x…

直播弹幕系统(六)- SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理)

直播弹幕系统&#xff08;六&#xff09;- SpringBoot STOMP RabbitMQ&#xff08;使用MQ替代Spring代理&#xff09;前言一. SpringBoot整合RabbitMQ代理Broker1.1 RabbitMQ安装STOMP插件&#xff08;Docker&#xff09;1.2 RabbitMQ相关准备1.3 其他代码二. 前端整合Rabbit…

Prometheus_原理架构-安装部署

文章目录1、prometheus简介常见监控软件优势2、组成图讲解3、安装和配置3.1 容器安装3.2 二进制安装3.3 配置热加载1、prometheus简介 是一个监控软件–》监控容器非常好&#xff0c;也可以监控其他的非容器的机器的业务&#xff0c;例如&#xff1a;MySQL&#xff0c;nginx&am…

locksupport的park和unpark

locksupport是什么 LockSupport是一个线程阻塞工具类&#xff0c;所有的方法都是静态方法&#xff0c;可以让线程在任意位置阻塞&#xff0c;当然阻塞之后肯定得有唤醒的方法。 有什么用 接下面我来看看LockSupport有哪些常用的方法。主要有两类方法&#xff1a;park和unpar…