1、目的
消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。本文介绍消息过滤的定义、原理、分类及不同过滤方式的使用方法、配置示例等。
2、应用场景
Apache RocketMQ 作为发布订阅模型的消息中间件广泛应用于上下游业务集成场景。在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。
使用 Apache RocketMQ 的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
Apache RocketMQ 主要解决的单个业务域即同一个主题内不同消息子集的过滤问题,一般是基于同一业务下更具体的分类进行过滤匹配。如果是需要对不同业务域的消息进行拆分,建议使用不同主题处理不同业务域的消息。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:
-
订单消息
-
支付消息
-
物流消息
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的下游系统所订阅:
-
支付系统:只需订阅支付消息。
-
物流系统:只需订阅物流消息。
-
交易成功率分析系统:需订阅订单和支付消息。
-
实时计算系统:需要订阅所有和交易相关的消息。
过滤效果如下图所示:
3、功能概述
3.1 消息过滤定义
过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。
Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
3.2 消息过滤原理
消息过滤主要通过以下几个关键流程实现:
-
生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
-
消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
-
服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
3.3 消息过滤分类
Apache RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:
对比项 | Tag标签过滤 | SQL属性过滤 |
---|---|---|
过滤目标 | 消息的Tag标签。 | 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。 |
过滤能力 | 精准匹配。 | SQL语法匹配。 |
适用场景 | 简单过滤场景、计算逻辑简单轻量。 | 复杂过滤场景、计算逻辑较复杂。 |
4、订阅关系一致性
过滤表达式属于订阅关系的一部分,Apache RocketMQ 的领域模型规定,同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到。更多信息,请参见订阅关系(Subscription)。
5、使用建议
合理划分主题和Tag标签
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的Tag标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:
-
消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过Tag标签进行分类。
-
业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的Tag进行区分。
-
消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。
6、注意事项
6.1 一个消费组内订阅同一个主题不同的TAG为什么会丢消息
原因
在RocketMQ中使用集群模式消费时,同一个消费组中的多个消费者共同完成主题中的队列的消费,即一个消费者只会分配到其中某几个队列,并且同一时间,一个队列只会分配给一个消费者,这样结合上面的的过滤机制,就会明显有问题,请看示例图:
C1=192.168.3.10,C2=192.168.3.11
其问题的核心关键是,同一个tag会分布在不同的队列中,但消费者C1分配到的队列为q0,q1,q0,q1中有TAGA和TAGB的消息,但TAGB的消息会被消费者C1过滤,但这部分消息却不会被C2消费,造成了消息丢失。
解决方案
同一个topic(主题)针对不同的tag配置不同的group即可。
参考:
rocketmq4:基本概念 | RocketMQ
rocketmq5:消息过滤 | RocketMQ