阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?

news2025/2/25 8:19:34

大家好,我是君哥。

最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。

1 拉取消息

1.1 封装拉取请求

以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {   try{    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);   }catch (Exception e){    return ConsumeConcurrentlyStatus.RECONSUME_LATER;   }   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  } }); consumer.start();}

上面的 DefaultMQPushConsumer 是一个推模式的消费者,启动方法是 start。消费者启动后会触发重平衡线程(RebalanceService),这个线程的任务是在死循环中不停地进行重平衡,最终封装拉取消息的请求到 pullRequestQueue。这个过程涉及到的 UML 类图如下:

1.2 处理拉取请求

封装好拉取消息的请求 PullRequest 后,RocketMQ 就会不停地从 pullRequestQueue 获取消息拉取请求进行处理。UML 类图如下:

拉取消息的入口方法是一个死循环,代码如下:

//PullMessageServicepublic void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {  try {   PullRequest pullRequest = this.pullRequestQueue.take();   this.pullMessage(pullRequest);  } catch (InterruptedException ignored) {  } catch (Exception e) {   log.error("Pull Message Service Run Method exception", e);  } } log.info(this.getServiceName() + " service end");}

这里拉取到消息后,提交给 PullCallback 这个回调函数进行处理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封装到 ConsumeRequest 这个线程类来处理。把代码精简后,ConsumeRequest 处理逻辑如下:

//ConsumeMessageConcurrentlyService.javapublic void run() { MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try {  //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) {  //2.处理消费结果  ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else {  log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}

2 处理消费结果

2.1 并发消息

并发消息处理消费结果的代码做精简后如下:

//ConsumeMessageConcurrentlyService.javapublic void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { int ackIndex = context.getAckIndex(); switch (status) {  case CONSUME_SUCCESS:   if (ackIndex >= consumeRequest.getMsgs().size()) {    ackIndex = consumeRequest.getMsgs().size() - 1;   }   int ok = ackIndex + 1;   int failed = consumeRequest.getMsgs().size() - ok;   break;  case RECONSUME_LATER:   break;  default:   break; } switch (this.defaultMQPushConsumer.getMessageModel()) {  case BROADCASTING:   for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {   }   break;  case CLUSTERING:   List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());   for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {    MessageExt msg = consumeRequest.getMsgs().get(i);    boolean result = this.sendMessageBack(msg, context);    if (!result) {     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);     msgBackFailed.add(msg);    }   }   if (!msgBackFailed.isEmpty()) {    consumeRequest.getMsgs().removeAll(msgBackFailed);   }   break;  default:   break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {  this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}

从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用 for 循环来处理消息,那如果在某一条消息处理失败了,直接退出循环,给 ConsumeConcurrentlyContext 的 ackIndex 变量赋值为消息列表中失败消息的位置,这样这条失败消息后面的消息就不再处理了,发送给 Broker 等待重新拉取。代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {   for (int i = 0; i < msgs.size(); i++) {    try{     System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);    }catch (Exception e){     context.setAckIndex(i);     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }   }   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        } }); consumer.start();}

消费成功的消息则从 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要发送消息到 Broker,而广播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果处理消息的逻辑是并行的,处理消息失败后给 ackIndex 赋值是没有意义的,因为可能有多条消息失败,给 ackIndex 变量赋值并不准确。最好的方法就是给 ackIndex 赋值 0,整批消息全部重新消费,这样又可能带来幂等问题。

2.2 顺序消息

对于顺序消息,从 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量时,是从 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 总结

回到开头的问题,如果一批消息按照顺序消费,是不可能出现第 100 条消息消费成功了,但第 50 条消费失败的情况,因为第 50 条消息失败的时候,应该退出循环,不再继续进行消费。

如果是并发消费,如果出现了这种情况,建议是整批消息全部重新消费,也就是给 ackIndex 赋值 0,这样必须考虑幂等问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/197758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pycharm出现‘Error loading package list:Connection refused: connect’问题

