RocketMQ重置消费位点源码分析

news2025/1/10 18:10:31

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

最近在使用RocketMQ的重置消费位点的时候经常出现报错,所以就打算研究下RocketMQ是如何重置消费者的消费位点的

RocketMQ版本

  • 5.1.0
  • Dashboard: 最新github master分支代码

源码分析

dashboard

功能入口

该功能主要是在RocketMQdashboard页面

我们直接使用跳过堆积按钮,看调用的哪个接口

通过接口请求我们可以看到调用的是skipAccumulate.do接口

然后我们简单看看参数

{
    "resetTime": -1,
    "consumerGroupList": [
        "gid-xiao-zou-topic"
    ],
    "topic": "xiao-zou-topic",
    "force": true
}

可以看到传入了一个gid,一个topic,还有一个force为true

force这个参数我们后面进行源码分析再说

我们直接全局搜索找到这个接口

可以看到是传统的MVC架构,controller-service-serviceImpl
我们这里直接去看看他的实现类

  • ConsumerServiceImpl

可以看到核心方法是调用org.apache.rocketmq.tools.admin.MQAdminExtresetOffsetByTimestamp方法

Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

这里可以看到这几个参数我们就是我们之前接口请求的json都有的。
topicgroup我们都知道,时间戳传入的有点特殊,这里传入的是一个-1isForce传入的是true
isForce就是表示强制是否强制重置消费进度
force参数为true时,无论消费者当前的消费进度是否比指定的时间戳早,都会将消费进度重置为指定时间戳对应的消息。

force参数为false时,只有当消费者当前的消费进度比指定的时间戳早时,才会将消费进度重置为指定时间戳对应的消息。

RocketMQ

这里我们找到了入口我们直接回到RocketMQ源码我们去看看
org.apache.rocketmq.tools.admin.MQAdminExtresetOffsetByTimestamp方法

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

这里代码我们分析下

  1. 通过Nameserve获取topic的元数据topicRouteData
  2. 通过topic的元数据topicRouteData获取到topic所在的broker信息List<BrokerData> brokerDatas
  3. 循环所有向所有broker发送重置消费位点请求

步骤一和二不是我们的分析重点
我们重点看看3的源代码

  public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);
    // offset is -1 means offset is null
    requestHeader.setOffset(-1L);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                return body.getOffsetTable();
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

实际的逻辑肯定都封装在broker的,所以我们直接通过请求码INVOKE_BROKER_TO_RESET_OFFSET找到对应的broker的逻辑代码

  • AdminBrokerProcessor
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final ResetOffsetRequestHeader requestHeader =
        (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    LOGGER.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
        requestHeader.getTimestamp(), requestHeader.isForce());

    if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
        String topic = requestHeader.getTopic();
        String group = requestHeader.getGroup();
        int queueId = requestHeader.getQueueId();
        long timestamp = requestHeader.getTimestamp();
        Long offset = requestHeader.getOffset();
        return resetOffsetInner(topic, group, queueId, timestamp, offset);
    }

    boolean isC = false;
    LanguageCode language = request.getLanguage();
    switch (language) {
        case CPP:
            isC = true;
            break;
    }
    return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
        requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
  1. 判断是否开启broker管理消费位点。5.0之前都是client管理的,为了兼容云原生,支持http的方式,后面都支持broker管理消费位点
  2. 如果不由broker管理消费位点则调用this.brokerController.getBroker2Client().resetOffset

resetOffset这个方法的实现有点长,我们慢慢看

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 获取topic的元数据信息
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    // 如果topic为空则报错
    if (null == topicConfig) {
        log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
        return response;
    }

    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    // 循环
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        // 查询当前消费偏移量
        long consumerOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
        // 如果当前偏移量为-1表示消费进度不存在
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }

        long timeStampOffset;
        // 如果传入的参数时间戳为-1 则表示要跳过堆积,直接更新消费位点为最新的,否则获取指定时间的消费位点
        if (timeStamp == -1) {

            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }
        
        // 如果查询到的偏移量不正常将要重置的消费位点设置为0
        if (timeStampOffset < 0) {
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }
        // 如果开启强制重置消费位点则直接重置
        // 否则需要消费位点小于当前消费的消费位点,避免丢失消息
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }

    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }
    
    // 获取消费者信息
    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    // 消费者信息不为空,有消费者连接broker。 否则报错
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
                        topic, group, e.toString());
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}

可以看到是获取服务端的消费位点,然后设置消费位点,然后通过发送请求通知所有client,通知他们修改他们本地的消费位点.所以这里重置消费位点失败有几种情况

  1. topic不存在
  2. 消费者不存在
  3. 没有连接的消费者

