RocketMQ之消费者带你了解概念和消费流程

news2025/1/13 13:36:08

1. 背景

RocketMQ 的消费可以算是 RocketMQ 的业务逻辑中最复杂的一块。这里面涉及到许多消费模式和特性。本想一篇文章写完,写到后面发现消费涉及到的内容太多,于是决定分多篇来写。本文作为消费系列的第一篇,主要讲述 RocketMQ 消费涉及到的模式和特性,也会概括性地讲一下消费流程。

我将 RocketMQ 的消费流程大致分成 4 个步骤

  1. 重平衡
  2. 消费者拉取消息
  3. Broker 接收拉取请求后从存储中查询消息并返回
  4. 消费者消费消息

每个步骤都会用一篇文章来讲解。

先了解一下 RocketMQ 消费涉及到地概念

2. 概念简述

2.1 消费组概念与消费模式

和大多数消息队列一样,RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。在了解它们之前,需要先引入消费组的概念。

2.1.1 消费组

一个消费者实例即是一个消费者进程,负责消费消息。单个消费者速度有限,在实际使用中通常会采用多个消费者共同消费同样的 Topic 以加快消费速度。这多个消费同样 Topic 的消费者组成了消费者组。

消费组是一个逻辑概念,它包含了多个同一类的消费者实例,通常这些消费者都消费同一类消息(都消费相同的 Topic)且消费逻辑一致。

消费组的引入是用来在消费消息时更好地进行负载均衡和容错。

2.1.2 广播消费模式(BROADCASTING)

广播消费模式即全部的消息会广播分发到所有的消费者实例,每个消费者实例会收到全量的消息(即便消费组中有多个消费者都订阅同一 Topic)。

如下图所示,生产者发送了 5 条消息,每个消费组中的消费者都收到全部的 5 条消息。

广播模式使用较少,适合各个消费者都需要通知的场景,如刷新应用中的缓存。

alt

注意事项:

  1. 广播消费模式下不支持 顺序消息
  2. 广播消费模式下不支持 重置消费位点
  3. 每条消息都需要 被相同订阅逻辑的多台机器处理
  4. 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。如果消费进度文件丢失,存在消息丢失的可能。
  5. 广播模式下,消息队列 RocketMQ 版保证每条消息至少被每台客户端消费一次,但是并 不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  6. 广播模式下, 客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  7. 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  8. 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
2.1.3 集群消费模式(CLUSTERING)

集群消费模式下,同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

更具体一点,在同一消费组中的不同消费者会根据负载机制来平均地订阅 Topic 中的每个 Queue。(默认 AVG 负载方式)

alt

RocketMQ 默认使用集群消费模式,这也是大部分场景下会使用到的消费模式。

2.2 消费者拉取消息模式

2.2.1 Pull

指消费者主动拉取消息进行消费,主动从 Broker 拉取消息,主动权由消费者应用控制。

2.2.2 Push

Broker 主动将消息 Push 给消费者,Broker 收到消息就会主动推送到消费者端。该模式的消费实时性较高,也是主流场景中普遍采用的消费形式。

消费者组中的消费者实例会根据预设的负载均衡算法对 Topic 中的 Queue 进行均匀的订阅,每个 Queue 最多只能被一个消费者订阅。

在 RocketMQ 中,Push 消费其实也是由 Pull 消费(拉取)实现。Push 消费只是通过客户端 API 层面的封装让用户感觉像是 Broker 在推送消息给消费者。

2.2.3 POP

RocketMQ 5.0 引入的新消费形式,是 Pull 拉取的另一种实现。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分别消费重试 Topic 和普通 Topic)。

POP 与 Pull 可以通过一个开关实时进行切换。POP 模式下,Broker 来控制每个消费者消费的队列和拉取的消息,把重平衡逻辑从客户端移到了服务端。

主要解决了原来 Push 模式消费的以下痛点:

  • 富客户端:客户端逻辑比较重,多语言支持不友好
  • 队列独占:Topic 中的一个 Queue 最多只能被 1 个 Push 消费者消费,消费者数量无法无限扩展。且消费者 hang 住时该队列的消息会堆积。
  • 消费后更新 offset:本地消费成功才会提交 offset

RocketMQ 5.0 的轻量化 gRPC 客户端就是基于 POP 消费模式开发

2.3 队列负载机制与重平衡

在集群消费模式下,消费组中的消费者共同消费订阅的 Topic 中的所有消息,这里就存在 Topic 中的队列如何分配给消费者的问题。

2.3.1 队列负载机制

RocketMQ Broker 中的队列负载机制将一个 Topic 的不同队列按照算法尽可能平均地分配给消费者组中的所有消费者。RocketMQ 预设了多种负载算法供不同场景下的消费。

AVG:将队列按数量平均分配给多个消费者,按 Broker 顺序先分配第一个 Broker 的所有队列给第一个消费者,然后给第二个。

