【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现

news2024/10/5 13:15:54

1. 前言

上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。

顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。

顺序消息分为 全局顺序消息和局部顺序消息。

全局顺序消息就是全局使用一个queue。

局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。

2. 局部顺序消息

默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。

比如在购物网站下单场景下:有 1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。

局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。

2.1. 定义生产者

  1. 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

    image-20231003154231683

public class OrderProducer {
	// 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中
	public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
		DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");
		defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
		defaultMQProducer.start();

		for (int i = 0; i < 10; i++) {
			int orderId = i;
			for (int j = 0; j < 5; j++) {
				// 构建消息体,tags和key 只是做一个简单区分
				Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());
				SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {
					@Override
					//这里的arg参数就是外面的orderId
					public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
						Integer orderId = (Integer) arg;
						int index = orderId % mqs.size();
						return mqs.get(index);
					}
				}, orderId);

				System.out.printf("%s%n", send);
			}
		}
		defaultMQProducer.shutdown();
	}
}
  1. 在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。

  2. 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
    
生产者运行结果(部分截图)

image-20231003160039915

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。

2.2.定义消费者

说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:

	// 2.订阅消费消息
		defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				for (MessageExt msg : msgs) {
					System.out.println("消费得到的消息是={}" + msg);
					System.out.println("消息体内容是={}" + new String(msg.getBody()));
				}
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});

这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。

image-20231010125014684

image-20231010125043838

image-20231010125110337

3. 碰到的问题

在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

image-20231003131237358

4. 全局顺序消息

全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。

5. 广播消息

广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。

广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");
		defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
		// 设置消费者的模式是广播模式
		defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);

		//从第一位开始消费
		defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延迟消息

延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。

6.1. 延迟消息生产者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");
		defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");
		defaultMQProducer.start();
		for (int i = 0; i < 100; i++) {
			Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());
			//设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费
			message.setDelayTimeLevel(3);
			defaultMQProducer.send(message);
		}
		System.out.println("所有延迟消息发送完成");
		defaultMQProducer.shutdown();

延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

image-20231003200021490

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

image-20231003201410410

7.批量消息

批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:

	//4.创建消息
		List<Message> messageList = new ArrayList<>();
		for (int i = 0; i < 100*100; i++) {
			// 创建消息,指定topic,以及消息体
			messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));
		}
		//批量消息消息小于4M的处理
		SendResult send = defaultMQProducer.send(messageList);
		System.out.println(send);

8.过滤消息

使用tag过滤

在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。

首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。

8.1. 过滤消息生产者

这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
		defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
		defaultMQProducer.start();
		String[] tags = new String[]{"tagA", "tagB", "tagC"};
		for (int i = 0; i < 15; i++) {
			Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());
			SendResult send = defaultMQProducer.send(message);
			System.out.printf("%s%n", send);
		}
		defaultMQProducer.shutdown();

8.2. 过滤消息的消费者

消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
		defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
		defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");
		defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
			for (MessageExt msg : msgs) {
				System.out.println("接收到的消息=" + msg);
				System.out.println("接收到的消息体=" + new String(msg.getBody()));
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		});
		defaultMQPushConsumer.start();
		System.out.println("消费者已经启动");

image-20231003212919155

使用SQL过滤

SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。

语法

RocketMQ只定义了一些基本的语法类支持这个特性。

1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;

常量类型有:

1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;

SQL过滤生产者

生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。

	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
		defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
		defaultMQProducer.start();
		String[] tags = new String[]{"tagA", "tagB", "tagC"};
		for (int i = 0; i < 15; i++) {
			Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());

			message.putUserProperty("a", String.valueOf(i));
			SendResult send = defaultMQProducer.send(message);
			System.out.printf("%s%n", send);
		}
		defaultMQProducer.shutdown();