问题描述依次打开File->Settting窗口点击图中号弹出如下错误&#xff1a;Package错误窗口‘Error loading package list:Connection refused: connect 一段时间后继续弹出以下窗口&#xff1a;2.问题分析目前这个问题普遍说是由于网络配置原因引起的&#xff0c;在这之前&am…

【数据结构/C++】 树详解

目录树树的定义树的基本术语二叉树⼆叉树的种类满二叉树完全二叉树二叉树的性质二叉树的遍历方法前序遍历中序遍历后序遍历层序遍历二叉树的实现树 树的定义 树&#xff08;Tree&#xff09;是n&#xff08;n≥0&#xff09;个结点的有限集。n0时称为空树。在任意一颗非空树中…

[Effective Objective] 块与大中枢派发

为了解决多线程问题&#xff0c;苹果公司以全新的方式设计了多线程。核心就是“块”&#xff08;block&#xff09;与“大中枢派发”&#xff08;Grand Central Dispatch, GCD&#xff09;。 “块”是一种可在C、C及Objective-C代码中使用的“词法闭包”&#xff0c;借由此机制…

在一起多少天怎么设置?如何微信推送在一起多少天

马上情人节要到了&#xff0c;你和你的对象在一起多久了&#xff1f;两个人在恋爱中&#xff0c;会需要记录彼此在一起的每一天&#xff0c;特别是一些重要的纪念日比如100天纪念日&#xff0c;365天、或者520天纪念日。市面上有许多工具&#xff0c;可以帮我们记录这些重要的日…

指针空值nullptr(C++11)

在良好的C/C编程习惯中&#xff0c;声明一个变量时最好给该变量一个合适的初始值&#xff0c;否则可能会出现 不可预料的错误&#xff0c;比如未初始化的指针。如果一个指针没有合法的指向&#xff0c;我们基本都是按照如下 方式对其进行初始化&#xff1a;void TestPtr() { in…

【Docker 02】docker镜像和容器命令大全

对于入门学习者,更推荐的方式是通过官网的Reffrence手册,学习使用命令,不仅存在用法,选项参数的解释,还有用力example。 docker命令的基本语法结构: docker 子命令 [选项] [参数] 一、Docker基本命令 1.镜像有关 一批模板文件,不同的镜像可以包含的环境内容是不一样的,…

深入了解多线程原理

目录 背景知识&#xff1a; 什么是进程&#xff1f; 什么是线程&#xff1f; 线程与进程的区别&#xff1a; Thread类及常用方法&#xff1a; 循环打印的例子&#xff1a; start() 和 run() 的区别&#xff1a; 通过监视窗口查看线程&#xff1a; 创建线程&#xff1a; 1.继承 …

console控制台有sql语句输出但log文件中不输出sql解决方式

控制台可以输出sql&#xff0c;但是log文件中无sql输出&#xff0c;如何解决&#xff1f;把握两点就可以输出&#xff1a;第一点&#xff0c;mybatis 本身的logImpl配置这个参数是配置mybatis所使用的日志框架&#xff0c;取值范围如下&#xff1a;SLF4JLOG4J #表示使用LOG4J作…

提名倒计时! | 2022 龙蜥社区优秀贡献者

各位盆友们&#xff1a;2022 年&#xff0c;那些为龙蜥壮大做出杰出贡献的人们&#xff0c;包括开源背后的推动者、组织者、布道者、代码贡献者&#xff0c;让我们看到了热爱技术的力量&#xff01;为此社区推出「2022 龙蜥社区优秀贡献者」活动。截至目前&#xff0c;距离报名…

CSAPP Malloc Lab

CSAPP Malloc Lab 在这个实验室中&#xff0c;您将为C程序编写一个动态存储分配器&#xff0c;即您自己版本的malloc、free和realloc例程&#xff0c;实现一个正确&#xff0c;高效和快速的分配器。本实验性能指标有两个方面&#xff0c;内存利用率和吞吐量&#xff0c;这两个…

fpga图像处理(基于camera的图像读取和显示)

