一、RocketMQ消息重复消费问题
1.1、官网
1.2、消息重复被消费原因
通过上述官网的描述我们可以知道,RocketMQ中的消息是存在重复消费的情况的。那么消息为什么会被重复消费呢?先来回顾一下RocketMQ的消息是怎么发送和接收的:
从上图可以看出,导致消息被重复消费的原因有2个:
(1)生产者重复投递消息;
(2)消费者扩容reBalance(重平衡);
1.3、如何解决
1.3.1、概述
在了解了消息被重复消费的原因后,解决起来思路就比较清晰了。这里给出一种比较常见的解决方案:生产者发送消息时给消息带一个唯一标识(例如订单系统,可以将订单号作为唯一标识),消费者消费消息时要控制消息的幂等性。
1.3.2、幂等性
所谓幂等性,通俗的讲就是多次对某条数据的操作产生的影响和第一次操作产生的影响相同。举个MySQL中增删改查的例子方便大家伙理解:例如数据库中存在一个account表,它有如下字段:id name balance;那么执行增删改时,幂等性情况如下:
1.3.3、解决方法(思路)
(1)发送者发送消息时,需要给消息携带一个唯一表示(自己业务控制);
(2)消费者收到消息后,要控制消息的幂等性;
1.3.4、案例
/**
* 测试消息重复消费:生产者端发送两条key相同的消息
*/
@Test
public void sendRepeatableMessage() {
String key = LocalDateTimeUtil.format(LocalDateTime.now(), "yyyyMMddHHmmssSSS") + new Random().nextInt(5);
log.info("key:{}", key);
Message<String> keyMessage = MessageBuilder
.withPayload("boot 这是一个Key为【" + key + "】的消息")
.setHeader(RocketMQHeaders.KEYS, key)
.build();
Message<String> repeatKeyMessage = MessageBuilder
.withPayload("boot 这是一个Key为【" + key + "】的消息")
.setHeader(RocketMQHeaders.KEYS, key)
.build();
SendResult keyResult = rocketMQTemplate.syncSend(boot-repeatable-key-topic, keyMessage);
SendResult repeatKeyResult = rocketMQTemplate.syncSend(boot-repeatable-key-topic, keyMessage);
log.info("重复消息发送成功,【keyResult sendStatus】:{},【repeatKeyResult sendStatus】:{}",keyResult.getSendStatus(),repeatKeyResult.getSendStatus());
}
/**
* @Author : 一叶浮萍归大海
* @Date: 2023/12/26 14:46
* @Description:
*/
@RocketMQMessageListener(
topic = "boot-repeatable-key-topic",
consumerGroup = "boot-repeatable-key-consumer-group"
)
@Slf4j
@Component
public class MyRocketMqRepeatableListener implements RocketMQListener<MessageExt> {
@Autowired
private OrderMqLogMapper orderMqLogMapper;
/**
* 解决消息重复消费问题
* @param messageExt
*/
@Override
public void onMessage(MessageExt messageExt) {
String orderSn = messageExt.getKeys();
String message = StrUtil.utf8Str(messageExt.getBody());
OrderMqLogDO orderMqLogDO = new OrderMqLogDO().setOrderSn(orderSn).setMessage(message);
try {
orderMqLogMapper.insert(orderMqLogDO);
log.info("doBusiness...,orderSn:{},消息内容:{}",orderSn,message);
} catch (Exception e) {
log.error("消息已消费,请勿重新消费! orderSn:{},消息内容:{}",orderSn,message);
e.printStackTrace();
}
}
}
/**
* @Author : 一叶浮萍归大海
* @Date: 2023/12/26 14:18
* @Description: 订单消息消费表,防止重复消费
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@TableName("order_mq_log")
public class OrderMqLogDO implements Serializable {
/**
* 主键
*/
private Long id;
/**
* 订单编号
*/
private String orderSn;
/**
* 消息内容
*/
private String message;
}