AVG_BY_CIRCLE:将 Broker 上的队列轮流分给不同消费者,更适用于 Topic 在不同 Broker 之间分布不均匀的情况。

默认采用 AVG 负载方式。

2.3.2 重平衡(Rebalance)

为消费者分配队列消费的这一个负载过程并不是一劳永逸的,比如当消费者数量变化、Broker 掉线等情况发生后,原先的负载就变得不再均衡,此时就需要重新进行负载均衡,这一过程被称为重平衡机制。

每隔 20s,RocketMQ 会进行一次检查,检查队列数量、消费者数量是否发生变化,如果变化则触发消费队列重平衡,重新执行上述负载算法。

2.4 消费端高可靠

2.4.1 重试-死信机制

在实际使用中,消息的消费可能出现失败。RocketMQ 拥有重试机制和死信机制来保证消息消费的可靠性。

  1. 正常消费:消费成功则提交消费位点

  2. 重试机制:如果正常消费失败,消息会被消费者发回 Broker,放入重试 Topic: %RETRY%消费者组。最多重试消费 16 次,重试的时间间隔逐渐变长。(消费者组会自动订阅重试 Topic)。

    这里地延迟重试采用了 RocketMQ 的延迟消息,重试的 16 次时间间隔为延迟消息配置的每个延迟等级的时间(从第三个等级开始)。如果修改延迟等级时间的配置,重试的时间间隔也会相应发生变化。但即便延迟等级时间间隔配置不足 16 个,仍会重试 16 次,后面按照最大的时间间隔来重试。

  3. 死信机制:如果正常消费和重试 16 次均失败,消息会保存到死信 Topic %DLQ%消费者组 中,此时需人工介入处理

2.4.2 队列负载机制与重平衡

当发生 Broker 挂掉或者消费者挂掉时,会引发重平衡,可以自动感知有组件挂掉的情况并重新调整消费者的订阅关系。

2.5 并发消费与顺序消费

在消费者客户端消费时,有两种订阅消息的方式,分别是并发消费和顺序消费。广播模式不支持顺序消费,仅有集群模式能使用顺序消费。

需要注意的是,这里所说的顺序消费指的是队列维度的顺序,即在消费一个队列时,消费消息的顺序和消息发送的顺序一致。如果一个 Topic 有多个队列, 是不可能达成 Topic 级别的顺序消费的,因为无法控制哪个队列的消息被先消费。Topic 只有一个队列的情况下能够实现 Topic 级别的顺序消费。

具体顺序生产和消费代码见 官方文档。

顺序生产的方式为串行生产,并在生产时指定队列。

并发消费的方式是调用消费者的指定 MessageListenerConcurrently 作为消费的回调类,顺序消费则使用 MessageListenerOrderly 类进行回调。处理这两种消费方式的消费服务也不同,分别是 ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService

顺序消费的大致原理是依靠两组锁,一组在 Broker 端(Broker 锁),锁定队列和消费者的关系,保证同一时间只有一个消费者在消费;在消费者端也有一组锁(消费队列锁)以保证消费的顺序性。

2.6 消费进度保存和提交

消费者消费一批消息完成之后,需要保存消费进度。如果是集群消费模式,还需要将消费进度让其他消费者知道,所以需要提交消费进度。这样在消费者重启或队列重平衡时可以根据消费进度继续消费。

不同模式下消费进度保存方式的不同:

  1. 广播模式:保存在 消费者本地。因为每个消费者都需要消费全量消息消息。在 LocalfileOffsetStore 当中。
  2. 集群模式:保存在 Broker,同时消费者端缓存。因为一个 Topic 的消息只要被消费者组中的一个消费者消费即可,所以消息的消费进度需要统一保存。通过 RemoteBrokerOffsetStore 存储。

集群模式下,消费者端有定时任务,定时将内存中的消费进度提交到 Broker,Broker 也有定时任务将内存中的消费偏移量持久化到磁盘。此外,消费者向 Broker 拉取消息时也会提交消费偏移量。注意,消费者线程池提交的偏移量是线程池消费的这一批消息中偏移量最小的消息的偏移量。

  1. 消费完一批消息后将消息消费进度存在本地内存
  2. 消费者中有一个定时线程,每 5s 将内存中所有队列的消费偏移量提交到 Broker
  3. Broker 收到消费进度先缓存到内存,有一个定时任务每隔 5s 将消息偏移量持久化到磁盘
  4. 消费者向 Broker 拉取消息时也会将队列的消息偏移量提交到 Broker

3. 消费流程

这张图是阿里云的文章讲解消费时用到的,能够清晰地表示客户端 Push 模式并发消费流程。

