RocketMQ 的消费者类型详解与最佳实践

news2025/4/17 16:52:31

作者:凌楚

在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。

消费者类型概览

本篇文章也会根据不同的消费者类型来进行讲述。在介绍不同的消息类型之前,先明确一下不同 RocketMQ 消费者中的一个通用工作流程:在消费者中,到达客户端的消息都是由客户端主动向服务端请求并挂起长轮询获得的。为了保证消息到达的及时性,客户端需要不断地向服务端发起请求(请求是否需要由客户端主动发起则与具体的客户端类型有关),而新的符合条件的消息一旦到达服务端,就会客户端请求走。最终根据客户端处理的结果不同,服务端对消息的处理结果进行记录。

在这里插入图片描述

另外 PushConsumerSimpleConsumer 中还会有一个 ConsumerGroup 的概念,ConsumerGroup 相当于是一组相同订阅关系的消费者的共同身份标识。而服务端也会根据 ConsumerGroup 来记录对应的消费进度。同一个 ConsumerGroup 下的消息消费者将共同消费符合当前订阅组要求的所有消息,而不是独立进行消费。相比较于 PullConsumerPushConsumerSimpleConsumer 更加适用于业务集成的场景,由服务端来托管消费状态和进度,相对来说更加的轻量与简单。

简单来说:

  • PushConsumer : 全托管的消费者类型,用户只需要注册消息监听器即可,符合对应订阅关系的消息就会调用对应的消费方法,是与业务集成最为普遍的消费者类型。

  • SimpleConsumer: 解耦消息消费与进度同步的消费者类型,用户自主接受来自服务端的消息,并对单条消息进行消息确认。和 PushConsumer 一样也由服务端托管消费进度,适用于用户需要自主控制消费速率的业务场景。

  • PullConsumer: 使用流处理框架进行管理的消费者类型,用户按照队列(Topic 的最小逻辑组成单位)来进行消息的接收并可以选择自动或者手动提交消费位点。

PushConsumer

PushConsumer 是 RocketMQ 目前使用最为广泛的消费者。用户只需要确认好订阅关系之后,注册相对应的 Listener 即可。符合对应订阅关系的消息在由 Producer 发出后,消费者的 Listener 接口也会被即时调用,那么此时用户需要在 Listener 中去实现对应的业务逻辑。

使用简介

以下是 Push 消费者的使用示例:

PushConsumer pushConsumer = provider.newPushConsumerBuilder()
 .setClientConfiguration(clientConfiguration)
    // set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // handle the received message and return consume result.
        LOGGER.info("consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();
// block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// close the push consumer when you don't need it anymore.
pushConsumer.close();

用户需要根据自己业务处理结果的不同来返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。当用户返回 ConsumeResult.SUCCESS时,消息则被视为消费成功;当用户返回 ConsumeResult.FAILURE时,则服务端视为消费失败,会进行该条消息的退避重试,消息的退避重试是指,在消息被消费成功之前,当前消息会被多次投递到用户注册的 MessageListener 中直到消费成功,而两次消费之间的时间间隔则是符合退避规律的。

特别的,每个 ConsumerGroup 都会有一个最大消费次数的设置,如果当前消息的消费次数超过了这个设置,则消息不会再被投递,转而被投递进入死信队列。这个消费次数在消息每次被投递到 MessageListener 时都会进行自增。譬如:如果消息的最大消费次数为 1,那么无论对于这条消息,当前是被返回消费成功还是消费失败,都只会被消费这一次。

应用场景与最佳实践

PushConsumer 是一种近乎全托管的消费者,这里的托管的含义在于用户本身并不需要关心消息的接收,而只需要关注消息的消费过程,除此之外的所有逻辑都在 Push 消费者的实现中封装掉了,用户只需要根据每条收到的消息返回不同的消费结果即可,因此也是最为普适的消费者类型。

MessageListener 是针对单条消息设计的监听器接口:

/**
* MessageListener is used only for the push consumer to process message consumption synchronously.
 *
 * <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the
 * backend thread pool to consumer message concurrently.
 */
public interface MessageListener {
    /**
     * The callback interface to consume the message.
     *
     * <p>You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}.
     * The consumption is successful only when {@link ConsumeResult#SUCCESS } is returned, null pointer is returned
     * or exception is thrown would cause message consumption failure too.
     */
    ConsumeResult consume(MessageView messageView);
}

绝大多数场景下,使用方应该快速处理消费逻辑并返回消费成功,不宜长时间阻塞消费逻辑。对于消费逻辑比较重的情形,建议可以先行提交消费状态,然后对消息进行异步处理。

实际在 Push 消费者的实现中,为了保证消息消费的及时性,消息是会被预先拉取客户端再进行后续的消费的,因此在客户端中存在对已拉取消息大小的缓存。为了防止缓存的消息过多导致客户端内存泄漏,也提前预留了客户端参数供使用者自行进行设置。

// 设置本地最大缓存消息数目为 16 条
pushConsumer.setMaxCachedMessageCount(16);
// 设置本地最大缓存消息占用内存大小为 128 MB
pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);

