RocketMQ教程-(5)-功能特性-消费者分类

news2024/12/23 8:51:30

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者,本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。

背景信息​

Apache RocketMQ 面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都不一样。了解如下问题,可以帮助您选择更匹配业务场景的消费者类型。

  • 如何实现并发消费:消费者如何使用并发的多线程机制处理消息,以此提高消息处理效率?

  • 如何实现同步、异步消息处理:对于不同的集成场景,消费者获取消息后可能会将消息异步分发到业务逻辑中处理,此时,消息异步化处理如何实现?

  • 如何实现消息可靠处理:消费者处理消息时如何返回响应结果?如何在消息异常情况进行重试,保证消息的可靠处理?

以上问题的具体答案,请参考下文。

功能概述

消息消费流程

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

危险

生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

PushConsumer​

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。

使用方式

PushConsumer的使用方式比较固定,在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑。由 Apache RocketMQ 的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。

示例代码如下:

// 消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    // 设置消费者分组。
    .setConsumerGroup("YourConsumerGroup")
    // 设置接入点。
    .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
    // 设置预绑定的订阅关系。
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    // 设置消费监听器。
    .setMessageListener(new MessageListener() {
        @Override
        public ConsumeResult consume(MessageView messageView) {
            // 消费消息并返回处理结果。
            return ConsumeResult.SUCCESS;
        }
    })
    .build();

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。

出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

PushConsumer原理

 可靠性重试

PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。消息重试具体信息,请参见PushConsumer消费重试策略。

使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 Apache RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ 服务端是无法感知的,因此不会进行消费重试。

  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ 服务端同样无法感知,因此也不会进行消费重试。

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。

适用场景

PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:

  • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。

  • 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成

使用方式

SimpleConsumer 的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:

// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
        // 设置消费者分组。
        .setConsumerGroup("YourConsumerGroup")
        // 设置接入点。
        .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
        // 设置预绑定的订阅关系。
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        // 设置从服务端接受消息的最大等待时间
        .setAwaitDuration(Duration.ofSeconds(1))
        .build();
try {
    // SimpleConsumer 需要主动获取消息,并处理。
    List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        // 消费处理完成后,需要主动调用 ACK 提交消费结果。
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
        }
    });
} catch (ClientException e) {
    // 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    logger.error("Failed to receive message", e);
}

SimpleConsumer主要涉及以下几个接口行为:

 可靠性重试

SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessageAckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。更多信息,请参见SimpleConsumer消费重试策略。

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息。

适用场景

SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:

  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。

  • 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。

  • 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

PullConsumer

使用建议​

PushConsumer合理控制消费耗时,避免无限阻塞

对于PushConsumer消费类型,需要严格控制消息的消费耗时,尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息,建议使用SimpleConsumer,并设置好消费不可见时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/786039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

巅峰极客2023 hellosql

随便输一个payload&#xff0c;有waf 这题只有两个回显&#xff0c;分别是太酷啦和nonono&#xff0c;不显示报错、登录成功等各种信息&#xff0c;目前只能想到用时间盲注。 抓包fuzz&#xff0c;194都是被过滤的 不止这些&#xff0c;手工测出来if、sleep、benchmark、*、rp…

HOOPS Visualize | HOOPS Exchange | HOOPS Web Platform

内容采集互联网&#xff0c;功能仅供参考&#xff01;&#xff01;&#xff01; HOOPS 3D Application Framework (HOOPS/3dAF)是由Tech Soft America公司开发并由Spatial再次销售的产品&#xff0c;该产品为当今世界上领先的3D应用程序提供了核心的图形架构和图形功能&#x…

Git版本控制系统入门

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 Git 初识 Git 安装 Git 配置用户信息 掌握 Git 仓库 Git 的三个区域 Git 文件状态 Git 暂存区使用…

【uniapp2.0】解决真机测试底部有空白问题

我的错误做法&#xff08;不代表别人是否没用&#xff09; &#x1f4cc; 使用&#xff1a;uni.hideTabBar 报错&#xff1a;"errMsg":"hideTabBar:fail not TabBar page" 根因&#xff1a;这种情况一般是在小程序中为了隐藏tabbar调用的&#xff0c;但是有…

你还在用命令式编程?Python函数式编程让你的代码更优雅!

Python支持函数式编程&#xff0c;函数式编程是一种编程范式&#xff0c;它将计算机程序视为数学函数的组合。 文章目录 一、lambda表达式 lambda表达式的定义 lambda表达式的应用场景 lambda表达式的局限性 lambda表达式的高级用法 二、Python的高阶函数 高阶函数的定义…

(链表) 剑指 Offer II 022. 链表中环的入口节点 ——【Leetcode每日一题】

❓剑指 Offer II 022. 链表中环的入口节点 难度&#xff1a;中等 给定一个链表&#xff0c;返回链表开始入环的第一个节点。 从链表的头节点开始沿着 next 指针进入环的第一个节点为环的入口节点。如果链表无环&#xff0c;则返回 null。 为了表示给定链表中的环&#xff0c…

Python补充笔记4-面向对象

目录 一、编程思想​ 二、类与对象​ 三、类的创建​ 四、对象的创建​ 五、类属性、类方法、静态方法​ 六、动态绑定属性和方法​ 七、知识点总结 八、面向对象的三大特征 1.封装 2.继承​ 3.多态 九、方法重写 十、object类 十一、特殊方法和特殊属性 1.dict/len/add​ 2.…

