RocketMQ sql92的使用及原理简单分析附源码

news2024/12/24 8:17:28

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ 版本

  • 5.1.0

RokcetMQ消息过滤

目前官方支持的消息过滤方式主要有两种

  • tag
  • sql92

我们可以通过查看ExpressionType的源码证明

tag过滤方式是现在最为常用的过滤方式,但是一个消息只能包含一个tag。

对于相对复杂的消息过滤场景tag过滤方式可能就不够用了,但是绝大多数业务场景tag过滤方式已经够用了。

sql92过滤方式可以有助于我们实现一些高级功能,比如RocketMQ的多测试环境消息隔离等。

这里就暂时不过多讨论sql92的具体使用场景,我们还是先来学习怎么使用sql92

sql92 语法规则

语法说明示例
IS NULL判断属性不存在。a IS NULL :属性a不存在。
IS NOT NULL判断属性存在。a IS NOT NULL:属性a存在。
> >= < <=用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。 a IS NOT NULL AND a > ‘abc’:错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx)表示属性的值在某个集合内。集合的元素只能是字符串。a IS NOT NULL AND (a IN (‘abc’, ‘def’)):属性a存在且属性a的值为abc或def。
= <>等于和不等于。可用于比较数字和字符串。a IS NOT NULL AND (a = ‘abc’ OR a<>‘def’):属性a存在且属性a的值为abc或a的值不为def。
AND OR逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。

由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。

  • 空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。

  • 数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

sql92使用

源码

所有源码已上传至github

  • 地址:https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/rocketmq-demo/src/main/java/com/weihubeats/rocketmq/demo/sql92

消息发送

public class SQLProducer {

	public static int count = 10;

	public static String topic = "xiao-zou-topic";


	public static void main(String[] args) {
		DefaultMQProducer producer = MQUtils.createLocalProducer();
		
		IntStream.range(0, count).forEach(i -> {
			Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));
			try {
				if (i % 2 == 0) {
					message.putUserProperty("gray", "dev1");
				}
				SendResult sendResult = producer.send(message);
				DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
				System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		});
		producer.shutdown();
		
	}
}

这里我们假装消息是发送个多个测试的消息,所以每条消息都在UserProperty添加了一个dev1标签。

我们要实现的就是比如只有dev1环境的消费者才会消费带有dev1标签的消息,其他消息则丢弃掉

消息消费

public class SQLConsumer {

	public static String GID = "xiao-zou-gid";


	public static void main(String[] args) throws Exception {
		DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);
		String sql = "gray is not null and gray = 'dev1'";
		consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));
		consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
			System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		});
		/*
		 *  Launch the consumer instance.
		 */
		consumer.start();
		System.out.printf("Consumer Started.%n");

	}
}

这里的消息消费方式唯一不同的是我们订阅消息的方式发生了变化

普通方法我们调用的是这个方法进行消息订阅的,传入tag就行

比如像这样

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

但是这里我们使用的是sql92方式

传入的是一个MessageSelector,订阅的规则是

String sql = "gray is not null and gray = 'dev1'";

运行效果

  • 消息发送

这里我们发送了十条消息,只有5条是带有gray标签的

  • 消息消费

可以看到消息消费只有消费了带有gray标签的5条消息,符合我们的预期

sql92是在客户端还是在服务端过滤的?

sql92tag都是在服务端过滤的,我们可以查看源码得知

不过tag的过滤方式会在客户端再次过滤。因为在服务端是通过hashcode进行过滤的,为了提高性能,没有对原始的tag进行过滤,在通过hashcode过滤掉绝大多少的消息后,在客户端进行最后的tag完全过滤。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult

如果统一都在客户端过滤会导致传输大量的消息到客户端,影响性能

总结

本次我们对RocketMQ sql92过滤消息进行了简单的使用以及少量的源码分析,并没有完整的从整个流程进行分析,因为本篇并不是源码分析偏。sql92在实际的项目中的相对来说较少,偶尔如果做RocketMQ消息的多册环境或者灰度,可能是一个方案,但不是最佳的