SimpleConsumer

相比较 PushConsumerSimpleConsumer 则暴露了更多的细节给使用者。在 SimpleConsumer 中,用户将自行控制消息的接收与处理。

使用简介

以下是 SimpleConsumer 的使用示例:

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
LOGGER.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
    final MessageId messageId = message.getMessageId();
    try {
        consumer.ack(message);
        LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
    } catch (Throwable t) {
        LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
    }
}
// Close the simple consumer when you don't need it anymore.
consumer.close();

SimpleConsumer 中用户需要自行进行消息的拉取,这一动作通过 SimpleConsumer#receive 这个接口进行,然后再根据自己业务逻辑处理结果的不同再对拉取到的消息进行不同的处理。SimpleConsumer#receive 也是通过长轮询来接受来自服务端的消息,具体的长轮询时间可以使用 SimpleConsumerBuilder#setAwaitDuration 来进行设置。

SimpleConsumer 中,用户需要通过 SimpleConsumer#receive 设置一个消息不重复的时间窗口(或者说关于通过这个接口收到的消息的一个不可见时间窗口),这个时间窗口从用户接受到这条消息开始计时,在这段时间之内消息是不会重复投递到消费者的,而超出这个时间窗口之后,则会对这条消息进行再一次的投递。在这个过程中,消息的消费次数也会进行递增。与 PushConsumer 类似的是,一旦消费次数超出 ConsumerGroup 的最大次数,也就不会进行重投了。

在这里插入图片描述

相比较于 PushConsumer 而言,SimpleConsumer 用户可以自主控制接受消息的节奏。SimpleConsumer#receive 会针对于当前的订阅关系去服务端拉取符合条件的消息。SimpleConsumer 实际上的每次消息接收请求是按照具体 Topic 的分区来 one by one 发起请求的,实际的 Topic 分区可能会比较多,因此为了保证消息接收的及时性,建议综合自己的业务处理能力一定程度上提高 SimpleConsumer#receive 的并发度。

用户在接受到消息之后,可以选择对消息使用 ack 或者 changeInvisibleDuration,前者即对服务端表示对这条消息的确认,与 PushConsumer 中的消费成功类似,而 changeInvisibleDuration 则表示延迟当前消息的可见时间,即需要服务端在当前一段时间之后再向客户端进行投递。值得注意的是,这里消息的再次投递也是需要遵循 ConsumerGroup 的最大消费次数的限制,即一旦消息的最大消费次数超出了最大消费次数(每次消息到达可见时间都会进行消费次数的自增),则不再进行投递,转而进入死信队列。举例来说:

  • 进行 ack,即表示消息消费成功被确认,消费进度被服务端同步。

  • 进行 changeInvisibleDuration,

1)如果消息已经超过当前 ConsumerGroup 的最大消费次数,那么消息后续会被投递进入死信队列

2)如果消息未超过当前 ConsumerGroup 的最大消费次数,若请求在上一次消息可见时间到来之前发起,则修改成功,否则则修改失败。

应用场景与最佳实践

PushConsumer 中,消息是单条地被投递进入 MessageListener来处理的,而在 SimpleConsumer 中用户可以同时拿到一批消息,每批消息的最大条数也由 SimpleConsumer#receive 来决定。在一些 IO 密集型的应用中,会是一个更加方便的选择。此时用户可以每次拿到一批消息并集中进行处理从而提高消费速度。

PullConsumer

PullConsumer 也是 RocketMQ 一直以来都支持的消费者类型,RocketMQ 5.0 中全新的 PullConsumer API 还在演进中,敬请期待。下文中的 PullConsumer 会使用 4.0 中现存的 LitePullConsumer 进行论述,也是当前推荐的方式。

使用简介

现存的 LitePullConsumer 中的主要接口

