RocketMQ5.0消息过滤

news2024/12/23 6:34:09

前言

消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Broker 端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。

以电商交易场景为例,用户从下单到拿到商品,中间会产生很多消息,被不同的下游系统订阅消费。下游系统往往只关心自己需要处理的消息,比如支付系统只关心支付消息,这时候生产者就可以在发送消息的时候给消息打上标签,下游系统按需订阅即可。
image.png

过滤方式

RocketMQ 支持两种消息过滤方式。

Tag标签过滤

生产者在发送消息前,可以先给消息打上标签,每条消息最多设置一个 Tag 标签:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setTag("pay")
        .setBody("xxx".getBytes())
        .build();
producer.send(message);

消费者配置 Tag 标签过滤规则:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("pay", FilterExpressionType.TAG));

Tag 标签过滤规则:

  • 单 Tag 匹配:过滤表达式为目标 Tag,相同 Tag 的消息才会投递给消费者
  • 多 Tag 匹配:过滤表达式为多个目标 Tag 用||分割,消息符合任一 Tag 就会被投递
  • 全部匹配:过滤表达式为*,所有消息都会投递

SQL属性过滤

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,每个消息都可以额外设置用户属性和系统属性,消费者订阅时可设置 SQL 语法的过滤表达式过滤多个属性。

SQL 过滤也可以实现 Tag 标签过滤的效果,Tag 属于系统属性,属性名称是 TAGS

首先,生产者发送消息前给消息设置自定义属性:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setBody("xxx".getBytes())
        .addProperty("price", "99800")
        .addProperty("region", "杭州")
        .build();
producer.send(message);

消费者配置 SQL 过滤规则,这里以 杭州区域价格大于 100 的订单 为例:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("region='杭州' AND price>10000", FilterExpressionType.SQL92));

SQL 属性过滤使用 SQL92 语法作为过滤规则表达式,语法规范如下:
image.png

如何选择

尽量用 Tag 标签过滤,实现更加轻量级,效率更高,在扫描 ConsumeQueue 时就可以先通过 TagHash 过滤一遍。而消息属性是存储在 CommitLog 文件里的,意味着 SQL 属性过滤必须读到完整的消息才能判断是否要过滤,性能较差。

设计实现

org.apache.rocketmq.store.MessageFilter是 RocketMQ 抽象出来的消息过滤接口,两个方法:

  • isMatchedByConsumeQueue:通过 ConsumeQueue 里的 tagsCode 先匹配一次,也就是 Tag 标签的哈希码,tagsCode 不同 Tag 肯定不同
  • isMatchedByCommitLog:根据 CommitLog 里的完整消息属性匹配
public interface MessageFilter {
    
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}

RocketMQ 的处理逻辑是:先根据 ConsumeQueue 里的 tagsCode 过滤,通过了再读取 CommitLog 里的完整消息走 SQL 属性过滤,实现类会根据配置的过滤规则在不关心的过滤方法里直接返回 true。

public GetMessageResult getMessage(){
    ......
	// 先通过consumequeue里的tagsCode过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }
	// 再从CommitLog读取完整消息
	SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
	// 再执行SQL属性过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        selectResult.release();
        continue;
    }
	......
}

Tag 标签过滤的实现
Broker 把消息写入 CommitLog 后,ReputMessageService 线程会每隔 1ms 把新消息写入到 consumequeue 文件,以加速消费者的消费效率。ConsumeQueue 文件由若干个 CqUnit 组成,每个 CqUnit 占用固定的 20 个字节:

CqUnit{
    long offset; // 消息在 CommitLog 偏移量
    int size; // 消息长度
    long tagsCode; // Tag哈希码
}

image.png
消费者在消费 ConsumeQueue 时就可以直接通过 tagsCode 进行标签过滤:

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // by tags code.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }
        // '*' 订阅所有
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
        // 消息的tagsCode是否包含在消费者订阅的Tags里面
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }
}

因为是哈希码,所以 tagsCode 存在哈希冲突的可能性,不过概率极小。万一冲突了,Broker 还是会继续投递消息,RocketMQ 5.0 版本会由 Proxy 再进行一次 Tag 的精准匹配,如果不匹配不会投递给消费者;RocketMQ 4.x 版本由消费者收到消息后自行判断,Tag 不匹配的消息会直接丢弃。

SQL 属性过滤的实现
为了执行 SQL 语法实现属性过滤,SQL 语法会先被编译成 Expression 对象,再由Expression#evaluate方法得出执行结果。

Expression expression = FilterFactory.INSTANCE
						.get(ExpressionType.SQL92)
        				.compile("a>10 AND b<10 OR c=10");