alt 从左上角第一个方框开始看

  1. 消费者启动时唤醒重平衡服务 RebalanceService,重平衡服务是客户端开始消费的起点。
  2. 重平衡服务会周期性(每 20s)执行重平衡方法 doRebalance),查询所有注册的 Broker,根据注册的 Broker 数量为自身分配负载的队列 rebalanceByTopic()
  3. 分配完队列后,会为每个分配到的新队列创建一个消息拉取请求 pullRequest,这个拉取请求中保存一个处理队列 processQueue,即图中的红黑树( TreeMap),用来保存拉取到的消息。红黑树保存消息的顺序。
  4. 消息拉取线程应用生产-消费模式,用一个线程从拉取请求队列 pullRequestQueue 中弹出拉取请求,执行拉取任务,将拉取到的消息放入处理队列。
  5. 拉取请求在一次拉取消息完成之后会复用,重新被放入拉取请求队列 pullRequestQueue
  6. 拉取完成后,在 NettyClientPublicExecutorThreadPool 线程池异步处理结果,将拉取到的消息放入处理队列,然后调用 consumeMessageService.submitConsumeRequest,将处理队列和 多个消费任务提交到消费线程池。每个消费任务消费 1 批消息(1 批默认为 1 条)
  7. 每个消费者都有一个消费线程池 consumeMessageThreadPool ,默认有 20 个消费线程。
  8. 消费线程池的每个消费线程会尝试从消费任务队列中获取消费请求,执行消费业务逻辑 listener.consumeMessage
  9. 消费完成后,如果消费成功,则更新偏移量 updateOffset(先更新到内存 offsetTable,定时上报到 Broker。Broker 端也先放到内存,定时刷盘)。

参考资料

  • 官方文档——设计
  • RocketMQ 实战与进阶——丁威
  • RocketMQ消费消息——白云鹏
  • 消息中间件—RocketMQ消息消费(一)——癫狂侠
  • RocketMQ 消息接受流程——赵坤
  • RocketMQ 消息消费——贝贝猫
  • RocketMQ 5.0 POP 消费模式探秘
  • RocketMQ消息消费源码分析
  • Rocketmq消费消息原理——服务端技术栈
  • RocketMQ——4. Consumer 消费消息——Kong @@##@@

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1906383.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络规划与设计————期末复习

一、选择题(每题1分) 1、光纤线组建的标准以太网是______。 A.10BASE-5 B.10BASE-2 C.10BASE-T D.10BASE-F 其实也很好记,光纤的英文是 "Fiber Optic",双绞线的英文是 "Twisted Pair"。 5呢…

Redis核心问题总结(一)

1、为什么要使用Redis做缓存 缓存的好处 使用缓存的目的就是提升读写性能。而实际业务场景下,更多的是为了提升读性能,带来更好的性 能,带来更高的并发量。Redis 的读写性能比 Mysql 好的多,我们就可以把 Mysql 中的热点数据缓 …

Python28-8 GBM梯度提升算法

梯度提升算法(Gradient Boosting Machine,GBM)是一种集成学习方法,通过逐步构建一系列简单模型(通常是决策树),并结合这些模型来提高整体预测性能。GBM广泛用于回归和分类任务,因为它…

端到端自动驾驶新突破:Nvidia提出全并行PARA-Drive,斩获CVPR挑战赛冠军

论文标题: PARA-Drive: Parallelized Architecture for Real-time Autonomous Driving 论文作者: Xinshuo Weng, Boris Ivanovic, Yan Wang, Yue Wang, Marco Pavone 导读: 本文系统分析了自动驾驶高级架构的设计空间,提出了关…

单片机软件架构连载(3)-typedef

今天给大家讲typedef,这个关键字在实际产品开发中,也是海量应用。 技术涉及知识点比较多,有些并不常用,我们以贴近实际为原则,让大家把学习时间都花在重点上。 1.typedef的概念 typedef 是 C 语言中的一个关键字&…

artts升级版本后常见的编译错误(定期更新......)

1、设置泛型将参数配置为 null 时抛出了如下异常: Type null is not assignable to type T. T could be instantiated with an arbitrary type which could be unrelated to null. <ArkTSCheck> 解决办法 在 null 后面添加 ! 即可,以表示该值不会为 null data: T null!…

【可能是全网最丝滑的LangChain教程】十七、LangChain进阶之Retrievers

人生不能像做菜&#xff0c;把所有的料都准备好了才下锅。 01 Retrievers介绍 检索器&#xff08;Retrievers&#xff09; 是一种接口&#xff0c;用于根据非结构化查询返回文档&#xff0c;它比向量存储更为通用&#xff0c;既可以使用向量存储作为底层&#xff0c;也可以是其…

C++11右值引用及移动构造

区分左值和右值 在学习c11的右值引用前&#xff0c;大家肯定会有点陌生什么是右值&#xff1f;什么是左值&#xff1f;现在我先来带大家熟悉一下概念。 左值 可以被取地址&#xff0c;也可被修改&#xff08;const修饰的除外&#xff09; 可以出现在等号左边&#xff0c;也可…

