RocketMQ 顺序消息收发实践

news2024/10/6 10:27:36

目录

  • 概述
  • 局部有序
    • 创建 Topic
    • 配置
    • 代码
    • 测试
  • 结束

概述

  • 顺序消息
    • 全局有序:适用于性能不是特别高的场景,但是又要求消息又严格一致的概念。
    • 局部有序:适用于性能要求高的场景,想办法通过在设计层面处理有序的消息尽量发送至同一个 Topic 中的同一个队列。
  • 两种有序创建方法
    • 全局有序:
      • perm:2:只写,4:只读;6:读写
      • 创建一个 Topic 只有一个队列。
    • 局部有序:
      • Partly-Orderly-Topic

注意:本文只会针对 局部有序进行实践。

官方文档

局部有序

常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。

程序局部有序如下图设计进行测试。
在这里插入图片描述

创建 Topic

sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly

[root@hadoop02 rocketmq-all-5.1.4-bin-release]# sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
create topic to 10.32.36.143:10911 success.
TopicConfig [topicName=orderly, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
[root@hadoop02 rocketmq-all-5.1.4-bin-release]# 

在这里插入图片描述

配置

注意:spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

spring:
  cloud:
    stream:
      function:
        definition: consumer
      rocketmq:
        binder:
          name-server: 10.32.36.143:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
              # 定义messageSelector
              messageQueueSelector: orderlyMessageQueueSelector
          consumer-in-0:
            consumer:
              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
#              subscription: 'TagA || TagC || TagD'
              orderly: true
      bindings:
        producer-out-0:
          destination: orderly
        consumer-in-0:
          destination: orderly
          group: orderly-consumer

logging:
  level:
    org.springframework.context.support: debug

上面配置进行补充如下:

  • spring.cloud.stream.bindings.通道名字.group 是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。
  • spring.cloud.stream.rocketmq.binder.group 是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。

spring.cloud.stream.bindings和spring.cloud.stream.rocketmq.bindings 区别

  • 1.spring.cloud.stream.bindings是Spring Cloud Stream的核心配置属性,用于定义消息通道的绑定和配置。
  • spring.cloud.stream.rocketmq.bindings是Spring Cloud Stream与RocketMQ集成时的配置属性,用于定义RocketMQ消息通道的绑定和配置。

代码

public class SimpleMsg {
    private String msg;

    public SimpleMsg(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {
    private static final Logger log = LoggerFactory
            .getLogger(OrderlyMessageQueueSelector.class);
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
        String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
        int index = id % MqApplication.tags.length % mqs.size();
        return mqs.get(index);
    }
}
@EnableDiscoveryClient
@SpringBootApplication
public class MqApplication {
    private static final Logger log = LoggerFactory
            .getLogger(MqApplication.class);

    public static final String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

    public static void main(String[] args) {
        SpringApplication.run(MqApplication.class);
    }

    @Autowired
    private StreamBridge streamBridge;


    @Bean
    public ApplicationRunner producer() throws InterruptedException {
        Thread.sleep(6000);
        log.info("开始...");
        return args -> {
            for (int i = 0; i < 50; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i%2);
                Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i%2), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }

    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
                    MessageConst.PROPERTY_TAGS).toString();
            log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
                    msg.getHeaders().get(tagHeaderKey).toString());
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
        };
    }
}

测试

在这里插入图片描述

在这里插入图片描述

结束

至此,RocketMQ 顺序消息收发实践 就结束了,如有疑问,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1320113.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qt QFile文件操作处理,QFileInfo文件信息读取的简单介绍

QFile类用于文件操作,对文件进行读写操作,可读写文件,二进制文件和qt资源文件.处理文本文件和二进制文件可使用QTextStream类和QDataStream类,处理临时文件可以使用QTemporary,获取文件信息可以使用QFileInfo,处理目录可以使用QDir,监视文件和目录变化可以使用QFileSystemWatch…

