RocketMQ的TAG过滤和SQL过滤机制

news2025/1/12 19:01:18

写作目的

项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息

  • TAG
    如果使用TAG去获取自己感兴趣的消息,那么对于一条学生表变更binlog,最少要插入三条消息,比如TAG=学生表,比如TAG=UPDATE修改操作,比如TAG=学生状态为1,等等。想到的就三种。。。
    所以上面这种方式缺陷还是挺明显的。

  • SQL过滤
    如果使用SQL过滤的方式,我们可以对某些属性进行过滤,自己拼接SQL,灵活性就上来了。

但是我好奇的一点是SQL怎么加到TAG里呢?并且TAG只能支持一个属性值呀。所以接下来从源码和原理的角度进行分析和探讨。

总体来说Tag过滤和SQL过滤如下图所示
在这里插入图片描述

代码展示

本着简单的原则出发

TAG过滤

当producer构建消息时消息时会构造方法里会有TAG的属性,如代码所示,Tag = Creative。

Message msg =
              new Message(
                  "CBeann", // topic
                  "Creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body

当consumer订阅topic时要想监听Tag = creative的就可以如下图所示

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
		//主题,Tag
        consumer.subscribe("CBeann", "Creative");
        consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setConsumerGroup("group1");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // wrong time format 2017_0422_221800
    // consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(...)

SQL过滤

与Tag消息不同的是,produccer生产的msg需要放入一些属性,如下代码所示,放入age属性的值为18。

 Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));
          SendResult sendResult = producer.send(msg);

consumer中则不能根据tag过滤了。需要使用MessageSelector

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);
        //consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup("group1Sql");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.registerMessageListener()

TAG过滤机制

过滤图

此处以Tag过滤机制为例,消息过滤存在两个地方,一个是服务器端,另一个是消息者端。
在这里插入图片描述

假设消费者订阅的topic=CBeann,tag=creative,creative的hashCode =9527(假设一下)

而topic=CBeann的消息队列里有3条消息
msg1[tag=feed,tagHashCode= 9000]
msg2[tag=creative,tagHashCode= 9527]
msg3[tag=material,tagHashCode= 9527]

当consumer消费者给broker服务器发送获取topic=CBeann,tag=creative请求时,请求会转化为topic=CBeann,tagHashCode=9527
因此对于上述的3条消息,经过tagHashCode匹配后会把msg2和msg3发送给consumer消息者。
而Consumer消费者会根据tag匹配后留下msg2

源码思路讲解

构建SubscriptionData

首先要了解一点,我们在consumer中设置订阅的topic和tag是什么样的一个数据结构呢?

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("CBeann", "Creative");

其实一直往下跟subcribe方法,最后我们可以定位到FilterAPI#buildSubscriptionData方法。我们传入的topic=CBeann;tag=creative 被封装到SubscriptionData对象里,包括topic、tag、tagHashCode
在这里插入图片描述

brokder过滤逻辑

那么consumer消费端是存储着topic、tag、tagHashCode。而consumer会把topic和tagHashCode发送给Broker服务器。

当consumer消费者向broker服务端请求获取消息时,broker会从ConsumeQueue获取offset之后的所有如下所示的三元组。ConsumeQueue里的数据三元组如下图所示。
在这里插入图片描述

其实三元组是解析出来的,解析的三个属性就是上图中的offsetPy、sizePy和tagCode。下面我们重点关注一下tagCode
在这里插入图片描述
解析出来的tagsCode如果匹配成功,则保留,如果匹配失败,则continue。
在这里插入图片描述

接下来看一下是怎么匹配呢?如果是*,则全匹配,否则就根据tagsCode匹配。此处不是根据tag匹配,所以会有hash冲突的数据也会匹配到
在这里插入图片描述

结论此时我们可以看到,broker服务器端是通过hashcode匹配的,哈希冲突的msg会被认为有效消息发送给consumer端

consumer过滤逻辑

一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。
在这里插入图片描述
拿到消息后再经过Tag过滤,如下图所示,则到达我们自定义的处理消息逻辑
在这里插入图片描述
结论此时我们可以看到,consumer消费者端是通过tag匹配的,二次过滤因为哈希导致消息Tag不准确的问题

SQL过滤机制

SQL过滤和Tag过滤的消息有什么区别

结论:没区别,就是多了几个属性。比如下面的代码中的age属性

Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));