参考

  • 官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1162011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机网络】(谢希仁第八版)第三章课后习题答案

第三章 1.数据链路(即逻辑链路)与链路(即物理链路)有何区别? “电路接通了”与”数据链路接通了”的区别何在? 答&#xff1a;数据链路与链路的区别在于数据链路出链路外&#xff0c;还必须有一些必要的规程来控制数据的传输&#xff0c;因此&#xff0c;数据链路比链路多了…

不要在不使用cvx优化的场合使用log_det,应改为log(det(..)),否者可能会出现奇怪的错误

跑代码的时候遇到的问题 %% 分解为功率和单位模&#xff0c;交替优化功率分量和单位模clc;clear; Nt 8; % 发射天线数目 8 Nr 4; % 接收天线数目 4 Ne 6; % Eve天线数目 6 noisePower 1; SNRTotaldB -5:5:35; %%%%最大发射功率单位dB SNRTotal 10.^(SNRTotaldB./…

运动耳机品牌排行榜前十名,运动耳机品牌有哪些?

​当你在健身房挥汗如雨&#xff0c;或者在户外享受大自然的时候&#xff0c;最想要的是听歌放松心情&#xff0c;而运动耳机就像一个不知疲倦的伙伴&#xff0c;陪着你度过每一刻。它的稳定性和舒适性在最大程度上保证了你可以专注于你的运动&#xff0c;而不用分心于耳机的位…

基于springboot实现原创歌曲分享平台系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现原创歌曲分享平台演示 摘要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理平台应运而生&am…

创建ABAP数据库表和ABAP字典对象-理解表字段02

理解表字段 这一步&#xff0c;您将定义表字段。首先&#xff0c;你需要了解你的需求: 内置的ABAP类型和新定义的字段类型 下面我们将会创建3个字段类型在数据库表中。 ●内置字段类型&#xff1a;最快的方法:应用系统已经提供好的字段类型&#xff0c;基本类型、长度和描述…

AGENTBENCH:评估LLMs作为代理的能力

背景&#xff1a; 这篇文章介绍了他们是如何去构造智能Agent评测集&#xff0c;以及如何对智能Agent能力做了几大分类。如果你无法评测一个问题&#xff0c;那么往往你也不能很好的解决一个问题。评测集的设计往往是更深入本质&#xff0c;因为评测集测试的是更泛化能力&#…

Find My磁吸卡包|苹果Find My技术与磁吸卡包结合,智能防丢,全球定位

2020年苹果发布会上&#xff0c;磁吸卡包应运而生&#xff0c;磁吸卡包可放入银行卡、交通卡等常规卡片&#xff0c;通过磁力轻松吸附在iPhone或MagSafe磁吸保护壳背后&#xff0c;用户轻装出门携带卡片更方便。 在智能化加持下&#xff0c;磁吸卡包加入防丢功能&#xff0c;…