// PullConsumer 中的主要接口
public interface LitePullConsumer {
 // 注册路由变化监听器
void registerTopicMessageQueueChangeListener(String topic,
        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
    // 将队列 assign 给当前消费者
    void assign(Collection<MessageQueue> messageQueues);
    // 针对当前 assigned 的队列获取消息
    List<MessageExt> poll(long timeout);
    // 查找当前队列在服务端提交的位点
    Long committed(MessageQueue messageQueue) throws MQClientException;
    // 设置是否自动提交队列位点
    void setAutoCommit(boolean autoCommit);
    // 同步提交队列位点
    void commitSync();
}

在 RocketMQ 中,无论是消息的发送还是接收,都是通过队列来进行的,一个 Topic 由若干个队列组成,消息本身也是按照队列的形式来一个个进行存储的,同一个队列中的消息拥有不同的位点,且位点的大小是随随消息达到服务端的时间逐次递增的,本质上不同 ConsumerGroup 在服务端的消费进度就是一个个队列中的位点信息,客户端将自己的消费进度同步给服务端本质上其实就是在同步一个个消息的位点。

在这里插入图片描述

在 PullConsumer 中将队列这个概念完整地暴露给了用户。用户可以针对自己关心的 topic 设置路由监听器从而感知队列的变化,并将队列 assign 给当前消费者,当用户使用 LitePullConsumer#poll 时会尝试获取已经 assign 好了的队列中的消息。如果设置了 LitePullConsumer#setAutoCommit 的话,一旦消息达到了客户端就会自动进行位点的提交,否则则需要使用 LitePullConsumer#commitSync 接口来进行手动提交。

应用场景与最佳实践

PullConsumer 中用户拥有对消息位点管理的绝对自主权,可以自行管理消费进度,这是与 PushConsumer 和 SimpleConsumer 最为本质的不同,这也使得 PullConsumer 在流计算这种需要同时自主控制消费速率和消费进度的场景能得到非常广泛的应用。更多情况下,PullConsumer 是与具体的流计算框架进行集成的。

如果您对 RocketMQ 感兴趣,欢迎扫描下方二维码加入钉钉群一起沟通交流~

在这里插入图片描述

点击此处 ,进入官网了解更多详情~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/48795.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI美颜SDK算法详解

AI美颜SDK是近几年兴起的新兴美颜方式&#xff0c;区别于传统的美颜工具&#xff0c;AI美颜采用人工智能的深度学习算法实现智能化美颜&#xff0c;下文小编将为大家讲解一下AI美颜SDK相关的知识。 一、与传统美颜的区别之处 从宏观角度来讲&#xff0c;AI美颜技术与传统美颜…

高维多元时序数据聚类

1. 简介 收集数据的能力不断增强&#xff0c;使我们有可能收集大量的异构数据。在可用的异构数据中&#xff0c;时间序列代表着尚未被充分探索的信息母体。当前的数据挖掘技术在分析时间序列时存在多个缺点&#xff0c;尤其是在应同时分析多个时间序列&#xff08;即多维时间序…

C# Winform控件库分享,免费开源,支持中文!(附DLL及教程)

这款控件包是基于MaterialSkin2二次开发的&#xff0c;可以更换想要的皮肤主题&#xff0c;一键转换暗色系&#xff0c;还拥有非常炫酷的动画&#xff0c;非常好看&#xff0c;原本的MaterialSkin2是国外团队开发的&#xff0c;不支持中文&#xff0c;所以我在里面加了几款中文…

springboot学生宿舍报修换宿管理系统-宿管

宿舍管理系统设计与实现由管理员和学生、宿管交互构成。学生对于本系统的使用&#xff0c;学生可以通过系统注册、登录&#xff0c;修改个人信息&#xff0c;查看学生宿舍、消息通知、换宿申请等功能。 宿管对于本系统的使用&#xff0c;宿管可以通过系统登录&#xff0c;修改个…

RabbitMQ如何确保消息发送 ? 消息接收?

发送方确认机制&#xff1a; 信道需要设置为 confirm 模式&#xff0c;则所有在信道上发布的消息都会分配⼀个唯⼀ ID。⼀旦消息被投递到queue&#xff08;可持久化的消息需要写⼊磁盘&#xff09;&#xff0c;信道会发送⼀个确认给⽣产者&#xff08;包含消息唯⼀ ID&#xff…

Codeforces Round #719 (Div. 3) E. Arranging The Sheep

翻译&#xff1a; 你正在玩“安排羊”游戏。这个游戏的目标是让羊排好队。游戏中的关卡是由长度为&#x1d45b;的字符串描述的&#xff0c;由角色的’组成。(空格)和*(绵羊)。在一个动作中&#xff0c;你可以移动任何羊向左或向右移动一个方格&#xff0c;如果相应的方格存在…

Paper写作怎么按照要求来具体分析?

许多留学生通常面临写学术Paper的问题&#xff0c;而大多数都不知道Paper如何写&#xff0c;因为写Paper并不是容易的事情。学术Paper应按照严格要求和规则撰写&#xff0c;而其应提供扎实&#xff0c;有争议的论点&#xff0c;然后由相关的无论是来自其他来源还是自己研究的证…

流媒体直播播放协议:HLS、RTMP、HTTP-FLV

流媒体直播播放协议&#xff1a;HLS、RTMP、HTTP-FLV一、推拉流二、协议介绍1. HLS2. RTMP3. HDL (HTTP-FLV)一、推拉流 在开始之前&#xff0c;先把流媒体服务中的双端关系说一下&#xff1a;在一个完整的流媒体服务框架中&#xff0c;角色就是“两端加一服”。推流端、拉流端…

httpclient

1.什么是httpclient HttpClient 是Apache Jakarta Common 下的子项目&#xff0c;可以用来提供高效的、最新的、功能丰富的支持 HTTP 协议的客户端编程工具包&#xff0c;并且它支持 HTTP 协议最新的版本和建议。 2.http请求&#xff08;结合spring的注解&#xff09; 2-1GET请…

相对位置编码之RPR式:《Self-Attention with Relative Position Representations》论文笔记

&#x1f604; 额&#xff0c;本想学学XLNet的&#xff0c;然后XLNet又是以transformer-XL为主要结构&#xff0c;然后transformer-XL做了两个改进&#xff1a;一个是结构上做了segment-level的循环机制&#xff0c;一个是在attention机制里引入了相对位置编码信息来避免不同se…

AutoCAD Electrical 2022—源箭头和目标箭头

在一张图纸上插入源 箭头&#xff1b; 选中一根导线&#xff1b; 如果源和目标在同一张图纸上&#xff0c;则可以点击确定&#xff0c;插入目标箭头&#xff1b; 如果不在同一张图纸上&#xff0c;则点击否&#xff0c;后面在插入目标箭头&#xff1b; 在另一张图纸上插入目标…

学习笔记:引用

概念 引用的作用是给一个变量起别名 格式&#xff1a; type & 别名 原名 引用必须初始化&#xff0c;在初始化后不能改变 int &b;ba;错误 int& b a; bc;不是将b从a的别名变为c的别名 而是将c的值赋给a int a 10; int& b a; b 20;//用别名改数据&…

Apifox:详细使用教程,带你轻松拿捏

目录 Apifox简介 Apifox的安装与使用 Apifox新建项目的流程 编写接口文档 Apifox简介 我们在日常编程开发过程中经常实行的是前后端分离架构的模式&#xff0c;一个项目的落地会通过产品、开发、测试三方会审&#xff0c;对项目需求评审过后&#xff0c;前后端开发会制定一…

基于SpringBoot医院信息管理系统源码

hisystem 1. 用idea打开项目&#xff0c;并且配置maven下载依赖 2. 导入数据库 hisystem.sql 3. 修改application.yml数据库相关配置 4. 用户注册&#xff0c;验证邮件的邮箱考虑到安全问题&#xff0c;暂不提供授权码&#xff0c;如有需求可使用自己邮箱&#xff0c;开启POP3…

Vue3 - 路由 Vue-router 4.X(配置与使用教程)

目录前言安装配置准备工作配置路由基本使用路由传参 1路由传参 2路由传参 3SEO前言 在咱们 Vue2 时代&#xff0c;官方推荐咱们使用 vue-router 3.X 的库&#xff0c;如果是用脚手架创建的话&#xff0c;就直接默认集成到里面了。 Vue3 使用的是 vue-router 4.X 官方库&#xf…

[附源码]计算机毕业设计JAVA小超市进销存管理系统

[附源码]计算机毕业设计JAVA小超市进销存管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM my…

m扩频通信系统在瑞利信道中的误码率性能matlab仿真

目录 1.算法描述 2.matlab算法仿真效果 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 本课题&#xff0c;我们主要涉及到两个理论要点&#xff0c;第一个是瑞利衰落条件&#xff0c;第二个是扩频通信。下面分别对这两个理论进行介绍&#xff1a; 第一个是瑞利衰落条件&#x…

我们又重写了一个关键服务

#01 QueryCoord 组件介绍 QueryCoord 是 Milvus 中查询集群的中心调度节点&#xff0c;在用户将一个 Collection Load 到内存中时&#xff0c;QueryCoord 负责将该 Collection 的 Segment 调度到 QueryNode 集群中&#xff0c;以支持后续的查询。 QueryCoord 最核心的操作有4…

将egg项目部署至服务器

文章目录1.下载linux版本的node-v162.将node安装包从自己电脑上上传到自己的服务器3.在服务器中解压压缩包4.配置环境变量5.使文件生效6.将egg项目传到服务器指定目录下7.下载依赖8.npm start 运行不会占用终端 并且一直在运行 可以使用npm stop停用9.最后使用云服务器ip:端口号…

Linux22 --- 网络为什么要分层、使用tcp协议实现两个进程间通信的功能、IP地址转换函数

一、网络为什么要分层 1 1、分层的优点 1&#xff09;各层之间是独立的。某一层并不需要知道它的下一层是如何实现的&#xff0c;而仅仅需要知道该层通过层间的接口&#xff08;即界面&#xff09;所提供的服务。由于每一层只实现一种相对独立的功能&#xff0c;因而可将一个…