expression.evaluate(context);

要对消息属性过滤,首先要把消息属性提取出来,消息属性由若干个 String 类型的键值对组成,然后执行 SQL。

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // tag过滤 直接返回true
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        return true;
    }
    ConsumerFilterData realFilterData = this.consumerFilterData;
    // 消息属性
    Map<String, String> tempProperties = properties;
    // 没有SQL表达式
    if (realFilterData == null || realFilterData.getExpression() == null
        || realFilterData.getCompiledExpression() == null) {
        return true;
    }
    if (tempProperties == null && msgBuffer != null) {
        // 从CommitLog解码出消息属性
        tempProperties = MessageDecoder.decodeProperties(msgBuffer);
    }
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // 执行SQL92表达式过滤
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }
    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }
    return (Boolean) ret;
}

尾巴

消息过滤是 RocketMQ 防止 Broker 端因为投递大量消费者不感兴趣的消息而导致资源浪费的一种手段,消费者可以根据自己感兴趣的消息类型配置过滤规则,分为 Tag 标签过滤 和 SQL 属性过滤两种方式。Tag 标签过滤效率高,因为 Broker 在构建 consumequeue 文件时会写入消息 Tag 的哈希码,直接比较哈希码可以避免通过 CommitLog 读取完整消息。SQL 针对消息属性过滤,此时必须读取到完整的消息才能过滤,效率较低。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1352679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机视频监控客户端APP如何实现跨安卓、苹果和windows平台,并满足不同人的使用习惯

目 录 一、手机视频监控客户端的应用和发展 二、手机视频监控客户端存在的问题 三、HTML5视频监控客户端在手机上实现的方案 &#xff08;一&#xff09;HTML5及其优点 &#xff08;二&#xff09;HTML5在手机上实现视频应用功能的优势 四、手机HTML5…

FFmpeg之——获取上传视频的尺寸(长、宽)

获取上传视频的尺寸&#xff1a; 获取视频尺寸通常需要借助第三方库FFmpeg。 首先&#xff0c;确保你的系统中已安装了FFmpeg&#xff0c;并且FFmpeg的可执行文件路径已经添加到你的系统环境变量中。 1.官网下载ffmpeg 进入 链接: ffmpeg官网 网址&#xff0c;点击下载wind…

milvus学习(一)cosin距离和欧式距离

参考&#xff1a;https://blog.csdn.net/qq_36560894/article/details/115408613 归一化以后的cosin距离和欧式距离可以相互转化&#xff0c;未归一化的不可以相互转化&#xff08;因为距离带单位&#xff09;。

AI的明天从这里开始:OJAC近屿智能带您探索AIGC星辰大海的无限可能!

你是对人工智能充满好奇的编程小白&#xff0c;还是渴望工作赋能的白领&#xff1f;或者是想投身AIGC浪潮的创业者&#xff1f;无论你的背景如何&#xff0c;只要你对AI世界充满热情&#xff0c;我们OJAC近屿智能AIGC星辰大海大模型工程师和产品经理启航班以及系列课程都欢迎您…

利用Spring Cloud和Java系统设置优化工程项目管理系统源码的二次开发体验

工程项目管理涉及众多环节和角色&#xff0c;如何实现高效协同和信息共享是关键。本文将介绍一个采用先进技术框架的Java版工程项目管理系统&#xff0c;该系统支持前后端分离&#xff0c;功能全面&#xff0c;可满足不同角色的需求。从项目进度图表到施工地图&#xff0c;再到…

Java 语言概述

Java 概述 是 SUN&#xff08;Stanford University Network&#xff0c;斯坦福大学网络公司&#xff09;1995年推出的一门高级编程语言 是一种面向 Internet 的编程语言。Java 一开始富有吸引力是因为 Java 程序可以在 Web 浏览器中运行。这些 Java 程序被称为 Java 小程序&am…

深入研究矫正单应性矩阵用于立体相机在线自标定

文章&#xff1a;Dive Deeper into Rectifying Homography for Stereo Camera Online Self-Calibration 作者&#xff1a;Hongbo Zhao, Yikang Zhang, Qijun Chen,, and Rui Fan 编辑&#xff1a;点云PCL 欢迎各位加入知识星球&#xff0c;获取PDF论文&#xff0c;欢迎转发朋…

文件监控软件丨文件权限管理工具

文件已经成为企业最重要的资产之一。然而&#xff0c;文件的安全性和完整性经常受到威胁&#xff0c;如恶意软件感染、人为误操作、内部泄密等。 为了确保文件的安全&#xff0c;文件监控软件应运而生。本文将深入探讨文件监控软件的概念、功能、应用场景和未来发展等方面。 文…

