【RocketMQ】(五)消息的消费

news2024/12/28 5:22:57

消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。

消息消费

在消息被提交到线程池后进行处理时,会调用消息监听器的consumeMessage进行消息消费,它返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESSRECONSUME_LATER

  • CONSUME_SUCCESS:表示消息消费成功。
  • RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。

处理消息消费结果

设置ackIndex

在消息消费完毕之后,会根据consumeMessage方法返回的结果状态进行处理,对ackIndex的值进行设置,ackIndex的值用于在下一步中处理消费失败的消息。

前面可知消费结果状态有以下两种:

  • CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消费的总消息个数 - 1,表示消息都消费成功。
  • RECONSUME_LATER:消息消费失败,延迟进行消费,此时ackIndex值为-1。

二、处理消费失败的消息

广播模式

广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。

集群模式

开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,向Broker发送CONSUMER_SEND_MSG_BACK请求,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,消费次数加1,并加入到失败消息列表中,稍后重新提交到消息消费线程池进行处理。

发送CONSUMER_SEND_MSG_BACK请求

延迟级别

RocketMQ的延迟级别对应的延迟时间常量定义如下:

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

延迟级别与延迟时间对应关系:
延迟级别0 —> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 —> 延迟时间5s
延迟级别2 —> 延迟时间10s

以此类推,最大的延迟时间为2h。

在向Broker发送CONSUMER_SEND_MSG_BACK请求的时候,会从上下文中获取设置的延迟级别(默认为0,也就是延迟1s),然后设置以下信息,向Broker发送请求:

  • 设置请求类型,请求类型为CONSUMER_SEND_MSG_BACK
  • 设置消费者组名称;
  • 设置消息在CommitLog中的偏移量;
  • 设置延迟级别;
  • 设置消息的ID;
  • 设置该消息的最大消费次数;
Broker对CONSUMER_SEND_MSG_BACK请求处理

Broker对CONSUMER_SEND_MSG_BACK类型的请求处理逻辑如下:

  1. 根据消费组获取该消费者组的订阅信息配置;
  2. 根据消费者组名称获取对应的重试主题;
  3. 从该消费者组的重试队列中随机选取一个队列;
  4. 根据消息在CommitLog中的偏移量从commitLog文件中获取消息内容;
  5. 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:
    • 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时从死信队列中随机选取一个队列;
    • 如果条件不满足,判断延迟级别是否为0,如果为0的话,会使用消息的消费次数作 + 3为新的延迟级别进行延迟消费;
  6. 新建消息对象MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),重新添加到CommitLog中,消息主题的设置有两种情况:
    • 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中;
    • 未达到DLQ队列的条件,设置延迟级别,使用重试主题(%RETRY% + 消费组名称),之后将消息投递到此主题下的队列中;
  7. 调用asyncPutMessage存储消息;

asyncPutMessage方法中,会对延迟级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

  1. 获取延迟消息的主题名称,RocketMQ对延迟消息有一个默认的主题名称SCHEDULE_TOPIC_XXXX;
  2. 根据消息设置的延迟级别,获取对应的延迟队列,SCHEDULE_TOPIC_XXXX主题下,会根据延迟级别创建对应的消息队列,所以这一步会根据消息的延迟级别投递到对应的队列中;
  3. 在消息属性中,设置消息原本的主题名称和消息队列,然后将消息当前的Topic改成RMQ_SYS_SCHEDULE_TOPIC

总结
消费者在消息消费失败的时候,会向Broker发送CONSUMER_SEND_MSG_BACK请求,在请求处理中会判断消息的消费次数是否大于最大的消费次数,如果超过最大消费次数,会将消息投递到死信队列中。
如果未达到最大的消费次数,会根据请求中设置的延迟级别,重新生成一条消息,使用重试主题(%RETRY% + 消费组名称),并随机选取一个队列投递消息,延迟进行消费,不过消息不会立刻投递到队列中,在消息存储之前会对延迟级别进行判断,如果需要延迟消费,会使用RocketMQ默认创建的SCHEDULE_TOPIC_XXXX主题,先根据延迟级别将消息投递到对应的延迟队列中,然后由一个定时任务去检测这个主题下的消息,当消息到达延迟的时间后,再将消息取出投递到原本主题下的消息队列中,之后的流程就与普通消息的存储一致,将消息存入CommitLog中,再创建对应的ConsumeQueue数据,消费者就可以拉取到消息重新进行消费。