【声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 市面上目前很多的fpga开发板都有camera到lcd的显示demo。处理流程也是很相似的。一般的流程都是fpga首先初始化cmos,接着就是把数据从cmos读出来存储到sdram里面,显示模块再从sdra…

C语言高级教程-C语言数组(六):变长数组

C语言高级教程-C语言数组&#xff08;六&#xff09;&#xff1a;变长数组一、本文的编译环境二、一维数组在执行期间确定长度三、二维数组在执行期间确定长度四、一维变长数组实例五、完整程序5.1 Main.h 文件程序5.2 Main.c 文件程序六、总结一、本文的编译环境 本文的编译环…

压缩包版本快速安装MySQL教程

安装MySQL 跟随老师 狂神学java 学习地址 bilibilihttps://www.bilibili.com/video/BV1NJ411J79W?p1&vd_source69de4cea8c2ffc0f520876695f09a2da 这里建议大家使用压缩版 , 安装快 , 方便 . 不复杂 . 1、软件下载mysql5.7 64位下载地址: https://dev.mysql.com/get/Dow…

数据治理与IT治理的关系

前面我们辨析了数据治理的概念。这一篇文章要讲数据治理与IT治理的关系&#xff0c;首先来看看IT治理的概念。IT治理的理念最早是IBM&#xff08;InternationalBusiness Machines Corporation&#xff0c;国际商业机器公司&#xff09;引入中国的&#xff0c;属于公司治理的一部…

中金公司:全面注册制监管规则解读(附97页报告原文pdf下载链接)

省时查报告-专业、及时、全面的行研报告库省时查方案-专业、及时、全面的营销策划方案库【免费下载】2022年12月份热门报告盘点罗振宇2023年跨年演讲PPT原稿吴晓波2022年年终秀演讲PPT原稿推荐技术在vivo互联网商业化业务中的实践.pdf2023年&#xff0c;如何科学制定年度规划&a…

Spring Batch 批处理数据表

目录 引言 概述 batch_job_instance表 batch_job_execution表 batch_job_execution_context表 batch_job_execution_params表 btch_step_execution表 batch_step_execution_context表 H2内存数据库 转视频版 引言 接着上篇&#xff1a;Spring Batch 步骤对象-返回状…

MybatisPlus多表查询之零sql编写实现

1.前言 年初节奏还没有快起来&#xff0c;适合做做技术前瞻&#xff0c;无论是对个人还是团队都是好事。真要说分享&#xff0c;其实感觉也没啥好分享的&#xff0c;就像接手新项目一样&#xff0c;代码都是自己看&#xff0c;别人讲的再多&#xff0c;不看&#xff0c;不用&am…

OpenMP For Construct dynamic 调度方式实现原理和源码分析

OpenMP For Construct dynamic 调度方式实现原理和源码分析 前言 在本篇文章当中主要给大家介绍 OpenMp for construct 的实现原理&#xff0c;以及与他相关的动态库函数分析&#xff0c;与 for construct 非常相关的是循环的调度方式&#xff0c;在 OpenMP 当中一共有四种调…

KVM,QEMU与libvirt关系

KVM&#xff1a;负责cpu虚拟化内存虚拟化&#xff0c;实现了cpu和内存的虚拟化&#xff0c;但kvm不能模拟其他设备&#xff1b;KVM是linux内核的模块&#xff0c;它需要CPU的支持&#xff0c;采用硬件辅助虚拟化技术Intel-VT&#xff0c;AMD-V&#xff0c;内存的相关如Intel的E…

FPGA纯verilog代码实现8位精简指令集CPU,一学期的微机原理不如看懂这套代码,提供工程源码和技术支持

目录1、前言2、设计思想和架构3、硬件组成讲解4、vivado仿真5、vivado工程6、上板调试验证7、福利&#xff1a;工程源码获取1、前言 本文章主要针对大学本科阶段学生&#xff1b; 读文章之前先来几个灵魂拷问&#xff1a; 1、你是否学过《微机原理》、《单片机》、《汇编语言》…