在Broker端进行消息过滤

news2025/1/21 21:59:47

        在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。

1.消息的Tag和Key

        对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突。Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。

2.通过Tag进行过滤

        用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效,Broker端可以在ConsumeQueue中做这种过滤,只从CommitLog里读取过滤后被命中的消息。看一下ConsumerQueue的存储格式,如图7-1所示。

图7-1 ConsumerQueue的存储格式

Consume Queue的第三部分存储的是Tag对应的hashcode,是一个定长的字符串,通过Tag过滤的过程就是对比定长的hashcode。经过hashcode对比,符合要求的消息被从CommitLog读取出来,不用担心Hash冲突问题,消息在被消费前,会对比完整的Message Tag字符串,消除Hash冲突造成的误读。

3.用SQL表达式的方式进行过滤

        使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑,如代码清单7-1所示。

代码清单7-1 在消息中增加自定义属性

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
msg.putUserProperty("b",  “hello”);

 

代码中这个消息就有了两个特殊的属性值a和b,我们用类似SQL表达式的方式对消息进行过滤,用法如下(目前只支持在PushConsumer中实现这种过滤):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  // only subsribe messages have property a, also a >=0 and a <= 3

consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently()
{    

@Override    

public ConsumeConcurrentlyStatus consumeMessage
    (List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{        

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    

}

});

consumer.start();
 

类似SQL的过滤表达式,支持如下语法:

·数字对比,比如>、>=、<、<=、BETWEEN、=;

·字符串对比,比如=、<>、IN;

·IS NULL or IS NOT NULL;

·逻辑符号AND、OR、NOT。

支持的数据类型:

·数字型,比如123、3.1415;

·字符型,比如'abc'、注意必须用单引号;

·NULL,这个特殊字符;

·布尔型,TRUEorFALSE。

SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。

4.Filter Server方式过滤

        Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。实现过滤逻辑的示例如代码清单7-2所示。

代码清单7-2 实现过滤逻辑的代码示例

public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

 

上面代码实现了过滤逻辑,它是根据消息的“SequenceId”这个属性来过滤的,其实不一定要根据消息属性来过滤,也可以根据消息体的内容或其他特征过滤,如代码清单7-3所示。

代码清单7-3 使用FilterServer的Consumer示例

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-GroupNamecc4");
    // 使用Java代码,在服务器做消息过滤
    String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

 

在使用Filter Server的Consumer例子中,主要是把实现过滤逻辑的类作为参数传到Broker端,Broker端的Filter Server会解析这个类,然后根据match函数里的逻辑进行过滤。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1214969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工业机器人轨迹规划研究进展及发展趋势

原创 | 文 BFT机器人 01 轨迹规划简介 轨迹规划是工业机器人运动控制的基础&#xff0c;对工业机器人的工作效率和稳定性有重大影响。为掌握工业机器人轨迹规划方法的研究现状&#xff0c;根据工业机器人规划空间和优化目标的不同对轨迹规划方法进行分类&#xff0c;介绍了直…

SOP作业指导书系统如何帮助厂家实现数字化转型

SOP&#xff08;Standard Operating Procedure&#xff0c;标准操作程序&#xff09;电子作业操作手册的应用对于厂家实现数字化转型起着至关重要的作用。本文将探讨SOP电子作业操作手册如何帮助厂家实现数字化转型的重要性和优势。 首先&#xff0c;SOP作业指导书可以提高生产…

七、Nacos和Eureka的区别

一、nacos注册中心 二、临时实例与非临时实例 三、区别 Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式&#xff0c;非临时实例采用主动检测模式临时实例心跳不正常会被剔除&#xff0c;非临时实例则不会被剔除Nacos支持服务列表变更的消息推送模式&#xff0c;服务…

K-means聚类方法

K-means聚类的思想和原理 模型介绍 对于有监督的数据挖掘算法而言&#xff0c;数据集中需要包含标签变量&#xff08;即因变量y的值&#xff09;。但在有些场景下&#xff0c;并没有给定的y值&#xff0c;对于这类数据的建模&#xff0c;一般称为无监督的数据挖掘算法&#x…

解密Vue中key的神奇原理:优化列表渲染效率的关键策略!

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ​ 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 一…

wps、office插入的复选框无法设置字体及大小?教你一招

插入的表单无法设置字体及大小 脑瓜子嗡嗡的吧&#xff1f;&#xff01;&#xff01; 如果没有强制要求&#xff0c;建议就换成开发工具下的复选框吧 如果一定要用上面这种&#xff0c;就自己做一个吧&#xff0c;设置方法如下 制作方法&#xff1a;插入选项卡插入窗体的复选框…

攀登代码巅峰:架构师成长之路不可错过的软件架构好书

架构师成长推荐书 概述好书推荐《高并发架构实战&#xff1a;从需求分析到系统设计》《架构师的自我修炼&#xff1a;技术、架构和未来》《中台架构与实现&#xff1a;基于DDD和微服务》《分布式系统架构&#xff1a;架构策略与难题求解》《流程自动化实战&#xff1a;系统架构…

开发一款小程序游戏需要多少钱?