如上面代码所示,msg的tag=creative, 属性age=18。
其实根据Message的构造方法和putUserProperty方法可以发现,最后都是放到Properties里
在这里插入图片描述

构建SubscriptionData

SQL过滤和Tag过滤的consumer端有什么区别?

如下面代码所示,我们构造了一个MessageSelector

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);

那么subscribe方法同样是把MessageSelector也是构建成SubscriptionData。不过和Tag那种不同的是,SubscriptionData里面放的是SQL即subString属性和expressionType属性SQL92
在这里插入图片描述

源码跟踪

broker过滤逻辑

SQL过滤和tag过滤都是经历下面的三个阶段,下面我们重点跟一下SQL过滤的代码块messageFilter#isMatchedByCommitLog。在这里插入图片描述

debug了一下,如下图所示,从buffer里解析出properties来然后和SQL进行校验,返回校验结果
在这里插入图片描述

consumer过滤逻辑

一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。
下面的这个图其实在上面也出现过,这个处理方法里并没有SQL过滤的逻辑,因此在consumer不过滤。
在这里插入图片描述

总结

  • 特殊的分表方式
    tag作为msg的properties,这个其实映射到数据库分库分表中。比如db的一条记录需要新增一个字段,我们完全可以新增一个setting表,存储这个properties属性。阿里这边的很多项目DB设计都是这么做的。
  • SQL过滤比Tag过滤慢的原因:比较慢,解析慢
    Tag过滤是直接等于,而SQL过滤还要通过表达式计算,SQL复杂的计算必然不如直接等于快。
    SQL过滤的时候需要解析properties,本身就是一种资源消耗。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/94443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微服务框架 SpringCloud微服务架构 多级缓存 47 Lua 语法入门 47.1 初识Lua

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 多级缓存 文章目录微服务框架多级缓存47 Lua 语法入门47.1 初识Lua47.1.1 初识Lua47.1.2 HelloWorld47 Lua 语法入门 47.1 初识Lua 47.1.1…

《纳瓦尔宝典》笔记三——做自己真正感兴趣的事情

你合上书本,留在你脑子里的才真正是你的智慧 目录 一、开始让你兴致盎然,后来又让你觉得索然无味了吗 二、在“成为自己”这件事“上,没有人比你做得好 三、专长无法被教授,但可以被学习 四、上学能带来什么 五、尽量做不需…

【大数据技术Hadoop+Spark】HBase分布式数据库架构、特点、数据存储方式、寻址机制详解(图文解释)

一、HBase简介 HBase起源于2006年Google发表的BigTable论文。HBase是一个高可靠性、高性能、面向列、可伸缩的分布式数据库,利用HBase可在廉价PC服务器上搭建起大规模结构化存储集群。HBase的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件…

如何去图片水印?三个方法让你学会图片去水印

上大学的时候,老师给我们每人布置了一个关于介绍“我的家乡”的作业。课后在做这个PPT的时候,我发现在网上找的图片素材都带着水印,十分影响PPT展示效果。于是,我就上网找了一些如何去图片水印的方法,对这些方法进行实…

[附源码]Nodejs计算机毕业设计基于的宿舍楼跳蚤市场管理系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置: Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分…

logrotate 详解

logrotate 程序是一个日志文件管理工具。用于分割日志文件,删除旧的日志文件,并创建新的日志文件,起到“转储”作用。可以节省磁盘空间。下面就对 logrotate 日志轮转操作做一梳理记录。 1、配置文件介绍 Linux系统默认安装logrotate工具&am…

PAT乙级 1084 外观数列 python

题目 思路: 需要注意的点: 只有相邻的字符相同,才统计这个字符出现的次数 利用before作为后一个字符是否与前一个字符相同 进行字符的重复次数统计标志 另: 我发现用下列形式最后一个测试点就会超时 resultresultstr(count) j改…

12.16

为什么使用promise 一.指定回调函数的方式更加灵活 1. 旧的: 必须在启动异步任务前指定 2. promise: 启动异步任务 > 返回promie对象 > 给promise对象绑定回调函 数(甚至可以在异步任务结束后指定/多个) 二. 支持链式调用, 可以解决回调地狱问题 1. 什么是回调地狱?…

90后世界五百强新青年,每年沪漂8个月,长住7天酒店,如何做到不焦虑不躺平?