高速光耦合器TLP2361(TPL,E(T 在工业网络中的应用

TLP2361(TPL,E(T 由一个高输出 GaA ℓAs 发光二极管与集成的高增益、高速光电探测器组成。它采用 SO6 封装。 该光电耦合器可保证在高达 125 C 和 2.7 V 至 5.5 V 电源下运行。由于 TLP2361 已保证 1 mA 低电源电流 (ICCL/ICCH) 和 1.6 mA (Ta 125 C) 低阈值输入电流&#xf…

论坛类型知识问答科普源码系统 带完整搭建教程

在互联网上&#xff0c;用户对于获取各类知识的需求不断增长&#xff0c;尤其是对于一些专业性较强或者较为冷门的知识领域。传统的搜索引擎并不能完全满足用户的需求&#xff0c;因此&#xff0c;开发一款针对特定知识领域的问答系统变得尤为重要。今天源码小编来给大家介绍一…

java修仙传之海岛奇遇

昨日开会&#xff0c; 商量了一下接口返回数据&#xff0c; 要求统一&#xff0c; 之前也同意&#xff0c;直接抛异常&#xff0c; 现在觉得之前那个异常不好&#xff0c; 看着不美观&#xff0c;对客户不友好 要求重新做。 大概要求如下&#xff1a; 要求1&#xff1a;范…

公派访问学者申请应该如何选择国家?

选择国家是公派访问学者申请中至关重要的一步。不同国家拥有各自独特的文化、教育体系和研究环境&#xff0c;因此&#xff0c;选择合适的国家对于一个学者的职业生涯和学术发展至关重要。下面将探讨一些选择国家时需要考虑的因素。 首先&#xff0c;你应该考虑自己的研究领域和…

干货 | 一文详解华为ITR流程体系:目标、流程、理念

华为著名的三大流程IPD、LTC、ITR让研发、营销、服务三大板块联动&#xff0c;以客户为中心提供高质量且不断改善的产品和服务。其中ITR大大提升了客服部门的效率和服务水平&#xff0c;还让客服在服务过程中有增加销售的可能。本文详解华为ITR的概念、目标、地位、构成、理念&…

linux编译boost库并执行程序

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 一、--prefix命令 二、安装过程 1、shell脚本&#xff1a; 2、gcc编译环境 执行过程 三、linux下执行cpp程序 总结 前言 提示&#xff1a;这里可以添加本文…

Gopro hero5运动相机格式化后恢复案例

Gopro运动相机以稳定著称&#xff0c;旗下的Hero系列销售全球。下面我们来看一个Hero5格式化后拍了少量素材的恢复案例。 故障存储:64G MicroSD卡 Exfat文件系统 故障现象: 64G的卡没备份数据时做了格式化操作又拍了一条&#xff0c;发现数据没有备份&#xff0c;客户自行使…

Qt 窗口无法移出屏幕

1 使用场景 设计一个缩进/展开widget的效果&#xff0c;抽屉效果。 看到实现的方法有定时器里move窗口&#xff0c;或是使用QPropertyAnimation。 setWindowFlags(Qt::Dialog | Qt::FramelessWindowHint |Qt::X11BypassWindowManagerHint&#xff09;&#xff1b; 2 窗口属…

如何接入淘宝官方商家订单的API接口?方便管理店铺订单?

一.什么是淘宝API接口&#xff1f; 电子商务市场发展迅速。企业需要能够经常适应不断变化的需求。许多人没有合适的程序员来满足激增的功能需求 如果一家电子商务公司有一个项目在社交媒体上传播开来&#xff0c;他们就没有时间创建、测试和部署代码更新。如果发生这种情况&…

美国中性原子Atom Computing搞事情,1180量子比特计算机将研发成功?

&#xff08;图片来源&#xff1a;网络&#xff09; 量子计算是当下最前沿的科技领域——利用量子力学定律来解决经典计算机难以解决的复杂问题。总部位于美国加利福尼亚州的量子计算机制造商Atom Computing公司大胆宣布&#xff1a;他们正在测试一台1180量子比特的量子计算机…

【k8s】资源管理命令-声明式

一、 yaml和json介绍 1、yuml语言介绍 YAML是一个类似XML、JSON的标记性语言&#xff0c;它强调以数据为中心&#xff0c;并不是以标识语言为重点&#xff0c;而YAML本身的定义比较简单。号称“一种人性化的数据格式语言”。 YAML的语法比较简单&#xff0c;主要有下面几个 …

自定义类型结构体(中)

目录 结构体内存对齐对齐规则例子一练习3练习4-结构体嵌套问题 为什么存在内存对齐平台原因(移植原因)性能原因 修改默认对齐数 感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接 &#x1f412;&#x1f412;&#x1f412; 个人主页 &#x1f978;&#x1f978;…

【LeetCode】每日一题 2023_11_2 环和杆(题目质量不错)

文章目录 刷题前唠嗑题目&#xff1a;环和杆题目描述代码与解题思路看看别人的题解 结语 刷题前唠嗑 今天是简单&#xff0c;我快乐了 题目&#xff1a;环和杆 题目链接&#xff1a;2103. 环和杆 题目描述 代码与解题思路 func countPoints(rings string) (ans int) {num…