SQL过滤消费者:

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
		defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
		defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));
		defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
			for (MessageExt msg : msgs) {
				System.out.println("接收到的消息=" + msg);
				System.out.println("接收到的消息体=" + new String(msg.getBody()));
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		});
		defaultMQPushConsumer.start();
		System.out.println("消费者已经启动");

如果运行报 The broker does not support consumer to filter message by SQL92

image-20231003221207618

则需要修改 broker.conf 文件,增加如下配置:

# 开启对 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重启broker。

总结

本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1116747.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

凉鞋的 Godot 笔记 203. 变量的常用类型

203. 变量的常用类型 在上一篇&#xff0c;我们对变量进行了概述和简介&#xff0c;知识地图如下&#xff1a; 我们已经接触了&#xff0c;变量的字符串类型&#xff0c;以及一些功能。 在这一篇&#xff0c;我们尝试多接触一些变量的类型。 首先是整数类型。 整数类型 整…

生成指定范围内的指定个数的随机整数numpy.random.randint()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 生成指定范围内的 指定个数的随机整数 numpy.random.randint() [太阳]选择题 以下哪个选项正确地描述了上述代码的功能&#xff1f; import numpy as np arr np.random.randint(1, 10, 5) p…

第一节——vue安装+前端工程化

作者&#xff1a;尤雨溪 官网&#xff1a;简介 | Vue.js 脚手架文档 创建一个项目 | Vue CLI 一、概念&#xff08;了解&#xff09; 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&…

凉鞋的 Unity 笔记 203. 变量的常用类型

203. 变量的常用类型 在上一篇&#xff0c;我们对变量进行了概述和简介&#xff0c;知识地图如下&#xff1a; 我们已经接触了变量的字符串类型&#xff0c;以及一些功能。 在这一篇&#xff0c;我们尝试多接触一些变量的类型。 首先是整数类型。 整数类型 整数类型一般是…

力扣第51题 N 皇后 c++ 难~ 回溯题

题目 51. N 皇后 困难 相关标签 数组 回溯 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0…

ChessGPT:免费好用的国际象棋对弈AI机器人

对于国际象棋初学者&#xff0c;需要找一个对手来练棋。ChessGPT&#xff0c;就是一个免费好用的AI对弈机器人&#xff0c;非常适合新手来提升&#xff0c;是一个很好的练习伙伴。网站地址是&#xff1a;https://www.chess.com/play/computer&#xff0c;也有手机版app&#xf…

Deep Learning for Geophysics综述阅读(未完)

文章题目《Deep Learning for Geophysics: Current and Future Trends》 文章解读&#xff1a;地球物理学&#xff08;人工智能轨道&#xff09;——&#xff08;1&#xff09;文献翻译《面向地球物理学的深度学习&#xff1a;当前与未来趋势》 - 知乎 (zhihu.com) 这里主要列…

运筹学:影子价格(shadow price)和对偶价格(dual price)

文章目录 对偶问题的解影子价格对偶价格对偶价格与影子价格的关系总结例题 对偶问题的解 影子价格 影子价格是一个经济学意义上的解释&#xff0c;因为不同的解读&#xff0c;目前对于影子价格准确的定义较为混乱。下面下来举几个例子&#xff1a; the shadow price associat…

【代码随想录第46天】 动态规划6

代码随想录第46天| 动态规划6 完全背包518. 零钱兑换 II377. 组合总和 Ⅳ 完全背包 代码随想录&#xff1a;完全背包 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包…

解决Win10系统按 Win+L 键不能锁屏的问题

一、问题现象&#xff1a; 1、使用”WinL“快捷键无效&#xff1b; 2、系统设置 》电源与睡眠 》其他电源设置 》选择电源按钮的功能中&#xff0c;锁定选项未选中且不可选&#xff0c;且点击“更改当前不可用的设置”后仍然不可选。 3、用户操作选项中无”锁定“按钮&#…

通过TDE透明加密实现服务器防勒索 安当加密