这里需要注意即使消费者有消息堆积,消费者没有连接到broker,也是会重置消费位点失败的。
这里重置消费位点实际还是还是通知所有client用心的消费位点去broker拉去消息。不是去修改broker的消费位点

我们来看看最终通知client的处理逻辑
网络请求码是220

public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;

  • ClientRemotingProcessor
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);

public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException ignored) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}

总的处理流程如下:

  1. 获取到需要重置消费位点的消费者,然后暂停消费

  2. 获取消费者的消息处理队列表processQueueTable,遍历 processQueueTable 中的条目,对于满足条件 topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)的消息队列 mq,执行以下操作
    a. 将对应的消息处理队列 pq 设置为已丢弃状态,即 pq.setDropped(true)
    b. 清空消息处理队列 pq,即 pq.clear()。

  3. 线程休眠10s

  4. 遍历processQueueTable,对于满足条件topic.equals(mq.getTopic()) && offset != null的消息队列 mq,执行以下操作:
    a. 从 offsetTable 中获取对应消息队列 mq 的消费位点 offset
    b. 尝试更新消费者的消费位点为offset,即 consumer.updateConsumeOffset(mq, offset)
    c. 从消费者的消息队列重新平衡实现中移除不必要的消息队列,即 consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq))
    d. 使用迭代器的 remove()方法从processQueueTable中移除当前的消息队列mq

  5. 恢复消费

总结

总的来说RocketMQ的消费位点如果是客户端管理,重置消费位点是由客户端发起,发送到broker,最终还是由broker去通知所有broker去更新本地消费位点

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/733487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于matlab使用虚幻引擎模拟环境设计激光雷达SLAM算法(附源码)

一、前言 此示例演示如何记录来自 3D 仿真环境的合成激光雷达传感器数据&#xff0c;以及如何使用记录的数据开发同步定位和映射 &#xff08;SLAM&#xff09; 算法。 自动驾驶工具箱在Simulink中集成了虚幻引擎模拟环境。与此仿真环境相关的 Simulink 模块可以在库中找到。…

C++常用库函数——表

文章目录 1、常用数学函数2、常用字符串处理函数3、其他常用函数4、实现键盘和文件输入/输出的成员函数 1、常用数学函数 头文件 #include 或者 #include <math.h> 2、常用字符串处理函数 头文件 #include 或者 #include <string.h> 3、其他常用函数 头文件#i…

电动汽车充电站监控系统设计_kaic

1 绪论 1.1 引言 汽车工业的告诉发展&#xff0c;汽车带来的环境污染、能源短缺、资源枯竭和安全等方面的问题越来越突出。为了保持国民经济的可持续发展&#xff0c;保护人类居住环境和能源供给&#xff0c;各国政府不惜巨资&#xff0c;投入大量人力、物力&#xff0c;寻…

Docker更新后无法启动容器

前提&#xff1a; 由于使用的操作系统版本比较低&#xff0c;centos7.3&#xff0c;准备更新操作系统&#xff0c;随后执行了yum update 操作&#xff0c;结果&#xff0c;docker也从忘记了的某个版本更新到了最新的 24版本&#xff0c;导致使用的容器也没有了&#xff0c;这下…

【离散数学】实践三 PageRank体验

文章目录 PageRank算法工作原理简化版本完整版本 graphs分析graphs代码 [^4] 结语 PageRank算法工作原理 算法原理1 简化版本 若页面B,C,D…N都有链接到页面A上&#xff0c;则页面A的PageRank值如下&#xff1a; P R ( A ) ( P R ( B ) L ( B ) P R ( C ) L ( C ) P R (…

ES6: 模版字符串

前言: ES5 中我们表示字符串的时候使用 或者 "" 作用: 在 ES6 中&#xff0c;我们还有一个东西可以表示字符串&#xff0c;就是 &#xff08;反引号&#xff09; let str hello worldconsole.log(typeof str) // string和单引号还有双引号的区别: 反引号可以换行…

【设计模式】设计模式简介+七大设计原则介绍

设计模式介绍 简介 设计模式是对软件设计中普遍存在(反复出现) 的各种问题&#xff0c;所提出的解决方案。 为什么学习设计模式 软件要做大&#xff0c;要先进行设计&#xff0c;才能保证其有高的稳定性、扩展性&#xff08;容易按照新的需求添加功能&#xff09;、复用性&…

MySQL-分库分表详解(五)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️努力不一定有回报&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xf…