整合SSH(Spring+Struts+Hibernate)

0.前言, 由于工作需要故来学习SSH的整合,structs其实相当于(把view和controller结合起来,没有像现在的前后端分离,请求会发送给Action处理,在structs.xml映射地址和类) Hibernate(就是处理数据库的,几乎自动化,也可以写sql语句) Struts&#xff1a;Struts 是一个基于 MVC&#…

【Python】计算一年内的总天数(还有跨年日期)

花了一段时间才找到Python中求一年中总日数&#xff08;total day of the Year&#xff09;的格式代码&#xff0c;所以也把计算方法记录下来。 基本 首先&#xff0c;简单地找出一年中的总天数&#xff0c; strftime() 和 strptime() 的格式代码是 %j ↓看这里 使用 strft…

算法基础之Prim算法求最小生成树

Prim算法求最小生成树 核心思想&#xff1a;Prim 算法 类似于dijkstra算法 更新距离时改为到**集合(生成树)**的距离 最小生成树长这样 每次迭代放一个最近(有边)点&#xff0c;一条最短边进生成树 #include <cstring>#include <iostream>#include <algori…

08-Event Sources和Sink架构

1 PingSource -> Kubernetes Service Sink 架构模型 示例1 部署一个kubernetes类型的sink&#xff0c;这里面还是以event-display为例&#xff0c;下面是资源清单 --- apiVersion: apps/v1 kind: Deployment metadata:name: event-display spec:replicas: 1selector:matc…

Axie Infinity 之后,Ronin 的潜力何在?

作者&#xff1a;stellafootprint.network 数据来源&#xff1a;Ronin Dashboard 备受欢迎的 Web3 游戏 Pixels 在 2023 年 10 月下旬从 Polygon 迁移到了专为游戏设计的区块链 Ronin。Pixels 此前作为 Polygon 上活跃用户&#xff08;钱包数量&#xff09;最多的 Web3 游戏&…

【重点】【DFS】【子集】78.子集

题目 法1&#xff1a;DFS 必须掌握的方法&#xff01;&#xff01;&#xff01; 我们也可以用递归来实现子集枚举。 假设我们需要找到一个长度为 nnn 的序列 aaa 的所有子序列&#xff0c;代码框架是这样的&#xff1a; void dfs(int cur, int n, LinkedList<Integer>…

HTML5+CSS3小实例:3D发光切换按钮效果

目录 一、运行效果 图片效果 二、项目概述 三、开发环境 四、实现步骤及代码 1.创建空文件夹 2.完成页面内容 3.完成css样式 五、项目总结 六、源码获取 一、运行效果 图片效果 二、项目概述 该项目是一个基于HTML和CSS的动态小猫动画。通过使用CSS样式和动画效果…

开源支付项目,还有强大的聚合支付项目,值得借鉴学习,收藏备用

开源支付项目,还有强大的聚合支付项目,值得借鉴学习,收藏备用 自学编程之道2023-03-18 14:26 移动支付随处可见,以前都是微信、支付宝分别各一个收款码,二维码多了之后就不好管理了,随着支付方式的增多,现在基本上都用聚合支付,多种支付方式,一码搞定。针对支付及聚…

Linux常用网络指令

网络参数设定使用的指令 手动/自动设定与启动/关闭 IP 参数&#xff1a;ifconfig, ifup, ifdown ifconfig ifconfig常用于修改网络配置以及查看网络参数的指令 [rootwww ~]# ifconfig {interface} {up|down} < 观察与启动接口 [rootwww ~]# ifconfig interface {options…

项目管理:产品经理如何保障项目按时完成

一个项目的成功要考虑多方因素&#xff0c;即使经过了精细的策划&#xff0c;但是在项目推进过程中也会遇到各种问题。 产品经理的任务就不仅仅在于完成产品策划&#xff0c;还需要承担项目管理工作&#xff0c;跟进项目进度&#xff0c;必要时还得出面协调、解决冲突&#x…