华为HCIP Datacom H12-821 卷29

1.多选题 下面关于LSA age字段&#xff0c;描述正确的是∶ A、LSA age的单位为秒&#xff0c;在LSDB中的LSA的LS age随时间增长而增长 B、LSA age的单位为秒&#xff0c;在LSDB中的LSA的LS age随时间增长而减少 C、如果一条LSA的LS age达到了LS RefreshTime&#xff08…

【C++】AVL树(旋转、平衡因子)

&#x1f308;个人主页&#xff1a;秦jh_-CSDN博客&#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/qinjh_/category_12575764.html?spm1001.2014.3001.5482 ​ 目录 前言 AVL树的概念 节点 插入 AVL树的旋转 新节点插入较高左子树的左侧---左左&#xff1a;…

Spring的AOP基础以及AOP的核心概念

2. AOP基础 学习完spring的事务管理之后&#xff0c;接下来我们进入到AOP的学习。 AOP也是spring框架的第二大核心&#xff0c;我们先来学习AOP的基础。 在AOP基础这个阶段&#xff0c;我们首先介绍一下什么是AOP&#xff0c;再通过一个快速入门程序&#xff0c;让大家快速体…

高级RAG检索中的五种查询重写策略_用于检索增强的大型语言模型的查询重写

一、前言 检索增强生成 (RAG) 作为人工智能 (AI) 领域的一项重要技术&#xff0c;近年来得到了飞速发展。它将基于检索模型和基于生成的模型相结合&#xff0c;利用海量外部数据&#xff0c;生成更具信息量、更准确、更具语境相关性的回复。检索策略是 RAG 系统的关键组成部分…

2024年最适合高级网工的11款Linux

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 你们好&#xff0c;我的网工朋友。 Linux作为一个免费且开源的操作系统&#xff0c;随着时间的推移催生了多个发行版&#xff0c;并且得到了庞大…

golang验证Etherscan上的智能合约

文章目录 golang验证Etherscan上的智能合约为什么要验证智能合约如何使用golang去验证合约获取EtherscanAPI密钥Verify Source Code接口Check Source Code Verification Status接口演示示例及注意事项网络问题无法调用Etherscan接口&#xff08;最重要的步骤&#xff09; golan…

应用层协议原理——因特网提供的运输服务

我们已经考虑了计算机网络能够一般性地提供的运输服务。现在我们要更为具体地考察由因特网提供的运输服务类型。因特网(更一般的是TCP/IP网络)为应用程序提供两个运输层协议&#xff0c;即UDP和TCP。当软件开发者为因特网创建一个新的应用时&#xff0c;首先要做出的决定是&…

js逆向案例 | 加速乐反爬逆向

前言 加速乐作为一种常见的反爬虫技术&#xff0c;在网络上已有大量详尽深入的教程可供参考。然而&#xff0c;对于那些初次接触的人来说&#xff0c;直接面对它可能仍会感到困惑。 声明 本文仅用于学习交流&#xff0c;学习探讨逆向知识&#xff0c;欢迎私信共享学习心得。如…

收银系统源码-商品报损管理

千呼新零售2.0系统是零售行业连锁店一体化收银系统&#xff0c;包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体&#xff0c;线上线下数据全部打通。 适用于商超、便利店、水果、生鲜、母婴、服装、零食、百货、宠物等连锁店使用。 详细介绍请…

ESXi6.7 update 3主机实现新硬件运行老环境

server 2003 SQL server 2000 SQL SP4 vmware tools 一、适用场景 1、运行多年的老企业&#xff0c;积累的数据量庞大&#xff0c;其中的数据库并不一定都是现在开发的平台或系统&#xff0c;而是已经正在运行&#xff0c;不能停业务的状态。 2、老系统老应用平台&#xf…

day01:项目概述,环境搭建

文章目录 软件开发整体介绍软件开发流程角色分工软件环境 外卖平台项目介绍项目介绍定位功能架构 产品原型技术选型 开发环境搭建整体结构&#xff1a;前后端分离开发前后端混合开发缺点前后端分离开发 前端环境搭建Nginx 后端环境搭建熟悉项目结构使用Git进行版本控制数据库环…

Day06-01-lvs

Day06-01-lvs 0. 核心内容1.负载均衡项目 选择故障: 2.lvs 预备姿势-arp3.lvs 概述4. lvs工作模式4.1 预备姿势4.2 lvs-dr模式4.3 lvs-nat模式4.4 小结 5. lvs-dr模式5.1 环境准备5.2 lvs-dr模式配置流程1) lvs服务端配置2) web服务器 RS服务端配置3) 小结4) 调试 5.3 抓包查看…