python调用百度ai将图片识别为表格excel

python调用百度ai将图片识别为表格excel ocr ocr 百度ai官方文档&#xff1a;https://ai.baidu.com/ai-doc/OCR/Ik3h7y238 import requests import json import base64 import time文档&#xff1a;https://ai.baidu.com/ai-doc/OCR/Ik3h7y238 # 获取access_token def get_acc…

学习笔记--TCP/IP协议

TCP/IP协议 TCP (Transmission Control Protocol)传输控制协议&#xff0c;传输层协议。 一、协议的分层 ISO网络层分为7层 二、分层的作用 具体通信情况&#xff1a; 三、报文传输 三次握手连接&#xff0c;四次挥手释放 参考链接&#xff1a; https://zhuanlan.zhih…

直播带货app开发开发流程分析

随着小视频管理体系愈来愈变成人们的生活中的一部分&#xff0c;也随之短视频卖货逐步形成岗位内主流的转现方式&#xff0c;将短视频平台生产制造变成短视频带货体系计划愈来愈多&#xff0c;那样&#xff0c;把小视频管理体系开发设计变成短视频带货体系必须两步&#xff1f;…

在Microsoft Excel中如何快速合并表格

在 Excel 中分析数据时&#xff0c;在一个工作表中收集所有必要信息的频率是多少&#xff1f;几乎从来没有&#xff01;当不同的数据分散在许多工作表和工作簿中时&#xff0c;这是一种非常常见的情况。幸运的是&#xff0c;有几种不同的方法可以将多个表中的数据组合成一个表&…

【python】爬楼梯—递归分析(超级详细)

爬楼梯—递归分析 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 注意&#xff1a;给定 n 是一个正整数。 示例 1&#xff1a; 输入&#xff1a; 1 输出&#xff1a; 1 解释&#xff1a; 有一种方…

GPT和MBR的区别

磁盘分区是操作系统管理磁盘数据的一项非常重要的功能。在分区时&#xff0c;用户需要选择一种分区表格式来组织磁盘上的分区&#xff0c;这也就是GPT和MBR两种分区表格式的由来。在本文中&#xff0c;将详细探讨GPT和MBR分区表格式的区别和如何选择它们。 1. MBR和GPT分区表格…

electron-egg 加密报错

electron框架&#xff1a;electron-egg 解决方式 npm uninstall bytenode npm install bytenode1.3.6node:internal/modules/cjs/loader:928 throw err; ^ Error: Cannot find module ‘node:assert/strict’ Require stack: D:\electron-egg-test\new-electron-egg\electr…

勘探开发人工智能技术:断层识别

1 断层识别 断层是地下岩层在受到挤压或拉伸力作用下,因脆性变形而形成的地层错断,是一种重要的地质构造特征。断层检测和解释是从地震剖面中认识岩层结构和储层特性的重要步骤。 1.1 数据描述 合成地震数据: 每一个合成地震数据都是由地质模型的反射系数与雷克子波进行褶…

Vite 4.4 正式版发布,全面拥抱 Lightning CSS

一、什么是 Vite Vite 是由 Evan You 推出的下一代前端构建工具&#xff0c;是官方 Vue CLI 的替代品&#xff0c;速度非常快。Vite 利用原生 ESM 并使用 Rollup 处理开发和打包工作。 从功能上讲&#xff0c;它的工作方式类似于预配置的 webpack 和 webpack-dev-server&#…

软考_软件设计师(中级)

视频链接&#xff1a;&#xff08;zst_2001&#xff09; https://www.bilibili.com/read/cv18526892?spm_id_from333.999.0.0 文章目录 1、计算机系统2、程序设计语言3、知识产权4、数据库5、面向对象6、UML7、设计模式8、操作系统9、结构化开发10、软件工程11、信息安全&…

【MCU学习】RTthread工程介绍

RT-Thread架构 RT-Thread诞生于2006年&#xff0c;是一款以开源、中立、社区化发展起来的物联网操作系统。 RT-Thread主要采用 C 语言编写&#xff0c;浅显易懂&#xff0c;且具有方便移植的特性&#xff08;可快速移植到多种主流 MCU 及模组芯片上&#xff09;。RT-Thread把面…

Ubuntu18.04安装Autoware1.15(解决Openplanner无法绕障的问题:Openplanner2.5)

文章目录 一、下载Autoware1.15源码二、安装依赖三、修改CUDA版本四、编译以及报错解决编译&#xff08;1&#xff09;报 undefined reference to cv::Mat::Mat() 的错就按照下面方式改相应包&#xff1a;&#xff08;2&#xff09;遇到报错&#xff1a;&#xff08;3&#xff…

opencv-20 深入理解HSV 色彩空间(通过指定,标记颜色等来拓展ROI区域)

RGB 色彩空间是一种被广泛接受的色彩空间&#xff0c;但是该色彩空间过于抽象&#xff0c;我们不能够直接通过其值感知具体的色彩。 我们更习惯使用直观的方式来感知颜色&#xff0c;HSV 色彩空间提供了这样 的方式。 通过 HSV色彩空间&#xff0c;我们能够更加方便地通过色调、…