猜数字小游戏(猜错了会关机推荐让室友帮你玩)

前言 今天来带大家写一个简易的猜数字小游戏&#xff0c;如果连着猜错n次&#xff08;自己设定&#xff09;就会导致电脑关机&#xff0c;还在等什么呢&#xff1f;赶紧学会咯&#xff0c;发给你的室友让他帮你玩吧&#xff01; 正文 随机数的生成 首先我们还要学会如何创建随…

Windows10 如何开机自动启动redis

前言 当我们在Windows 10上使用Redis时&#xff0c;通常希望能够使Redis服务在系统启动时自动启动&#xff0c;以便我们无需手动介入就能够方便地访问和管理数据。在这个过程中&#xff0c;我们将通过下载、安装和配置Redis为Windows服务的方式&#xff0c;使其成为系统的一部分…

python与机器学习4,激活函数

目录 1 激活函数1: 单位阶跃函数 1.1 函数形式 1.2 函数图形 1.3 函数特点 1.4 代码实现这个 单位阶跃函数 2 激活函数2 sigmoid函数 2.1 函数形式 2.2 函数图形 2.3 函数特点 2.3.1 是一个连续函数&#xff0c;且是一个渐变的曲线 2.3.2 是连续区间的[0,1] , 可以…

JVM基础扫盲

什么是JVM JVM是Java设计者用于屏蔽多平台差异&#xff0c;基于操作系统之上的一个"小型虚拟机"&#xff0c;正是因为JVM的存在&#xff0c;使得Java应用程序运行时不需要关注底层操作系统的差异。使得Java程序编译只需编译一次&#xff0c;在任何操作系统都可以以相…

定制 Electron 窗口标题栏

Electron 是一款流行的桌面应用开发框架&#xff0c;基于 Web 技术构建&#xff0c;提供了强大的跨平台能力。在开发过程中&#xff0c;经常需要定制窗口标题栏以创造独特的用户体验。 1. 完全隐藏默认标题栏 有时候&#xff0c;我们希望创建一个自定义的标题栏&#xff0c;完…

光伏收益计算工具:实现可持续能源投资的决策支持

随着全球能源结构的转型&#xff0c;光伏发电作为主要的可再生能源之一&#xff0c;其投资前景日益光明。然而&#xff0c;光伏发电项目的投资决策需要基于准确的收益预测。因此&#xff0c;光伏收益计算工具应运而生&#xff0c;为投资者提供决策支持。 光伏收益计算工具是一种…

护肤品品牌怎么创建百度百科词条?

护肤品品牌创建百度百科词条&#xff0c;需要遵循一定的步骤和原则&#xff0c;同时保证内容的真实性和权威性。以下是伯乐网络传媒给大家分享的详细创建流程和注意事项&#xff1a; 一、准备工作 收集资料&#xff1a;首先&#xff0c;需要收集品牌的详细资料&#xff0c;包括…

【Matlab】如何可视化多项式函数(附完整MATLAB代码)

可视化多项式函数 前言多项式函数MATLAB 对理解多项式函数的帮助 正文思考步骤 代码实现及图像显示对于一元多项式函数对于多元多项式函数 前言 多项式函数 多项式函数是数学中常见的一类函数&#xff0c;它的一般形式可以表示为&#xff1a; [ f ( x ) a n x n a n − 1 x…

基于Python Django的内容管理系统Wagtail CMS部署与公网访问

文章目录 前言1. 安装并运行Wagtail1.1 创建并激活虚拟环境 2. 安装cpolar内网穿透工具3. 实现Wagtail公网访问4. 固定的Wagtail公网地址 前言 Wagtail是一个用Python编写的开源CMS&#xff0c;建立在Django Web框架上。Wagtail 是一个基于 Django 的开源内容管理系统&#xf…