Deep Q-Network (DQN)理解

DQN&#xff08;Deep Q-Network&#xff09;是深度强化学习&#xff08;Deep Reinforcement Learning&#xff09;的开山之作&#xff0c;将深度学习引入强化学习中&#xff0c;构建了 Perception 到 Decision 的 End-to-end 架构。DQN 最开始由 DeepMind 发表在 NIPS 2013&…

中兴通讯携手龙蜥社区,共创繁荣生态 | 2023龙蜥操作系统大会

12 月 17-18 日&#xff0c;由开放原子开源基金会指导&#xff0c;龙蜥社区主办&#xff0c;阿里云、中兴通讯、浪潮信息、Arm、Intel 等 24 家理事单位共同承办&#xff0c;主题为“云智融合共筑未来”的 2023 龙蜥操作系统大会在北京圆满结束。本次大会上&#xff0c;中兴通讯…

海外静态IP和动态IP有什么区别?推荐哪种?

什么是静态ip、动态ip&#xff0c;二者有什么区别&#xff1f;哪种好&#xff1f;关于这个问题&#xff0c;不难发现&#xff0c;在知道、知乎上面的解释有很多&#xff0c;但据小编的发现&#xff0c;这些回答都是关于静态ip和动态ip的专业术语解释&#xff0c;普通非专业人事…

java生产设备效率管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web生产设备效率管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为ac…

web自动化(6)——项目配置和Grid分布式

1. 框架的可配置性 项目之间的区别&#xff1a; 兼容性&#xff1a;有些项目只兼容chrome&#xff0c;有些只兼容Firefox等元素定位特点&#xff1a;有些项目闪现快&#xff0c;有的项目很慢有些项目集成Jenkins&#xff0c;不需要用python生成allure报告 如果想要我们的框架…

分布式(8)

目录 36.什么是TCC&#xff1f; 37.分布式系统中常用的缓存方案有哪些&#xff1f; 38.分布式系统缓存的更新模式&#xff1f; 39.分布式缓存的淘汰策略&#xff1f; 40.Java中定时任务有哪些&#xff1f;如何演化的&#xff1f; 36.什么是TCC&#xff1f; TCC&#xff08…

HTML5+CSS3③——无语义布局标签、画盒子、CSS定义、CSS引入方式

目录 无语义布局标签 画盒子 CSS定义 小结 CSS引入方式 小结 无语义布局标签 画盒子 CSS定义 小结 CSS引入方式 小结

潮玩宇宙大逃杀游戏搭建

潮玩宇宙是当下较火的社交互动平台&#xff0c;它不仅涵盖了各种潮玩商品&#xff0c;还拥有各种游戏玩法&#xff0c;尤其是大逃杀游戏非常火爆&#xff01;本文将介绍大逃杀游戏的开发和发展前景。 大逃杀游戏 大逃杀游戏是当下的一种新型游戏模式&#xff0c;旨在为玩家提供…

十分钟带你学会用python3网络爬虫抓取猫眼电影排行!

本节中&#xff0c;我们利用requests库和正则表达式来抓取猫眼电影TOP100的相关内容。requests比urllib使用更加方便&#xff0c;而且目前我们还没有系统学习HTML解析库&#xff0c;所以这里就选用正则表达式来作为解析工具。 1. 本节目标 本节中&#xff0c;我们要提取出猫眼…

ConcurrentHashMap源码学习

实现接口 ConcurrentMap&#xff08;Map的基础方法&#xff09;、Serializable(序列化) 基础属性 最大容量&#xff1a;2^30 默认容量&#xff1a;16 常用方法 PUT 调用PutVal方法进行插入。 判断key或value是否为空&#xff1a; 是&#xff1a;抛出空指针一场 否&#xff…

系列六、RestTemplate

一、RestTemplate 1.1、概述 RestTemplate是一种便捷的访问RestFul服务的模板类&#xff0c;是Spring提供的用于访问Rest服务的客户端模板工具集&#xff0c;它提供了多种便捷访问远程HTTP服务的方法。 1.2、API https://docs.spring.io/spring-framework/docs/5.2.2.RELEASE…

【中小型企业网络实战案例 七】配置限速

相关学习文章&#xff1a; 【中小型企业网络实战案例 一】规划、需求和基本配置 【中小型企业网络实战案例 二】配置网络互连互通【中小型企业网络实战案例 三】配置DHCP动态分配地址 【中小型企业网络实战案例 四】配置OSPF动态路由协议【中小型企业网络实战案例 五】配置可…