消费者在启动的时候,会处理订阅的Topic数据,如果是集群模式,会自动添加重试主题的订阅(%RETRY% + 消费组名称),然后就可以从重试主题中拉取到对应的重试消息进行消费。

更新拉取偏移量

以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后更新拉取偏移量。

RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。

广播模式
广播模式对应的OffSetStore实现类为LocalFileOffsetStore,使用了一个ConcurrentMap类型的变量offsetTable存储每个消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,对offsetTable中的值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘。

集群模式
集群模式对应的实现类为RemoteBrokerOffsetStore,更新进度与广播模式下的更新类似,都是只更新了offsetTable中的数据。

持久化的触发
消费者在启动的时候注册了定时任务,定时将消息拉取进度进行持久化,对于广播模式,将每个消息队列对应的拉取偏移量持久化到本地文件即可,对于集群模式,由于拉取进度保存在Broker端,所以需要向Broker发送请求进行持久化,在RocketMQ的存储目录中有一个对应的文件,叫consumerOffset.json,里面的offsetTable中保存了每个消息队列的消费进度,持久化时会将消费进度写入这个文件:

{
	"offsetTable":{
		"TestTopic@TestTopicGroup":{ // 主题名称@消费者组名称
            0:0, // 每个消息队列对应的消费进度,Key中的0表示队列0,value中的0表示消息在ConsumeQueue中的逻辑偏移量
            1:1,
            2:1,
            3:0  
		}
	}
}

RocketMQ消息的消费相关源码可参考:【RocketMQ】【源码】消息的消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1036145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Codeforces Round #898 (Div. 4)

首先庆祝自己上了绿名&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384;&#x1f384; 1873A - Short Sort 1873B - Good Kid c…

Verilog 不同编码风格对综合电路的影响

文章目录 示例 #1示例 #2示例 #3 Verilog是一种硬件描述语言&#xff08;HDL&#xff09;&#xff0c;用于设计数字电路和系统。统一、良好的代码编写风格&#xff0c;可以提高代码的可维护性和可读性。 同样的功能&#xff0c;不同的Verilog 编码风格也会对综合过程产生重大影…

前端uniapp如何转base64使用uniapp插件市场

插件市场 网址 使用 可以下载&#xff0c;也可以引用&#xff0c;我是下载下来的引用 代码 正常使用 pathToBase64(img).then(path > {img pathresolve(path)}).catch(error > {console.error(error)reject(error)})使用出现[object Promise]错误 解决方法 let img …

《动手学深度学习 Pytorch版》 7.4 含并行连接的网络(GoogLeNet)

import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2l7.4.1 Inception块 GoogLNet 中的基本卷积块叫做 Inception 块&#xff08;大概率得名于盗梦空间&#xff09;&#xff0c;由 4 条并行路径组成。 前 3 条路径使用窗口…

Spring面试题17:Spring中什么是bean装配?有哪几种自动装配方式?自动装配有哪些局限性?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:Spring中什么是bean装配? 在Spring中,Bean装配是指将一个或多个Bean实例化、配置和组合在一起的过程。它是Spring容器的核心功能之一,通过Bean装…

宏基因组做在线组间差异Anosim分析与结果解读

写在前面 需求是返回的结果每组样本数量比较多&#xff0c;但组间差异不明显&#xff0c;挑出来重分析。详情请参考 [组间差异分析&#xff1a;Anosim]([组间差异分析&#xff1a;Anosim - 知乎 (zhihu.com))&#xff0c; 当然亲测更简便的方法可以借助云平台工具&#xff1a;…

ROS2 的行为树 — 第 1 部分:解锁高级机器人决策和控制

一、说明 在复杂而迷人的机器人世界中&#xff0c;行为树&#xff08;BT&#xff09;已成为决策过程中不可或缺的一部分。它们提供了一种结构化、模块化和高效的方法来对机器人的行为进行编程。BT起源于视频游戏行业&#xff0c;用于控制非玩家角色&#xff0c;他们在机器人领域…

面试官:为什么说HTTPS比HTTP安全? HTTPS是如何保证安全的?

公众号 小册 这是我整理的学习资料&#xff0c;非常系统和完善&#xff0c;欢迎一起学习 现代JavaScript高级小册 深入浅出Dart 现代TypeScript高级小册 linwu的算法笔记&#x1f4d2; 一、安全特性 在前文中&#xff0c;我们已经了解到HTTP在通信过程中存在以下问题&…

