前言
消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Broker 端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。
以电商交易场景为例,用户从下单到拿到商品,中间会产生很多消息,被不同的下游系统订阅消费。下游系统往往只关心自己需要处理的消息,比如支付系统只关心支付消息,这时候生产者就可以在发送消息的时候给消息打上标签,下游系统按需订阅即可。
过滤方式
RocketMQ 支持两种消息过滤方式。
Tag标签过滤
生产者在发送消息前,可以先给消息打上标签,每条消息最多设置一个 Tag 标签:
Message message = provider.newMessageBuilder()
.setTopic("Trade_Topic")
.setTag("pay")
.setBody("xxx".getBytes())
.build();
producer.send(message);
消费者配置 Tag 标签过滤规则:
consumer.subscribe("Trade_Topic",
new FilterExpression("pay", FilterExpressionType.TAG));
Tag 标签过滤规则:
- 单 Tag 匹配:过滤表达式为目标 Tag,相同 Tag 的消息才会投递给消费者
- 多 Tag 匹配:过滤表达式为多个目标 Tag 用
||
分割,消息符合任一 Tag 就会被投递 - 全部匹配:过滤表达式为
*
,所有消息都会投递
SQL属性过滤
SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,每个消息都可以额外设置用户属性和系统属性,消费者订阅时可设置 SQL 语法的过滤表达式过滤多个属性。
SQL 过滤也可以实现 Tag 标签过滤的效果,Tag 属于系统属性,属性名称是 TAGS
首先,生产者发送消息前给消息设置自定义属性:
Message message = provider.newMessageBuilder()
.setTopic("Trade_Topic")
.setBody("xxx".getBytes())
.addProperty("price", "99800")
.addProperty("region", "杭州")
.build();
producer.send(message);
消费者配置 SQL 过滤规则,这里以 杭州区域价格大于 100 的订单 为例:
consumer.subscribe("Trade_Topic",
new FilterExpression("region='杭州' AND price>10000", FilterExpressionType.SQL92));
SQL 属性过滤使用 SQL92 语法作为过滤规则表达式,语法规范如下:
如何选择
尽量用 Tag 标签过滤,实现更加轻量级,效率更高,在扫描 ConsumeQueue 时就可以先通过 TagHash 过滤一遍。而消息属性是存储在 CommitLog 文件里的,意味着 SQL 属性过滤必须读到完整的消息才能判断是否要过滤,性能较差。
设计实现
org.apache.rocketmq.store.MessageFilter
是 RocketMQ 抽象出来的消息过滤接口,两个方法:
- isMatchedByConsumeQueue:通过 ConsumeQueue 里的 tagsCode 先匹配一次,也就是 Tag 标签的哈希码,tagsCode 不同 Tag 肯定不同
- isMatchedByCommitLog:根据 CommitLog 里的完整消息属性匹配
public interface MessageFilter {
boolean isMatchedByConsumeQueue(final Long tagsCode,
final ConsumeQueueExt.CqExtUnit cqExtUnit);
boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
final Map<String, String> properties);
}
RocketMQ 的处理逻辑是:先根据 ConsumeQueue 里的 tagsCode 过滤,通过了再读取 CommitLog 里的完整消息走 SQL 属性过滤,实现类会根据配置的过滤规则在不关心的过滤方法里直接返回 true。
public GetMessageResult getMessage(){
......
// 先通过consumequeue里的tagsCode过滤
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
// 再从CommitLog读取完整消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
// 再执行SQL属性过滤
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
selectResult.release();
continue;
}
......
}
Tag 标签过滤的实现
Broker 把消息写入 CommitLog 后,ReputMessageService 线程会每隔 1ms 把新消息写入到 consumequeue 文件,以加速消费者的消费效率。ConsumeQueue 文件由若干个 CqUnit 组成,每个 CqUnit 占用固定的 20 个字节:
CqUnit{
long offset; // 消息在 CommitLog 偏移量
int size; // 消息长度
long tagsCode; // Tag哈希码
}
消费者在消费 ConsumeQueue 时就可以直接通过 tagsCode 进行标签过滤:
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
// '*' 订阅所有
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
// 消息的tagsCode是否包含在消费者订阅的Tags里面
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
}
因为是哈希码,所以 tagsCode 存在哈希冲突的可能性,不过概率极小。万一冲突了,Broker 还是会继续投递消息,RocketMQ 5.0 版本会由 Proxy 再进行一次 Tag 的精准匹配,如果不匹配不会投递给消费者;RocketMQ 4.x 版本由消费者收到消息后自行判断,Tag 不匹配的消息会直接丢弃。
SQL 属性过滤的实现
为了执行 SQL 语法实现属性过滤,SQL 语法会先被编译成 Expression 对象,再由Expression#evaluate
方法得出执行结果。
Expression expression = FilterFactory.INSTANCE
.get(ExpressionType.SQL92)
.compile("a>10 AND b<10 OR c=10");
expression.evaluate(context);
要对消息属性过滤,首先要把消息属性提取出来,消息属性由若干个 String 类型的键值对组成,然后执行 SQL。
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
// tag过滤 直接返回true
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}
ConsumerFilterData realFilterData = this.consumerFilterData;
// 消息属性
Map<String, String> tempProperties = properties;
// 没有SQL表达式
if (realFilterData == null || realFilterData.getExpression() == null
|| realFilterData.getCompiledExpression() == null) {
return true;
}
if (tempProperties == null && msgBuffer != null) {
// 从CommitLog解码出消息属性
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
// 执行SQL92表达式过滤
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
尾巴
消息过滤是 RocketMQ 防止 Broker 端因为投递大量消费者不感兴趣的消息而导致资源浪费的一种手段,消费者可以根据自己感兴趣的消息类型配置过滤规则,分为 Tag 标签过滤 和 SQL 属性过滤两种方式。Tag 标签过滤效率高,因为 Broker 在构建 consumequeue 文件时会写入消息 Tag 的哈希码,直接比较哈希码可以避免通过 CommitLog 读取完整消息。SQL 针对消息属性过滤,此时必须读取到完整的消息才能过滤,效率较低。