【RocketMQ】RocketMQ标签、过滤及消息重复消费
文章目录
- 【RocketMQ】RocketMQ标签、过滤及消息重复消费
- 1. 标签(Tag)
- 1.1 示例
- 2. 键(Keys)
- 2.1 示例
- 3. 消息重复消费
- 3.1 示例
参考文档: 官方文档
1. 标签(Tag)
Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。
注:
- Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
- Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。
什么时候应该用Topic,什么时候该用Tag?
可以从以下几个方面进行判断:
- 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
- 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
- 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
- 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
1.1 示例
生产者发送含tag的消息到broker:
@Test
public void tagProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
//tag为vip1
Message message = new Message("tagTopic", "vip1", "我是vip1".getBytes());
//tag为vip2
Message message2 = new Message("tagTopic", "vip2", "我是vip2".getBytes());
producer.send(message);
producer.send(message2);
System.out.println("发送成功");
producer.shutdown();
}
消费者1接收 tag为vip1 的消息:
@Test
public void tagConsumer1() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//tag为vip1
consumer.subscribe("tagTopic", "vip1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
消费者2接收 tag为vip1和vip2 的消息:
@Test
public void tagConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("tagTopic", "vip1 || vip2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("我是VIP1和VIP2的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2. 键(Keys)
RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
注:msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
2.1 示例
生产者发送一条 tag为key1 ,keys为key 的消息:
@Test
public void keyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("keyTopic", "key1", key, "我是key1".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();
}
消费者接收消息,得到keys:
@Test
public void keyConsumer1() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("keyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(messageExt.getBody()));
System.out.println(messageExt.getMsgId());
System.out.println("我们业务的标识:" + messageExt.getKeys());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
3. 消息重复消费
消息为什么会重复消费?
- 生产者重复投递消息
- 消费者扩容导致重平衡
如何解决:
- 数据库去重表,保证幂等
- 数据库唯一索引约束
- redis setnx
- 布隆过滤器
3.1 示例
生产者发送两条相同的消息:
@org.junit.jupiter.api.Test
public void repeatProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
//发送两条相同的消息,制造重复消费场景
Message m1 = new Message("repeatTopic", null, key, "扣减库存".getBytes());
Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存".getBytes());
producer.send(m1);
producer.send(m1Repeat);
System.out.println("发送成功");
producer.shutdown();
}
消费者解决重复消费消息:
@org.junit.jupiter.api.Test
void repeatConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 先拿key
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
// 原生方式操作
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");
} catch (SQLException e) {
e.printStackTrace();
}
PreparedStatement statement = null;
try {
// 插入数据库 因为我们 key做了唯一索引
statement = connection.prepareStatement("insert into order_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");
} catch (SQLException e) {
e.printStackTrace();
}
try {
// 新增 要么成功 要么报错 修改 要么成功,要么返回0 要么报错
statement.executeUpdate();
} catch (SQLException e) {
System.out.println("executeUpdate");
if (e instanceof SQLIntegrityConstraintViolationException) {
// 唯一索引冲突异常
// 说明消息来过了
System.out.println("该消息来过了");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
e.printStackTrace();
}
// 处理业务逻辑
// 如果业务报错 则删除掉这个去重表记录 delete order_log where order_sn = keys;
System.out.println(new String(messageExt.getBody()));
System.out.println(keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
消费者接收到消息,直接插入数据库,由数据库根据唯一索引约束对key进行判断,如果成功插入,则继续执行业务逻辑;如果失败,则直接返回。