【pytest】 allure 生成报告

1. 下载地址 官方文档; Allure Framework 参考文档&#xff1a; 最全的PytestAllure使用教程&#xff0c;建议收藏 - 知乎 https://github.com/allure-framework 1.2安装Python依赖 windows&#xff1a;pip install allure-pytest 2. 脚本 用例 import pytest class …

【Hash表】字母异位词分组-力扣 49 题

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

Linux 文件 目录管理 链接

Linux 文件 基本属性 Linux 系统是一种典型的多用户系统&#xff0c;为了保护系统的安全性&#xff0c;不同的用户拥有不同的地位和权限。Linux 系统对不同的用户访问同一文件&#xff08;包括目录文件&#xff09;的权限做了不同的规定。 可以使用命令&#xff1a;ll 或 ls –…

vue点击容器外隐藏元素(点击非本身以外的部分隐藏元素)

如图点击蓝色边框以外任意地方隐藏蓝色边框容器&#xff08;不使用输入框的失焦事件&#xff09; 实现思路&#xff1a; 获取到dom节点然后通过其contains方法来判断点击的地方是否为其子元素或其本身 原生js获取dom跟vue的$el都可以实现 也可以通过vue的this.$refs.showBox…

MinGW相关错误

1、go编译c报错 cc1.exe: sorry, unimplemented: 64-bit mode not compiled in 参考&#xff1a;BeifangCc go编译c报错 cc1.exe: sorry, unimplemented: 64-bit mode not compiled in 说明当前gcc是32位&#xff0c;无法在当前64位机器上正常工作&#xff0c;需要更新gcc 下载…

Unity打包出来的APK文件有问题总结

一、Unity打包出来的APK文件安装失败&#xff0c;提示安装失败&#xff08;-108&#xff09;&#xff0c;或者是提示“包含病毒&#xff1a;a.gray.Bulimia.b” 有可能是遇到如上图所示的问题&#xff0c;提示安装失败&#xff08;-108&#xff09;。 有可能是遇到如上图所示的…

java入坑之Jsoup(待补充)

一、快速入门 1.1配置 <dependency><groupId>org.jsoup</groupId><artifactId>jsoup</artifactId><version>1.16.1</version> </dependency>1.2解析xml Jsoup&#xff1a;jsoup 是一款Java 的HTML解析器&#xff0c;可直接解…

Docker HarborDocker Registry

目录 介绍 Harbor和Registry的比较 搭建Dokcer Harbor Docker Registry安装 介绍 Harbor&#xff0c;是一个英文单词&#xff0c;意思是港湾&#xff0c;港湾是干什么的呢&#xff0c;就是停放货物的&#xff0c;而货物呢&#xff0c;是装在集装箱中的&#xff0c;说到集装…

台式COD快速测定仪操作说明

实验室检测水中COD指标需要消解&#xff0c;要准备好实验室多参数水质测定仪和配套智能型的消解器。所有配件准备齐全就可以进行水样的检测&#xff0c;检测流程以及操作说明如下图&#xff1a; 仪器的选定也需要根据实际的情况进行选择&#xff0c;最好选择指标可以定制的仪器…

软件测试(测试用例攻略)—写用例无压力

一、概念 测试用例的基本概念&#xff1a; 测试用例&#xff08;Test Case&#xff09;是为了实施测试而向被测试的系统提供的一组集合&#xff0c;这组集合包含&#xff1a;测试环境、操作步骤、测试数据、预期结果等要素 。 主要步骤&#xff1a; 测试环境——测试步骤—…

stack栈、queue队列、list链表容器

目录 stack栈容器 stack概念和定义 stack构造函数: stack数据操作: queue队列容器 queue概念和定义 queue构造函数 queue数据操作 list链表容器 list概念和定义 list构造函数 list赋值和交换 list大小操作 list插入和删除 list数据储存 list反转和排序 stack栈…

2023年汉字小达人区级比赛倒计时2天,最新问题解答和真题练一练

今天是9月23日&#xff0c;距离2023年第十届汉字小达人区级比赛&#xff08;初赛&#xff09;的自由报名参赛时间还有2天&#xff0c;六分成长结合家长和小朋友们问的比较多的问题进行解答&#xff0c;并提供一些真题供大家练习、了解比赛题型和规则。 问题1&#xff1a;2023年…