疫情三年,使许多行业与从业者陷入胶着的困境。特别是身处互联网时代的我们,每天不断接收大量信息,情绪极易被干扰,陷入现实与幻想的拉扯之中。996、内卷、emo、躺平、摆烂等网络热词层出不穷,展现了Z时代青年迷茫、敏感…

MySQL --- 函数大全 6

目录 1.结果集的分区中的每一行分配一个连续的整数 ROW_NUMBER() 2.将字符串追加到指定的数量 RPAD() 3.删除尾随空格 RTRIM() 4.将秒转换为“hh:mm:ss”格式 SEC_TO_TIME() 5.返回指定时间或日期时间值的第二部分 …

预训练时候 Pytorch FrozenBatchNorm (BN)一些小心得

在预训练模型中 会发现 这样使用: # resnet model builder function def build_resnet(archresnet50, pretrainedTrue,freeze_backbone_batchnormTrue, freeze_layer1True,norm_layermisc_nn_ops.FrozenBatchNorm2d):# weightsif pretrained: #如果是预训练 权重是…

Blazorise NumericPicker、DragDrop和Datagrid组件

Blazorise NumericPicker、DragDrop和Datagrid组件 添加了在“BaseAfterRenderComponent”中实现延迟执行的功能。 NumericPicker:现在可以检查大数值。 改进了DragDrop组件。 数据网格:引入了“CancellationTokenSource”和相应的过滤器更改取消选项。 …

通信原理Simulink使用经验

目录 一、如何将Simulink模型文件导出为PDF ​二、如何将模型文件导出为高清位图 三、如何调整鼠标滚轮为上下移动 四、如何控制页面的位置 五、如何得到Scope示波器仿真波形的位图 六、如何修改Scope波形的视图 一、如何将Simulink模型文件导出为PDF 确认此处的打印就可…

2022软考高级架构设计师-经历分享

一.背景 软考全称为【计算机技术与软件专业技术资格(水平)考试】,是由国家人力资源和社会保障部(原人事部)、工业和信息化部(原信息产业部)领导,全国计算机软件考试办公室负责实施和日常管理的对全国计算机与软件专业技术人员进行的职业资格、…

发烧友实测 | 用飞凌OKA40i-C开发板玩转FFmpeg

本篇试用报告由发烧友 zealsoft提供,感谢 zealsoft的支持。飞凌嵌入式会在电子发烧友和电路城论坛持续开展开发板有奖试用活动,更有京东E卡等着你!欢迎大家的持续关注。1. FFmpeg的安装 FFmpeg是用来记录、转换数字音频、视频的开源软件&…

python作业大教学:制作员工管理系统

前言 大家早好、午好、晚好吖 ❤ ~ 环境使用: Python 3.8 Pycharm 功能需求 “”" 创建员工类:属性有:员工号,员工姓名,员工工资; 创建用户类:属性有:用户名,密码&#x…

Excel插件E灵:按家庭建表,创建一户一表。将明细表转成一户一表

单击播放视频: 教你将Excel明细表转换成一户一表,即按家庭创建新表01需求概述 图1是档案明细表,图2是家庭收入采集表的模板样式。现要求将明细表转换成家庭收入采集表,有几个家庭就生成几份单独的采集表。 图1 档案明细表 图2 家…

玩游戏蓝牙耳机哪款好?适合打游戏的无线蓝牙耳机推荐

随着蓝牙耳机越来越受欢迎,喜欢戴蓝牙耳机玩游戏的人也越来越多,特别是近几年随着技术不断地升级,支持低延迟、能听声辨位的游戏蓝牙耳机逐渐火热起来。今天在这里给大家分享几款适合打游戏的无线蓝牙耳机,一起来看看吧。 一、南…

神舟战神z7使用U盘重装系统操作教程分享

神舟战神z7使用U盘重装系统操作教程分享。有用户使用神舟战神z7笔记本电脑的时候安装了一个程序导致了电脑中毒,系统出现了损坏,无法正常的使用。今天教你如何通过U盘重装系统的方法来进行系统的重置,恢复正常的电脑使用。 准备工作&#xff…

复杂度分析

算法 如何评估一个算法的好坏?(事后统计法的应用) 在同一个问题上 比较不同算法对于同一输入的执行时间 事后统计法的缺点 1.严重依赖硬件以及运行时各种不确定因素(比如cpu好一点的效率就高一点) 2.必须编写相应的测试代码 3.测试时输入的数据难以保证公平性…