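This is weihubeats. If you like the article, you can follow my WeChat official account 小奏技术, where it was first published. No marketing accounts, no clickbait.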
Background
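Recently I kept running into errors when resetting consumer offsets in RocketMQ, so I decided to dig into how RocketMQ actually resets a consumer group's offsets.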
RocketMQ version
- 5.1.0
- Dashboard: latest code from the GitHub master branch
Source code analysis
dashboard
Entry point
This feature lives on the RocketMQ dashboard page.
Let's click the Skip Accumulate button and see which endpoint it calls.
The network request shows that it calls the skipAccumulate.do endpoint.
Here are the request parameters:
{
"resetTime": -1,
"consumerGroupList": [
"gid-xiao-zou-topic"
],
"topic": "xiao-zou-topic",
"force": true
}
The request passes a consumer group (gid), a topic, and force set to true.
We'll come back to force during the source code analysis.
A global search of the codebase turns up the endpoint.
It follows the classic MVC layering: controller-service-serviceImpl.
Let's go straight to the implementation class, ConsumerServiceImpl.
Its core is a call to the resetOffsetByTimestamp method of org.apache.rocketmq.tools.admin.MQAdminExt:
Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
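These parameters map directly onto the JSON of the request above.
topic and group are self-explanatory. The timestamp is a bit special: -1 is passed here (the broker code below treats -1 as "jump to the latest offset"), and isForce is passed as true.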
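isForce controls whether the reset is forced.
When force is true, each queue's consumer offset is set to the offset for the given timestamp, regardless of where the consumer currently is.
When force is false, a queue's offset is changed only if the target offset is smaller than the consumer's current offset, i.e. the consumer has already moved past the given timestamp. The offset can then only move backwards (re-consuming messages), never forwards, so no messages are skipped.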
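To make the force semantics concrete, here is a minimal sketch of the per-queue decision, distilled from the broker-side resetOffset code analyzed later. ResetOffsetRule and targetOffset are hypothetical names, and the store lookups (max offset, offset by time) are passed in as plain numbers instead of being queried from a real message store.

```java
// Hypothetical helper, not RocketMQ API: mirrors the per-queue decision in the broker's resetOffset.
final class ResetOffsetRule {

    /**
     * @param consumerOffset current committed offset of the group on this queue
     * @param timestamp      requested reset time; -1 means "skip accumulation"
     * @param maxOffset      max offset of the queue (used when timestamp == -1)
     * @param offsetByTime   offset looked up for the timestamp (used otherwise)
     * @param isForce        the force flag from the request
     */
    static long targetOffset(long consumerOffset, long timestamp, long maxOffset,
                             long offsetByTime, boolean isForce) {
        long target = (timestamp == -1) ? maxOffset : offsetByTime;
        if (target < 0) {
            target = 0; // an invalid lookup result falls back to 0, as the broker does
        }
        // Without force the offset may only move backwards (rewind), never forwards.
        return (isForce || target < consumerOffset) ? target : consumerOffset;
    }

    public static void main(String[] args) {
        // Skip accumulation: consumer at 100, queue max offset at 500
        System.out.println(targetOffset(100, -1, 500, 0, true));   // 500
        System.out.println(targetOffset(100, -1, 500, 0, false));  // 100, nothing skipped
        // Rewind to a timestamp whose offset is 40
        System.out.println(targetOffset(100, 1_700_000_000_000L, 500, 40, false)); // 40
    }
}
```

This also suggests why the dashboard's skip-accumulation request sends force: true. With timestamp -1 the target is the queue's max offset, which is normally not smaller than the current consumer offset, so without force the offset would stay where it is.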
RocketMQ
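With the entry point found, let's go back to the RocketMQ source and look at resetOffsetByTimestamp of org.apache.rocketmq.tools.admin.MQAdminExt. The code below is the implementation that does the actual work, an overload with an extra isC flag for C++ clients: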
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
if (offsetTable != null) {
allOffsetTable.putAll(offsetTable);
}
}
}
}
return allOffsetTable;
}
Let's walk through this code:
- Fetch the topic's route metadata (topicRouteData) from the NameServer
- From topicRouteData, get the brokers that host the topic (List<BrokerData> brokerDatas)
- Loop over all brokers and send each one a reset-offset request
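The three steps amount to a fan-out followed by a merge. The sketch below models only that shape with hypothetical stand-ins (Queue for MessageQueue, BrokerCall for the remoting call); it does no networking and is not RocketMQ API. It uses a Java record, so it assumes Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the admin-side fan-out in resetOffsetByTimestamp.
final class ResetFanOutSketch {

    // Stand-in for MessageQueue: a (brokerName, queueId) pair; the record supplies equals/hashCode.
    record Queue(String brokerName, int queueId) {}

    // Stand-in for "send INVOKE_BROKER_TO_RESET_OFFSET to one broker address".
    interface BrokerCall {
        Map<Queue, Long> resetOffset(String addr, String topic, String group, long timestamp, boolean isForce);
    }

    static Map<Queue, Long> resetOnAllBrokers(List<String> brokerAddrs, BrokerCall call,
                                              String topic, String group, long timestamp, boolean isForce) {
        Map<Queue, Long> all = new HashMap<>();
        for (String addr : brokerAddrs) {
            if (addr == null) {
                continue; // the real code skips brokers for which no address could be selected
            }
            Map<Queue, Long> partial = call.resetOffset(addr, topic, group, timestamp, isForce);
            if (partial != null) {
                all.putAll(partial); // each broker reports its own queues, so a plain merge is enough
            }
        }
        return all;
    }

    public static void main(String[] args) {
        BrokerCall fake = (addr, topic, group, ts, force) ->
            Map.of(new Queue(addr, 0), 10L, new Queue(addr, 1), 20L);
        Map<Queue, Long> result = resetOnAllBrokers(List.of("broker-a", "broker-b"), fake,
            "xiao-zou-topic", "gid-xiao-zou-topic", -1, true);
        System.out.println(result.size()); // 4
    }
}
```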
Steps 1 and 2 are not the focus here, so let's look at the source for step 3:
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
throws RemotingException, MQClientException, InterruptedException {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timestamp);
requestHeader.setForce(isForce);
// offset is -1 means offset is null
requestHeader.setOffset(-1L);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
if (isC) {
request.setLanguage(LanguageCode.CPP);
}
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
return body.getOffsetTable();
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
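One detail in the switch above is easy to miss: the case ResponseCode.SUCCESS block only returns when the body is non-null and has no break, so a SUCCESS response without a body falls through to default and ends up at the throw. The sketch below spells that out with plain types. ResetResponseSketch and ResetFailedException are hypothetical names, and SUCCESS = 0 assumes the value of RocketMQ's ResponseCode.SUCCESS.

```java
import java.util.Map;

// Hypothetical sketch of how invokeBrokerToResetOffset interprets the broker's reply.
final class ResetResponseSketch {

    static final int SUCCESS = 0; // assumption: same value as ResponseCode.SUCCESS

    static final class ResetFailedException extends RuntimeException {
        final int code;

        ResetFailedException(int code, String remark) {
            super("code=" + code + ", remark=" + remark);
            this.code = code;
        }
    }

    // Only "SUCCESS with a body" returns; SUCCESS without a body is treated like any error code.
    static <K> Map<K, Long> interpret(int code, Map<K, Long> decodedBody, String remark) {
        if (code == SUCCESS && decodedBody != null) {
            return decodedBody;
        }
        throw new ResetFailedException(code, remark);
    }

    public static void main(String[] args) {
        System.out.println(interpret(SUCCESS, Map.of("q0", 10L), null)); // {q0=10}
        try {
            interpret(SUCCESS, null, "no body");
        } catch (ResetFailedException e) {
            System.out.println(e.getMessage()); // code=0, remark=no body
        }
    }
}
```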
The real logic has to live in the broker, so we use the request code INVOKE_BROKER_TO_RESET_OFFSET to find the corresponding broker-side code:
- AdminBrokerProcessor
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
LOGGER.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce());
if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
String topic = requestHeader.getTopic();
String group = requestHeader.getGroup();
int queueId = requestHeader.getQueueId();
long timestamp = requestHeader.getTimestamp();
Long offset = requestHeader.getOffset();
return resetOffsetInner(topic, group, queueId, timestamp, offset);
}
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
case CPP:
isC = true;
break;
}
return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
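- Check whether broker-side offset reset is enabled (isUseServerSideResetOffset). Before 5.0, consumer offsets were always managed by the client; to suit cloud-native deployments and support HTTP-based access, later versions also support managing offsets on the broker, in which case the broker resets them itself via resetOffsetInner.
- Otherwise, the broker calls this.brokerController.getBroker2Client().resetOffset, passing isC = true when the request came from a C++ client (LanguageCode CPP).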
The implementation of resetOffset is a bit long, so let's go through it step by step:
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// Look up the topic's metadata
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
// Fail if the topic does not exist on this broker
if (null == topicConfig) {
log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<>();
// Iterate over every write queue of the topic
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
// Query the group's current consumer offset
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
// An offset of -1 means no consumption progress exists for this group
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
long timeStampOffset;
// A timestamp of -1 means skip accumulation: jump to the latest (max) offset; otherwise look up the offset for the given time
if (timeStamp == -1) {
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
// If the looked-up offset is invalid (negative), reset to 0 instead
if (timeStampOffset < 0) {
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
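// With isForce, reset unconditionally;
// otherwise only reset if the target offset is smaller than the current consumer offset, so no messages are skipped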
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
} else {
offsetTable.put(mq, consumerOffset);
}
}
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.setBody(body.encode());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
}
// Look up the consumer group info
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
// Proceed only if the group exists and at least one consumer is connected to this broker; otherwise return an error
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
topic, group, e.toString());
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
requestHeader.getGroup(),
requestHeader.getTopic(),
requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}
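So the broker reads the consumer offsets it has stored, computes the target offset for each queue, and then sends a request to every connected client telling it to update its local offsets. A reset can therefore fail in these cases:
- the topic does not exist on this broker
- the consumer group does not exist (no offset is recorded for one of the queues)
- no consumer is connected
- a connected client is older than V3_0_7_SNAPSHOT and does not support the feature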
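The failure cases can be condensed into a small precondition check. This is a hypothetical model, not RocketMQ code: the Outcome names are made up (the real method returns SYSTEM_ERROR for every case except the offline one, which returns CONSUMER_NOT_ONLINE), and version ordinals are plain ints standing in for MQVersion values. One simplification: the real loop sends invokeOneway to each client as it goes, so clients visited before an outdated one have already been notified when the error is returned.

```java
import java.util.List;

// Hypothetical model of the checks in Broker2Client.resetOffset, in the order the broker applies them.
final class ResetPreconditionSketch {

    enum Outcome { OK, TOPIC_NOT_FOUND, GROUP_NOT_FOUND, CONSUMER_NOT_ONLINE, CLIENT_TOO_OLD }

    /**
     * @param topicExists         whether the topic config was found on this broker
     * @param queueOffsets        committed offset per queue; -1 means "no progress recorded"
     * @param clientVersions      version ordinal of each connected consumer channel
     * @param minSupportedVersion stand-in for MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()
     */
    static Outcome check(boolean topicExists, long[] queueOffsets,
                         List<Integer> clientVersions, int minSupportedVersion) {
        if (!topicExists) {
            return Outcome.TOPIC_NOT_FOUND;
        }
        for (long offset : queueOffsets) {
            if (offset == -1) {
                return Outcome.GROUP_NOT_FOUND; // one queue without progress aborts the whole reset
            }
        }
        if (clientVersions.isEmpty()) {
            return Outcome.CONSUMER_NOT_ONLINE; // a backlog alone is not enough, someone must be connected
        }
        for (int version : clientVersions) {
            if (version < minSupportedVersion) {
                return Outcome.CLIENT_TOO_OLD;
            }
        }
        return Outcome.OK;
    }

    public static void main(String[] args) {
        // Backlog exists (offsets recorded) but nobody is connected; version numbers are made up
        System.out.println(check(true, new long[]{12, 30}, List.of(), 400)); // CONSUMER_NOT_ONLINE
    }
}
```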
Note that even if the consumer group has a message backlog, the reset still fails when no consumer is connected to the broker.
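In other words, in this mode a reset works by telling every client to pull messages from the broker using the new offsets; the broker does not rewrite the consumer offsets it has stored.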
Now let's look at how the client finally handles the notification. The request code is 220:
public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
- ClientRemotingProcessor
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
log.info("[reset-offset] consumer dose not exist. group={}", group);
return;
}
consumer.suspend();
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
ProcessQueue pq = entry.getValue();
pq.setDropped(true);
pq.clear();
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
consumer.updateConsumeOffset(mq, offset);
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
iterator.remove();
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
}
}
}
} finally {
if (consumer != null) {
consumer.resume();
}
}
}
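The overall flow is:
- Look up the consumer whose offsets need resetting (only DefaultMQPushConsumerImpl is handled; otherwise the method logs and returns) and suspend consumption.
- Get the consumer's message processing queue table processQueueTable and iterate over its entries. For every message queue mq that satisfies topic.equals(mq.getTopic()) && offsetTable.containsKey(mq):
  a. mark its processing queue pq as dropped: pq.setDropped(true)
  b. clear pq: pq.clear()
- Sleep for 10 seconds.
- Iterate over processQueueTable again. For every message queue mq that satisfies topic.equals(mq.getTopic()) && offset != null:
  a. read the target offset for mq from offsetTable
  b. update the consumer's offset for mq to offset: consumer.updateConsumeOffset(mq, offset)
  c. let the rebalance implementation remove the queue: consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq))
  d. remove mq from processQueueTable with the iterator's remove() method
- Resume consumption (in the finally block, so this happens even if something fails).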
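The suspend, drop, wait, update, resume sequence can be modelled in a few lines. This is a hypothetical, single-threaded sketch, not RocketMQ code: it assumes a single topic (so queues are keyed by queueId and the topic filter is dropped), replaces the offset store with a HashMap, turns the 10-second wait into a parameter, and does not implement rebalancing.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical model of MQClientInstance.resetOffset's suspend/drop/wait/update/resume sequence.
final class ClientResetSketch {

    // Stand-in for ProcessQueue: a dropped flag plus a count of buffered messages.
    static final class ProcessQueue {
        volatile boolean dropped;
        int buffered;

        void clear() { buffered = 0; }
    }

    volatile boolean suspended;
    final Map<Integer, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(); // keyed by queueId
    final Map<Integer, Long> localOffsets = new HashMap<>(); // stand-in for the consumer's offset store

    void resetOffset(Map<Integer, Long> offsetTable, long waitMillis) throws InterruptedException {
        suspended = true;
        try {
            // 1. Stop processing messages already pulled for the affected queues.
            for (Map.Entry<Integer, ProcessQueue> e : processQueueTable.entrySet()) {
                if (offsetTable.containsKey(e.getKey())) {
                    e.getValue().dropped = true;
                    e.getValue().clear();
                }
            }
            // 2. Give in-flight pulls and consumption time to notice the drop (10 s in RocketMQ).
            TimeUnit.MILLISECONDS.sleep(waitMillis);
            // 3. Store the new offsets and forget the queues, so a later rebalance
            //    (not modelled here) can pick them up again from the new offset.
            Iterator<Map.Entry<Integer, ProcessQueue>> it = processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, ProcessQueue> e = it.next();
                Long offset = offsetTable.get(e.getKey());
                if (offset != null) {
                    localOffsets.put(e.getKey(), offset);
                    it.remove();
                }
            }
        } finally {
            suspended = false; // always resume, as the real code does in its finally block
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ClientResetSketch client = new ClientResetSketch();
        client.processQueueTable.put(0, new ProcessQueue());
        client.processQueueTable.put(1, new ProcessQueue());
        client.resetOffset(Map.of(0, 500L), 10); // 10 ms instead of 10 s for the demo
        System.out.println(client.localOffsets + " " + client.processQueueTable.keySet()); // {0=500} [1]
    }
}
```

The wait between dropping and updating is also why a reset is noticeably slow on the client: consumption stays suspended for at least the full sleep.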
Summary
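In short, when RocketMQ consumer offsets are managed by the client, an offset reset is initiated by the admin side (here the dashboard via MQAdminExt) and sent to each broker, and each broker then notifies all connected consumer clients to update their local offsets.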