前后端跨域的解决方案

对于使用Spring Boot和Vue的前后端分离项目&#xff0c;下面介绍四种常见的解决跨域问题的方式&#xff0c;并分别从Java后端的两种权限配置方法、前端代理、以及Nginx代理进行说明&#xff1a; 方案提供 使用Spring Boot的权限配置方法&#xff1a; CorsFilter过滤器&#xf…

ChatGPT 最佳实践指南之:提供参考文本

Provide reference text 提供参考文本 GPTs can confidently invent fake answers, especially when asked about esoteric topics or for citations and URLs. In the same way that a sheet of notes can help a student do better on a test, providing reference text to G…

图论之寻找桥边

目录 ①基准法 ②并查集 ③逆向思维之标记环边 ④并查集压缩路径 ①基准法 在图论中&#xff0c;一条边被称为“桥”代表这条边一旦被删除&#xff0c;这张图的连通块数量会增加。等价地说&#xff0c;一条边是一座桥当且仅当这条边不在任何环上。一张图可以有零或多座桥。…

FFmpeg5.0源码阅读—— av_read_frame

摘要&#xff1a;本文主要描述了FFmpeg中用于打开编解码器接口av_read_frame的具体调用流程&#xff0c;详细描述了该接口被调用时所作的具体工作。   关键字&#xff1a;ffmpeg、av_read_frame   读者须知&#xff1a;读者需要了解FFmpeg的基本使用流程&#xff0c;以及一…

JVM理论(二)类加载子系统

类加载流程 类加载流程 类加载器子系统负责从文件系统或者网络中加载class文件,class文件的文件头有特定的文件标识(CAFEBABE是JVM识别class文件是否合法的依据)classLoader只负责文件的加载,而执行引擎决定它是否被执行加载类的信息存放在运行时数据区的方法区中,方法区还包括…

【终端增强工具】这次,我把Terminal(终端)也接入ChatGPT了...

大家好&#xff0c;我是萌新程序员豆小匠。 为terminal&#xff08;终端&#xff09;增加自定义命令这个想法从开始学编程的时候就有了&#xff0c;但是一直没有付诸行动。 这次&#xff0c;终于抽时间完成了&#xff0c;且代码开源&#xff01; 实现的功能 先说下实现的功能…

Idea社区版创建SpringBoot

一 下载Spring Initalizr and Assistant插件 选择左上角的File->Settings->Plugins&#xff0c;在搜索框中输入Spring&#xff0c;出现的第一个Spring Boot Helper插件&#xff0c;点击Installed&#xff0c;下载插件。&#xff08;这里已经下载&#xff09; 二 创建Spr…

Python学习笔记【01-基础语法】

文章目录 第一章输入输出print转义字符字符编码标识符和保留字变量数据类型整形浮点型布尔类型字符串类型数据类型转换 注释 第二章input()函数运算符算术运算符赋值运算符比较运算符布尔运算符(逻辑运算符)位运算符运算符的优先级 第三章顺序结构对象的布尔值单分支结构双分支…

dSYM文件是什么 ?

Overview 概述 dSYM的全称是debug symbol, 所以dSYM文件就是debug symbol (dSYM) file, dSYM文件中的每个 debug symbol, 对应了一个关系, 这个关系一端是源代码里真实的符号(类名称、全局变量以及方法和函数名称), 另一端则是其定义的文件号和行号. Apple Documentation 苹果…

word批量替换时使用通配符

替换这个好操作&#xff0c;打开替换功能&#xff1a; 弹出窗口中分别输入要替换的对象和替换的结果&#xff1a; 这里主要介绍特殊的替换&#xff1a; 可以看到点击“更多”之后&#xff0c;会出现下面的选项 像是【区分大小写】、【全字匹配】、【同音】、【查找单词的所有形…

三张表学会MySQL的单表操作!

表单一信息 1、查询表中所有学生的信息 mysql> select * from student; 2、 查询表中所有学生的姓名和英语成绩 mysql> select name,english from student; 3、过滤表中的重复数据 mysql> select DISTINCT * from student; 4、统计每个学生的总分 mysql> sele…

大语言模型高效训练基础知识:优化器AdamW和Adafator

Prerequsite:Adam优化算法 Adam优化算法很长一段时间都是比较主流的参数更新算法&#xff0c;也有很多变种&#xff0c;本文介绍在大模型训练过程中使用的AdamW和Adafator AdamW 原论文&#xff1a;Decoupled Weight Decay Regularization AdamW指的是Adam Weight Decay&#…