小程序游戏的开发成本因多种因素而异&#xff0c;无法提供具体的固定数字。以下是影响小程序游戏开发成本的一些关键因素&#xff1a; 游戏规模和复杂度&#xff1a; 小程序游戏可以是简单的休闲游戏&#xff0c;也可以是更复杂的策略游戏。规模和复杂度会影响开发所需的时间和…

3.3 Windows驱动开发:内核MDL读写进程内存

MDL内存读写是一种通过创建MDL结构体来实现跨进程内存读写的方式。在Windows操作系统中&#xff0c;每个进程都有自己独立的虚拟地址空间&#xff0c;不同进程之间的内存空间是隔离的。因此&#xff0c;要在一个进程中读取或写入另一个进程的内存数据&#xff0c;需要先将目标进…

第07章 面向对象编程(进阶)

一 关键字&#xff1a;this 1.1 this是什么&#xff1f; 在Java中&#xff0c;this关键字不算难理解&#xff0c;它的作用和其词义很接近。 它在方法&#xff08;准确的说是实例方法或非static的方法&#xff09;内部使用&#xff0c;表示调用该方法的对象。它在构造器内部使…

超越传统:明懿金汇定义现代金融服务

量化交易的新纪元&#xff1a;明懿金汇引领创新浪潮 在数字化时代的飞速发展下&#xff0c;明懿金汇凭借其独特的跟单平台和卓越的金融服务&#xff0c;成为互联网金融行业的佼佼者。自2020年起&#xff0c;公司重点投资于互联网金融行业&#xff0c;并通过与国内知名证券软件开…

中国首幅1米分辨率土地覆盖图

SinoLC-1&#xff1a;中国1米分辨率土地覆盖图为首个具有中国国家尺度覆盖&#xff0c;空间分辨率1米的土地覆盖专题图。针对大范围高分辨率土地覆盖制图中地物复杂多样、高精度训练样本缺乏、制图方法区域迁移性要求高等关键难题&#xff0c;中国地质大学&#xff08;武汉&…

【MySQL学习笔记-001】- 创建表、插入数据、查看数据库结构

创建employees表 当创建一个表时&#xff0c;需要指定表的名称和每个列的名称和数据类型。以下是一个示例SQL语句&#xff0c;用于创建一个名为"employees"的表&#xff0c;其中包含员工ID、姓名、职位和工资等列&#xff1a; CREATE TABLE employees (employee_id…

35岁遭遇父亲肺癌、失业、失恋. . . . . .

写在前面 目前已经上班快两个月了&#xff0c;对现在的工作很满意&#xff0c;甚至说更喜欢这的氛围吧。 如题所示&#xff0c;从今年5月开始&#xff0c;发生的所有事&#xff0c;都完全超出了我自己可以承受的范围&#xff0c;好在这一切都过去了&#xff0c;真的感谢上天安…

从程序员到架构师,实现技术巅峰的完美转型

文章目录 一、程序员到架构师的转型过程1. 技术知识的积累2. 设计和决策能力的提升3. 沟通和协调能力的锻炼4. 批判性思维和解决问题能力的培养5. 不断学习和创新的精神 二、转型中需要克服的困难和挑战1. 技术知识的广度和深度2. 设计和决策的难度和风险3. 沟通和协调的挑战4.…

WorkPlus移动数字化平台高定制化服务,贴身满足企业的个性化需求

在企业协同沟通领域&#xff0c;企业微信、钉钉、飞书等平台已经成为了常见的选择。然而&#xff0c;WorkPlus作为一款独具特色的沟通协作平台&#xff0c;能够提供优质的原厂平台级定制化服务&#xff0c;从而满足企业的安全特性、强可控要求以及高度定制化的业务场景&#xf…

layui表头多出一列(已解决)

问题描述 &#xff1a;layui表头多出来一列&#xff0c;但是表体没有内容&#xff0c;很影响美观。 好像是原本的表格有滚轮&#xff0c;我操作放大之后滚轮没有了&#xff0c;但是滚轮自带的表头样式还在&#xff0c; 之后手动把这个样式隐藏掉了&#xff0c;代码如下&#xf…

避免defer陷阱:拆解延迟语句,掌握正确使用方法

基本概念 Go语言的延迟语句defer有哪些特点&#xff1f;通常在什么情况下使用&#xff1f; Go语言的延迟语句&#xff08;defer statement&#xff09;具有以下特点&#xff1a; 延迟执行&#xff1a;延迟语句会在包含它的函数执行结束前执行&#xff0c;无论函数是正常返回还是…

技术管理责任制度《三》

为了加强新时期科技档案的保密工作&#xff0c;确保档案在保管、利用、复制、销毁过程中的保密工作&#xff0c;特规定如下&#xff1a; 彩虹图纸管理软件_图纸管理系统_图纸文档管理软件系统_彩虹EDM【官网】 1、档案员要认真学习和严格执行国家有关安全、保密制度规定&#…

关于数据mysql ->maxwell->kafka的数据传输

个人名片&#xff1a; &#x1f405;作者简介&#xff1a;一名大三在校生&#xff0c;热爱生活&#xff0c;爱好敲码&#xff01; \ &#x1f485;个人主页 &#x1f947;&#xff1a;holy-wangle ➡系列内容&#xff1a; &#x1f5bc;️ tkinter前端窗口界面创建与优化 &…