安当TDE透明加密技术主要应用于对数据库中的数据执行实时加解密的应用场景&#xff0c;特别是在对数据加密有较高要求&#xff0c;以及希望加密后数据库性能影响几乎可以忽略的场景中。 安当TDE透明加密技术的防勒索应用场景可以通过以下步骤进行介绍&#xff1a; 数据保护&am…

淘宝商品详情API接口(标题|主图|SKU|价格|销量|库存..)

一、应用场景 淘宝商品详情接口的应用场景非常广泛&#xff0c;以下是其中几个例子&#xff1a; 商家用于展示商品信息&#xff1a;淘宝详情接口可以被用于商家的自主店铺或第三方电商平台上&#xff0c;方便展示商品详细信息。 商品价格比对&#xff1a;淘宝详情接口可以用于…

编程教室本周视频更新

入门教程、案例源码、学习资料、读者群 请访问&#xff1a; python666.cn 大家好&#xff0c;欢迎来到 Crossin的编程教室 &#xff01; 以下是近期制作的一些编程教学视频&#xff0c;欢迎观看、点赞、收藏、转发。 0.10.2竟然不等于0.3?! Python中的小数比较是否相等时的一个…

Ruo-Yi前后端分离相关笔记

1.前提条件和基础 Spring Boot Vue 环境要求&#xff1a;Jdk1.8以上版本、MySql数据库、Redis、Maven、Vue 2.使用若依 官网地址&#xff1a;RuoYi-Vue: &#x1f389; 基于SpringBoot&#xff0c;Spring Security&#xff0c;JWT&#xff0c;Vue & Element 的前后端分…

获得Pareto前沿的方法有什么?NSGA-Ⅱ怎么获得Pareto非支配解集 + 支配解集?怎么获得Pareto前沿?/非支配解集代码实现

获得Pareto前沿的方法有什么&#xff1f; 获得Pareto前沿的方法有很多&#xff0c;下面列举了一些常用的方法&#xff1a; 权重法&#xff08;Weighted Sum Method&#xff09;&#xff1a;为每个目标函数分配一个权重&#xff0c;并将多目标优化问题转化为单目标优化问题。通…

订单30分钟自动关闭的五种解决方案

1 前言 在开发中&#xff0c;往往会遇到一些关于延时任务的需求。例如 生成订单30分钟未支付&#xff0c;则自动取消生成订单60秒后,给用户发短信 对上述的任务&#xff0c;我们给一个专业的名字来形容&#xff0c;那就是延时任务 。那么这里就会产生一个问题&#xff0c;这…

ElementPlus表格中的背景透明

ElementPlus表格中的背景透明 最近写大屏&#xff0c;用到elementplus中的el-table&#xff0c;为了让显示效果好看一点&#xff0c;需要把表格的白色背景调整为透明&#xff0c;与整个背景融为一体。可以参考的资料非常少&#xff0c;大部分都是ElmentUI的方法&#xff0c;在…

摆动序列【贪心4】

题目 分析 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {if(nums.size() < 2) return nums.size();int ret 0,left 0,right 0;for(int i 0;i < nums.size()-1;i){right nums[i1] - nums[i];if(right 0) continue;if(left …

Python数据分析实战-使用replace方法模糊匹配替换某列的值(附源码和实现效果)

实现功能 Python数据分析实战-使用replace方法模糊匹配替换某列的值 实现代码 import pandas as pd import re# 创建一个示例DataFrame data {A: [apple, banana, pineapple, orange, grape]} df pd.DataFrame(data)# 打印替换前的DataFrame print("替换前的DataFram…

5年经验之谈 —— 接口测试主要测哪些方面?

当今互联网时代&#xff0c;接口测试已经成为软件测试的一个重要组成部分。接口测试是指对系统各个接口进行验证&#xff0c;确保接口的正确性、稳定性和安全性。接口测试是软件开发过程中不可缺少的环节&#xff0c;它旨在确保接口能够正常工作&#xff0c;并且满足所需要的规…