文章目录
- 前言:
- 一、RocketMq是什么?
- 二、RocketMq 模型介绍:
- 1.RocketMq 模型图:
- 2.RocketMq 生产者:
- 2.1 生产者消费发送流程:
- 2.2 生产者消息发送:
- 2.2.1 同步发送普通消息:
- 2.2.1 异步发送普通消息:
- 2.2.1.1 定义callback 回调:
- 2.2.1.2 消息发送时添加回调:
- 2.2.2 发送延迟消息:
- 2.2.2.1 同步/异步发送延迟消息:
- 2.2.2.2 延迟消息实现原理:
- 2.2.3 发送顺序消息:
- 2.2.3.1 顺序消息发送:
- 2.2.3.2 顺序消息实现原理:
- 2.2.4 发送事务消息:
- 2.2.4 .1 发送消息:
- 2.2.4 .2 事务消息发送实现原理:
- 3.RocketMq Broker 消息存储:
- 3.1 消息存储结构:
- 3.2 消息存储流程:
- 3.3 每个topic 中消息读写队列的数量:
- 3.4 消息的删除:
- 4.RocketMq 消费者:
- 4.1 @RocketMQMessageListener 注解消费消息:
- 4.2 @RocketMQMessageListene 属性详解:
- 三、总结:
- 四、参考:
前言:
作为一个国产的消息中间件,阿里参考Kafka 开发了RocketMq 进行消息通信,它具有分布式,高可用、高吞吐、可伸缩的特性,本文对于其模型进行探究;
一、RocketMq是什么?
RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。
二、RocketMq 模型介绍:
1.RocketMq 模型图:
模型图中已经展示了RocketMq的几个重要组件,与kafka 不同的是这里多出了一个叫NameServer的组件;这里针对NameServer 的特点进行下说明 :
- 对broker 进行维护,并对消息消息进行路由;
- NameServer 需要保证高可用,每个nameserver 都保存了所有broker 的信息,所有name sever 之间是不需要进行通信的,每个broker 都和所有的namesever 保持有心跳,每个nameserver 每隔10s 也会去 排查broker 如果 broker 超过120s 仍然没有与nameserver 进行心跳连接,则认为改broker 已经挂掉,name server 进行剔除;
- 生产者和消费者从namse server 获取到要连接的broker 信息,然后在本地进行一个缓存,然后与某一个broker 建立tcp 的长链接,进行消息的发送和接收;
2.RocketMq 生产者:
2.1 生产者消费发送流程:
生产者从NameServer 获取到Broker 信息, 创建tcp 连接,然后进行消息的发送,下面分场景使用RocketMQTemplate 模版对象进行消息的发送;
2.2 生产者消息发送:
2.2.1 同步发送普通消息:
/** 单条
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* sendResult为返回的发送结果
*/
public <T> SendResult sendMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
/**
* 同步发送批量消息
* @param topic
* @param msgs
* @return
* @param <T>
*/
public <T> SendResult sendBatchMsg(String topic, List<Object> msgs) {
List<Message> messages = msgs.stream().map(msg -> {
Message<T> message = (Message<T>) MessageBuilder.withPayload(msg).build();
return message;
}).collect(Collectors.toList());
return sendBatchMsg(topic, null, msgs);
}
/**
* 同步发送批量消息
* @param topic
* @param tag
* @param msgs
* @return
* @param <T>
*/
public <T> SendResult sendBatchMsg(String topic, String tag, List<Object> msgs) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
SendResult sendResult = rocketMQTemplate.syncSend(topic, msgs);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
2.2.1 异步发送普通消息:
2.2.1.1 定义callback 回调:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ProducerRocketMQCallback implements SendCallback {
@Override
public void onSuccess(SendResult sendResult) {
// 异步发送消息成功回调
log.debug("Message send successfully! Message ID: {}", sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
// 异步发送消息失败回调
log.error("Message send failed! Error message: {}", throwable.getMessage());
}
}
2.2.1.2 消息发送时添加回调:
@Autowired
private ProducerRocketMQCallback callback;
/**
* 发送异步消息
*
* @param topic
* @param msgBody
*/
public void sendAsyncMsg(String topic, Object msgBody) {
sendAsyncMsg(topic, null, msgBody, callback);
}
/**
* 发送异步消息
*
* @param topic
* @param tag
* @param msgBody
* @param callback
*/
public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 发送异步消息-- 批量
*
* @param topic
* @param msgs
*/
public void sendAsyncBatchMsg(String topic, List<Object> msgs) {
List<Message> messages = msgs.stream().map(msg -> {
Message message = MessageBuilder.withPayload(msg).build();
return message;
}).collect(Collectors.toList());
sendAsyncBatchMsg(topic, null, messages);
}
/**
* 发送异步消息--批量
*
* @param topic
* @param tag
* @param msgs
*/
public void sendAsyncBatchMsg(String topic, String tag,List<Message> msgs) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
rocketMQTemplate.asyncSend(topic, msgs, callback);
}
2.2.2 发送延迟消息:
2.2.2.1 同步/异步发送延迟消息:
/**
* 发送延时消息-- 同步发送
* @param topic
* @param msgBody
* @param delayLevel
*/
public void sendDelayMsg(String topic, Object msgBody, Integer delayLevel){
sendDelayMsg(topic,null,msgBody,null,delayLevel);
}
/**
* 发送延时消息
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*
* @param topic
* @param tag
* @param msgBody
* @param timeout
* @param delayLevel 值的有效范围1至18
*/
public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
if (timeout == null) {
timeout = messageTimeOut;
}
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), timeout, delayLevel);
}
/**
* 发送异步延迟消息
* @param topic
* @param msgBody
* @param delayLevel
*/
public void asyncSendDelay(String topic, Object msgBody, Integer delayLevel){
asyncSendDelay(topic, MessageBuilder.withPayload(msgBody).build(),null,delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message<?> message, Long timeout, int delayLevel) {
if (null == timeout){
timeout = messageTimeOut;
}
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
2.2.2.2 延迟消息实现原理:
- 生产者发送消息在一个临时topic 进行存储;
- delay server 定时任务扫描;
- 时间到了发送到指定的topic
- 消费者消费到消息;
2.2.3 发送顺序消息:
2.2.3.1 顺序消息发送:
提示:callback 方法为之前发送异步消息定义的回调方法
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param <T> 消息泛型
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息--异步
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param <T> 消息泛型
*/
public <T> void asyncSendOrderly(String topic, T msg, String hashKey ) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.asyncSendOrderly(topic, message, hashKey,callback);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param timeout 超时时间
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
2.2.3.2 顺序消息实现原理:
1)实现思路:
1、生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是单线程顺序发送;
2、写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入;
3、消费者消费的时候只能有一个线程。否则由于消费的速率不同,有可能出现记录到数据库的时候无序;
2)消息发送队列的实现:
提示:默认队列选择同对 hashKey 的hashcode 值然后对队列的个数 取模,得到要发送的队列
3)rocketmq 实现:
通过定义hashKey(可以使用相同的hashKey) 将消息发送到同一个队列,然后同一个消费组中消费者,消费这个队列,完成顺序消费;
2.2.4 发送事务消息:
2.2.4 .1 发送消息:
1) 增加消息发送的监听:
import com.alibaba.fastjson.JSONObject;
import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;
import com.example.springrocket.service.MyTranctionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MyTransactionListener implements TransactionListener {
@Autowired
private MyTranctionService myTranctionService;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//本地事务
String msg = new String(message.getBody());
MessageDto messageDto = JSONObject.parseObject(msg,MessageDto.class);
try {
// 本地事务调用
ResultTrancDto res = myTranctionService.doTransac(messageDto);
if (res.isSuccess()) {
// 本地事务执行成功,提交半消息
log.debug("本地事务执行成功,提交事务事务消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行失败,回滚半消息
log.debug("本地事务执行失败,回滚事务消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 异常情况返回未知状态
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
MessageDto messageDto = JSONObject.parseObject(msg,MessageDto.class);
// 本地事务核查
if (myTranctionService.checkTransac(messageDto)){
// 已经完成处理
return LocalTransactionState.COMMIT_MESSAGE;
}
// 数据库没有处理完毕,回滚事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
2 ) 本地事务发送和核查 demo:
2.1 )MyTranctionService.java
package com.example.springrocket.service;
import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;
public interface MyTranctionService {
ResultTrancDto doTransac( MessageDto messageDto);
boolean checkTransac(MessageDto messageDto);
}
2.2 )MyTranctionServiceImpl .java
package com.example.springrocket.service.impl;
import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;
import com.example.springrocket.service.IUserService;
import com.example.springrocket.service.MyTranctionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MyTranctionServiceImpl implements MyTranctionService {
@Autowired
private IUserService userService;
@Override
public ResultTrancDto doTransac(MessageDto messageDto) {
userService.saveUser(messageDto.getBody().toString());
return new ResultTrancDto(true,"");
}
@Override
public boolean checkTransac(MessageDto messageDto) {
return userService.checkName(messageDto.getBody().toString());
}
}
2.3 )UserServiceImpl.java
package com.example.springrocket.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.springrocket.entity.User;
import com.example.springrocket.mapper.UserMapper;
import com.example.springrocket.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
/**
* <p>
* 用户 服务实现类
* </p>
*
* @author lgx
* @since 2023-06-15
*/
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Override
@Transactional(rollbackFor = Exception.class)
public boolean saveUser(String msg) {
for (int i=0;i<2;i++){
User user = new User();
user.setName(msg+"_"+i).setAge(100);
this.save(user);
}
return true;
}
@Override
public boolean checkName(String msg) {
List<String> names= new ArrayList<>(1<<2);
for (int i=0;i<2;i++){
names.add(msg+"_"+i);
}
LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper();
wrapper.in(User::getName,names);
return this.count(wrapper) == 2;
}
}
3 )RocketMQTemplate 增加监听:
@PostConstruct
private void configProducer(){
( (TransactionMQProducer)rocketMQTemplate.getProducer()).setTransactionListener(transactionListener);
}
4 )事务消息发送:
public <T> void sendTransactionMessage(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(topic, message,"producuer-01");
}
2.2.4 .2 事务消息发送实现原理:
- 调用sendMessageInTransaction 方法进行消息的发送;
- 发送完成,进入监听器executeLocalTransaction 方法,这个方法中执行本地事务;
- 本地事务执行成功,则提交事务,失败则回滚事务,异常则发送unknown;
- broker 根据提交的事务类型选择,投递,删除消息,如果是unknown 则向客户端发起查询;
- 如果是unknown 进入客户端的checkLocalTransaction 方法进行事务的回查,然后提交或者回滚事务;
3.RocketMq Broker 消息存储:
3.1 消息存储结构:
1) topic 下的队列:
RocketMq 中的消息都存储与topic 中,topic是逻辑存储,每个topic下有多个队列,topic 主题在逻辑上是队列的集合;队列是 Apache RocketMQ 消息的最小存储单位;
2 )队列可以定义的属性:
-
读写。消息可以写入当前队列,也可以从当前队列中读取。
-
只读。可以从当前队列读取消息,但不能写入当前队列。
-
只写。消息可以写入当前队列,但不能从当前队列中读取。
-
读取或写入状态不可用。当前队列不允许读取或写入操作。
3)每个队列的结构:
每个队列对维护了存储消息的CommitLog文件,以及消费者对于读队列消费的偏移量,并维护了消息与磁盘之间的索引关系;
3.1)写队列:
多个消息存储(Message Store File)组成,每个消息存储文件中包含多条消息。一个消息存储文件通常包括四个部分:CommitLog、ConsumeQueue、IndexFile 和 HashSlotFile。具体组成如下:
CommitLog:CommitLog 是写队列中最重要的组成部分,也是消息存储文件中占用空间最大的部分。是一个只写的追文件,用于存储 Producer 发送的消息。新增的消息会被追加到文件末尾,CommitLog 中每条消息在写入时都会有唯一的 offset,Consumer 在消费时以 offset 为基准来寻找消息。CommitLog 的写入是顺序写,相对于随机写入来说速度更快。
ConsumeQueue:ConsumeQueue 用于消费者消费消息时的快速定位,是写队列的消息索引文件。MQ会根据 ConsumeQueue 中消息的 offset 信息快速定位到 CommitLog 中存储对应消息的位置,从而快速拉取到需要的消息。ConsumeQueue 是一个只读的文件,消息在消费时不会被删除,只会打上标记表示已被消费。RocketMQ 为每个 Topic 和 Consumer Group 创建一个 ConsumeQueue。
IndexFile:IndexFile 也是写队列的消息索引文件,用于支持随机读取消息。当 Consumer 并非按照顺序消费消息时,可以从 IndexFile 中查找 offset 等信息,找到所需的消息进行消费。IndexFile 是一个只读的文件,它记录了每个 ConsumeQueue 文件中第一条消息的 offset 以及对应的物理位置,方便 Consumer 在指定 offset 位置开始消费消息。
HashSlotFile:HashSlotFile 是 ConsumeQueue 和 IndexFile 的辅助文件,用于实现消息的定位和检索。HashSlotFile 中存储了 Topic 中所有消息的 Topic 名称和消息 ID,它把消息的 Topic 名称和消息 ID 作为 Key 进行 Hash 计算,将其分配到不同的 Hash Slot 中。每个 Hash Slot 对应一个 ConsumeQueue 文件和一个 IndexFile 文件,这样就可以快速地根据 Hash Slot 查找到对应的 ConsumeQueue 文件和 IndexFile 文件,从而实现快速定位和检索消息。
综上所述,一个写队列内部由多个消息存储文件组成,主要包括 CommitLog、ConsumeQueue、IndexFile 和 HashSlotFile 四个部分,它们协同工作,实现消息的存储、索引、定位和检索等功能。
3.2)读队列:
在 RocketMQ 中,读队列中的消息存储(Message Store File)也由多个文件组成,主要包括 ConsumeQueue、IndexFile 和 CommitLog 三个部分。
ConsumeQueue:ConsumeQueue 也是读队列的核心组成部分,它用于记录读取者已经消费的消息位置,方便重复消费以及快速定位下一条未消费的消息。ConsumeQueue 在读队列中的组成与写队列中的有所不同,它不再以文件的方式存储,而是以 Map 集合的方式存储在内存中。
IndexFile:IndexFile 也是读队列中的组成部分,用于支持快速定位和检索消息。IndexFile 中记录了消息的 offset、物理位置以及消息的存储时间等信息,可以根据 offset 或时间戳快速定位到一条消息的位置。
CommitLog:CommitLog 也是读队列中的组成部分,它存储了消息的实际内容。当消费者找到一条消息的 offset 后,可以通过 IndexFile 找到消息的物理位置,在 CommitLog 中查找到消息的实际内容并进行消费。
综上所述,一个读队列内部由 ConsumeQueue、IndexFile 和 CommitLog 三个部分组成。ConsumeQueue 用于记录读者已经消费的消息位置,IndexFile 用于快速定位和检索消息,CommitLog 则是消息实体的存储位置。通过这三个部分的协同工作,消费者可以快速地读取并消费消息,实现消息的高效传递和处理。
3.2 消息存储流程:
- Producer 发送一条消息到达broker 中,根据消息队列属性,将消息存放到某个队列中;
- 将该条消息追加到该队列CommitLog 文件(CommitLog 默认会以1G 大小进行分隔);
- 维护消息与磁盘关系的索引到IndexFile 中;
- 写队列中的消息同步到对应的读队列中,方便后续消费者从读队列获取消息;
3.3 每个topic 中消息读写队列的数量:
写队列的数量相当于分区的数量,默认8个,客户端发送消息创建默认4个,并且每个broker 中 都有这个队列数量的存储目录,队列的数量= 写队列的数量* broker 数量;
当写队列的个数大于读队列的个数时,会出现某些读队列上的消息无法被及时消费的情况,导致消息积压。因为当 Producer 发送消息时,RocketMQ 会将消息平均地分配到所有的可用写队列中,如果写队列的个数大于读队列的个数,那么某些读队列上的消息就无法及时被消费掉。另外,当读队列上有消息堆积时,可能会影响系统的性能和稳定性。
当写队列的个数小于读队列的个数时,则可能会导致某些队列空闲的情况,无法充分利用资源。因为如果写队列的个数小于读队列的个数,那么某些读队列可能会一直没有消息可消费,导致消费者无事可做,浪费系统资源。
因此,为了实现最佳性能和资源利用率,通常建议将读队列和写队列的个数尽可能地设置成同样的数量。在实际应用中,可以通过调整 Topic 和 Message Queue 的参数,来动态地增加或减少队列数量,以达到最佳的消息传递和处理效果。
3.4 消息的删除:
RocketMQ Broker 端支持的消息删除策略主要有三种:
1.) 定时删除策略(Time-based deletion strategy):在 Broker 的配置文件中设置消息保存时间(默认为 72 小时),超过这个时间的未被消费的消息将被删除。这种策略适用于对消费实时性没有特别的要求的场景,可以让消息自然过期删除。
2.) 空间删除策略(Space-based deletion strategy):在 Broker 的配置文件中设置一定的磁盘空间大小阈值,当磁盘空间使用超过阈值时,Broker 会删除一部分相对“冷门”的消息。这种策略适用于对消费实时性及消息可靠性要求高的场景,可以减少磁盘占用率和提高消息的重要性,默认磁盘使用率超过90% 直接拒绝消息写入。
3.) 主动删除策略(Manually deletion strategy):通过手动指定消息 ID 或时间戳删除特定消息。这种方式适用于需要精细控制消息存储和删除的场景,可以通过代码或 RocketMQ 控制台等方式实现。
需要注意的是,无论采用哪种消息删除策略,在删除消息时都需要确保消息已经被消费或不再需要消费,避免出现消息丢失或错误的情况。另外,为了提高消息存储和删除的效率,也可以使用消息索引、批量删除等技术手段来减少操作次数和提高操作效率。
4.RocketMq 消费者:
4.1 @RocketMQMessageListener 注解消费消息:
1) 自动提交ack:
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
topic = "test1", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING,
selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body);
log.debug("监听到消息:message:{}", msg);
// 手动提交 ACK
ConsumeConcurrentlyContext context = RocketMQUtil.getLocalContext();
if (context != null) {
List<MessageExt> msgs = context.getMsgs();
rocketMQUtil.ack(msgs);
}
}
}
默认情况下,RocketMQ 自动提交 Ack 是在业务代码执行完成之后提交的。也就是说,只有当 consumeMessage 方法成功返回后,才会自动提交 Ack,表示这个消息已经被成功消费。如果业务代码执行过程中出现了异常,消息将不会被标记为已消费,被重新投递给 Broker,等待下次消费。
2) 手动提交ack:
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
topic = "test1", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING,
selectorExpression = "*")
public class RocketMqConsumerTest implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (int i = 0; i < msgs.size(); i++) {
try {
// 打印消息内容
String msgStr = new String(msgs.get(i).getBody());
log.debug("监听到消息:message:{}", msgStr);
} catch (Exception e) {
e.printStackTrace();
// 手动提交 Ack 时出错,进行消息重试
context.setAckIndex(i--);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回 ACK
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
context对象的成员变量ackIndex的含义是:最后一个正常消费的消息索引号。索引号的最大值为传入的消息数量-1。通过context.setAckIndex(n),可以说明哪个索引号之后的消息消费失败。 这样从索引ackIndex往后的消息会被重新发送至broker,等待下一次消费。 然后从ProcessQueue中移除消费过的消息。 最后更新最新的offset至RemoteBrokerOffsetStore。
4.2 @RocketMQMessageListene 属性详解:
1.) consumerGroup:消费者组名,必填项。该值需要在同一个应用程序中唯一,用于区分不同的消费者。
2.)topic:待消费的消息主题,必填项。
3.)selectorExpression:消息过滤表达式,可选。如果指定了该属性,RocketMQ 服务器会只将符合该表达式的消息推送给消费者,其他消息会被过滤掉。如果不指定该属性,则表示消费所有消息。
4.) messageModel:消息模式,可选。支持两种模式:集群模式(默认值)和广播模式。集群模式下,同一个消费者组的多个消费者分摊消费消息;而广播模式下,同一个消费者组的多个消费者都会收到消息。
5.) consumeMode:消费模式,可选。共有三种模式:顺序消费、并发消费和异步阻塞消费。默认值是 CONCURRENTLY,表示并发消费。顺序消费模式会确保消费者按照发送顺序依次消费消息,适用于有序消息场景。异步阻塞消费模式则意味着消费者线程会被阻塞,直到消息处理完毕才会继续消费。
6.) consumeThreadMax:最大消费线程数,可选。默认值是 64,表示同一时刻最多有 64 条消息能够被消费。如果需要扩大消费能力,可以适当增大该属性的值。
7.) pullThresholdForQueue:消息拉取阈值,可选。默认值是 1000,表示每个消费者拉取一条消息之后,会立即再拉取 999 条消息保存在本地缓存中。当本地缓存中的消息处理完毕之后,消费者会再次拉取 1000 条消息。如果待消费的消息比较大,可以适当调大该属性的值,减少拉取消息的次数。
8.) pullInterval:拉取消息间隔时间,可选。默认值是 0,表示拉取消息之前不需要等待。如果待消费的消息比较少,可以把该属性值调大,节省网络开销,减轻 RocketMQ 服务器的压力。
9.) maxReconsumeTimes:消息重试次数上限,可选。默认值是 16,表示当某条消息消费失败时,自动进行重试,最多重试 16 次。如果达到重试次数上限还是无法消费该条消息,则将该消息保存在死信队列中。可以通过修改该属性来设置不同的重试机制。
10.) delayLevelWhenNextConsume:下一次消息消费的延时等级,可选。默认值是 0,表示消息不需要延时再次消费。如果待消费的消息经过一段时间仍然不能成功处理,则可以设置该属性来减少 RocketMQ 服务器的压力。延时等级越高,表示下一次消息消费的时间越晚。比如,delayLevelWhenNextConsume=5,表示下一次消息消费时间会晚上 5 分钟。
11.) accessChannel:接入通道,默认为本地通道(InProcess)。目前,RocketMQ 还支持远程通道(Cloud),将消息发送到云上的 RocketMQ 集群中。
12.) messageModel:消息分发模式,可选项,支持两种消息分发模式:集群模式和广播模式,默认为集群模式。
13.) enableVIPChannel:是否启用 VIP 通道,可选项,默认为 true,将会启用 VIP 通道来保
三、总结:
以上就是今天要讲的内容,本文介绍了RocketMq消息模型中组成的组件,并对每个组件单独进行了介绍和使用;具体springboot 项目中的使用可以参考:springboot 整合Rocketmq。
四、参考:
RocketMQ 官网文档;