RocketMQ高级特性三-消费者分类

news2024/9/21 4:53:45

目录

前言

概述

区别

PullConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

SimpleConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

PushConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

总结


前言

RocketMQ中的消费者分类主要包括三种类型:PullConsumerSimpleConsumer、和PushConsumer。每种消费者类型都有其特定的使用场景、原理机制以及优缺点。

注:生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

概述

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

区别

对比项PushConsumerSimpleConsumerPullConsumer
接口方式使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。业务方自行实现消息处理,并主动调用接口返回消费结果。业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理由SDK管理消费并发度。由业务方消费逻辑自行管理消费线程。由业务方消费逻辑自行管理消费线程。
负载均衡粒度5.0 SDK是消息粒度,更均衡,早期版本是队列维度消息粒度,更均衡队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度高度封装,不够灵活。原子接口,可灵活自定义。原子接口,可灵活自定义。
适用场景适用于无自定义流程的业务消息开发场景。适用于需要高度自定义业务流程的业务开发场景。仅推荐在流处理框架场景下集成使用

PullConsumer

定义与概述

PullConsumer是一种传统的消息拉取模式,在这种模式下,消费者需要主动从Broker中拉取消息,而不是由Broker主动推送消息。这种模式提供了更大的灵活性,允许消费者根据自身的处理能力和业务逻辑,自主控制消息的拉取和消费节奏。

原理机制

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

  • 消息拉取:消费者调用pull方法,从Broker的消息队列中主动拉取消息。拉取的过程通常是循环进行的,消费者可以在每次拉取时指定要拉取的消息数量和位置。
  • 偏移量管理:消费者需要自行管理消息消费的偏移量(offset),确保每次拉取的消息都是未消费过的,防止消息重复消费。
  • 流控管理:消费者可以根据自身的处理能力和当前系统的负载情况,动态调整消息的拉取频率和数量,避免处理能力不足导致的消息积压。
使用场景
  • 批处理:适合需要批量处理消息的场景,例如数据分析、日志处理等。
  • 自定义处理逻辑:在需要对消息进行复杂的过滤、分组或排序处理时,PullConsumer提供了更大的灵活性。

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。

注:出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

优缺点
  • 优点
    • 灵活性高:消费者可以完全控制消息拉取的时机和频率,适合对消费策略要求较高的场景。
    • 流控能力强:通过手动控制拉取频率,消费者能够避免消息积压和系统过载。
  • 缺点
    • 实现复杂:消费者需要手动管理偏移量、处理消息的幂等性问题,并实现流控逻辑。
    • 实时性较低:由于消息是被动拉取的,实时性相对较低。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Set;

public class PullConsumerExample {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            long offset = getMessageQueueOffset(mq);
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);
            processPullResult(pullResult);
            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        // 获取消费队列的偏移量
        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        // 更新消费队列的偏移量
    }

    private static void processPullResult(PullResult pullResult) {
        // 处理拉取的消息
        switch (pullResult.getPullStatus()) {
            case FOUND:
                // 处理消息
                break;
            case NO_NEW_MSG:
                // 没有新消息
                break;
            default:
                break;
        }
    }
}

SimpleConsumer

定义与概述

SimpleConsumer是RocketMQ 4.8.0版本之后引入的一种消费模式,它是PullConsumer的简化版本,适用于需要拉取消息但不想管理复杂消费逻辑的场景。SimpleConsumer简化了消息拉取和流控的实现,提供了更为直观和易用的API。

原理机制
  • 简化的拉取模型:SimpleConsumer内置了部分PullConsumer的逻辑,如偏移量管理和拉取频率控制,使开发者可以更专注于业务逻辑的实现。
  • 异步拉取:SimpleConsumer支持异步拉取消息,进一步简化了使用流程,并减少了开发者在流控管理上的负担。
  • 偏移量自动管理:SimpleConsumer可以自动管理消息的偏移量,无需手动维护,降低了重复消费的风险。
使用场景
  • 轻量级消费:适合对消费量不大或对消费逻辑要求较简单的场景,例如小型任务处理和轻量级的数据收集。
  • 异步任务处理:适合需要异步处理消息的场景,通过异步拉取简化了业务逻辑的实现。
优缺点
  • 优点
    • 简单易用:相较于PullConsumer,SimpleConsumer大大简化了开发难度,降低了使用门槛。
    • 自动管理:自动偏移量管理和异步拉取降低了消息丢失和重复消费的风险。
  • 缺点
    • 灵活性不足:由于自动管理逻辑的加入,SimpleConsumer在一些复杂场景下可能不如PullConsumer灵活。
    • 适用场景有限:由于其简化的设计,SimpleConsumer更适合处理较为简单的消费逻辑。
Java 代码示例
import org.apache.rocketmq.client.consumer.SimpleConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class SimpleConsumerExample {
    public static void main(String[] args) throws MQClientException {
        SimpleConsumer consumer = new SimpleConsumer("ConsumerGroupName", "localhost:9876");

        MessageQueue mq = new MessageQueue("TopicTest", "BrokerName", 0);
        long offset = consumer.searchOffset(mq, System.currentTimeMillis());

        List<Message> messages = consumer.pull(mq, "*", offset, 10);
        for (Message message : messages) {
            System.out.printf("Received Message: %s%n", new String(message.getBody()));
        }

        consumer.shutdown();
    }
}

PushConsumer

定义与概述

PushConsumer是RocketMQ中最常用的消费模式,Broker主动将消息推送给消费者,消费者只需关注如何处理接收到的消息。PushConsumer是事件驱动的消息消费模式,适用于需要实时处理消息的场景。

原理机制
  • 消息推送:Broker会自动将新到达的消息推送到消费者,消费者只需实现相应的处理逻辑。
  • 并发消费:PushConsumer支持多线程并发消费,通过配置线程池可以提高消息处理的并发性。
  • 消费模式:支持两种消费模式:并发消费(ConsumeConcurrently)和顺序消费(ConsumeOrderly)。并发消费不保证消息的顺序性,而顺序消费保证同一个队列内的消息按顺序消费。
使用场景
  • 实时处理:适合需要实时处理消息的场景,如金融交易、监控告警、实时数据分析等。
  • 并发消费:适合需要高并发处理的场景,通过配置多线程提高消费吞吐量。
优缺点
  • 优点
    • 实时性高:消息被推送到消费者后可以立即处理,适用于需要实时响应的业务场景。
    • 易于实现:PushConsumer实现简单,开发者只需关注业务逻辑的实现,而无需关心消息拉取和偏移量管理。
  • 缺点
    • 流控难度大:推模式下流控难度较大,可能出现消息积压的情况,尤其是在消费处理速度跟不上消息推送速度时。
    • 并发限制:虽然支持多线程并发消费,但对于顺序消费模式,仍然可能存在并发处理的局限性。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerExample {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.printf("Received Message: %s%n", new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }
}

总结

在RocketMQ的消费者分类中:

  • PullConsumer提供了灵活的消息拉取机制,适用于需要自主控制消费节奏的场景。
  • SimpleConsumer在PullConsumer的基础上进行了简化,适用于简单的消费场景。
  • PushConsumer则是最常用的模式,适用于需要实时处理消息的场景。

通过对不同消费者类型的理解和选择,可以更好地满足不同业务场景的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2100544.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringMVC 第一次复学笔记

服务器启动时&#xff0c;创建spring容器&#xff1b;dispatcherServlet启动时&#xff0c;直接创建springmvc容器初始化一次&#xff0c;实现了springmvc和spring的整合。 SpringMVC里的组件 处理器映射器&#xff08;HandlerMapping&#xff09;负责匹配映射路径对应的Handl…

小琳Python课堂:Python全局解释器锁(GIL)的深入解析与应用

小琳Python课堂开讲啦&#xff01;今天我们来深入探讨Python中的一个重要概念——全局解释器锁&#xff08;GIL&#xff09;。&#x1f512; 技术细节角度 单线程执行保证&#xff1a;GIL确保了在任何时刻只有一个线程在执行Python字节码。互斥锁实现&#xff1a;GIL通过在解…

无人机之发动机篇

一、无人机发动机的分类 无人机发动机根据工作原理和应用场景的不同&#xff0c;主要分为以下几类&#xff1a; 电动马达&#xff1a; 特点&#xff1a;清洁、高效、体积小巧、重量轻便、噪音低、对环境影响小。 应用&#xff1a;多用于小型或微型无人机&#xff0c;因其续航…

Nature Microbiology|WISH标签技术:评估微生物组中单株水平的种群动态

微生态研究搞了n多年&#xff0c;益生菌还是那么几个&#xff0c;为什么&#xff1f; 一个共识是单一菌株的添加往往难以解决复杂问题。微生物群体的相互作用和平衡不应被忽视。实际上&#xff0c;我们需要在群体层面进行添加或干预&#xff0c;才能真正发挥益生菌的作用。然而…

AD原理图无法输入中文的问题及解决办法

电脑系统&#xff1a;Windows 11 专业版 AD版本&#xff1a;20.0.14 解决方法&#xff1a; 方法1、在word打好&#xff0c;复制到AD 方法2、尝试快捷键ctrlshift、ctrl空格、windows空格

超声波眼镜清洗机有用吗?清洁力好的超声波清洗机推荐

在当今快节奏的生活中&#xff0c;维持良好的卫生状况已成为日常不可或缺的一部分&#xff0c;尤其对于追求高品质生活方式的人来说更为重要。因此&#xff0c;选择一款高效便捷的超声波清洗机成为了提升居家清洁体验的理想方案。面对市面上琳琅满目的品牌&#xff0c;甄选出既…

24全网最全stable diffusion模型讲解!快来!!新手必收藏!!

前言 手把手教你入门绘图超强的AI绘画程序Stable Diffusion&#xff0c;用户只需要输入一段图片的文字描述&#xff0c;即可生成精美的绘画。给大家带来了全新Stable Diffusion保姆级教程资料包&#xff08;文末可获取&#xff09; AI模型最新展现出的图像生成能力远远超出人…

扫雷游戏(上)

开学快乐 今天我们来写扫雷的代码 一、了解扫雷是怎么玩儿的 首先这里放一个扫雷游戏的链接扫雷游戏网页版 - Minesweeper 然后我们点进去耍了一阵&#xff0c;发现扫雷首先要有一个棋盘&#xff0c;然后在玩家不知道的情况下设置雷在不同的格子&#xff0c;玩家点到雷就炸死…

arm调试-- gdb与gdbserver的安装与使用

一、安装 1.下载gdb源码 https://ftp.gnu.org/gnu/gdb/gdb-7.11.1.tar.gz 2. 解压编译gdb以及gdbserver (1)gdb PC端 tar -zxvf gdb-7.11.1.tar.gz cd gdb-7.11.1 mkdir _install ./configure --targetaarch64-linux-gnu --disable-werror --prefix/home/jinhao/gdb-7.1…

Maven聚合与继承

聚合 当我们一次想要构建多个项目时&#xff0c;而不是到每一个模块的目录下分别执行mvn命令。这个时候就需要使用到maven的聚合特性 这里第一个特殊的地方是packaging&#xff0c;值设置为pom。我们正常开发的其他模块中都没有声明packaging&#xff0c;默认使用了默认值jar&a…

Android Studio 最新版本保姆级安装使用教程

Android Studio 2024 最新版本保姆级安装使用教程 1、官网下载2、安装Standard默认配置Custom配置 3、视频教程 这里是Android Studio 2024最新版本保姆级安装教程&#xff0c;高级版本基本上通用~ Android 是为世界各地的所有用户打造的&#xff0c;无论从设计、功能还是整体价…

操作系统面试真题总结(五)

文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 线程切换要保存哪些上下文&#xff1f; 当发生线程切换时&#xf…

8、Django Admin后台中添加Logo

在项目settings.py文件 # 导入os&#xff0c;并且修改DIRS内容如下所示 import os TEMPLATES [{BACKEND: django.template.backends.django.DjangoTemplates,DIRS: [os.path.join(BASE_DIR, templates/)],APP_DIRS: True,OPTIONS: {context_processors: [django.template.con…

Mudo03 vscode配置相应的文件的搜索路径,库文件的搜索路径以及想要的链接库

使用muduo库&#xff0c;需要链接libmuduo_base.so、libmuduo_net.so 、libpthread.so VScode上如何配置相应的头文件的搜索路径&#xff1f;库文件的搜索路径&#xff1f; 文件的搜索路径&#xff1a; -I&#xff1a;头文件搜索路径 -L&#xff1a;库文件搜索路径 -Imuduo_ne…

docker安装nginx1.27.0

关于拉取不到镜像问题可以到这篇文章进行镜像配置 一、docker拉取nginx1.27.0镜像 docker pull nginx:1.27.0二、创建映射容器的文件目录 mkdir -p -m 777 /mydata/nginx/conf/conf.d mkdir -p -m 777 /mydata/nginx/log mkdir -p -m 777 /mydata/nginx/html 三、创建文件de…

IDEA下载及安装教程(24年7月更新)

IDEA全称IntelliJ IDEA&#xff0c;是由JetBrains公司开发的用于Java编程设计的软件&#xff0c;是一款高效的IDE工具&#xff0c;也是许多Java开发者喜爱的开发工具&#xff0c;具有全行代码补全功能和兼容性与可拓展性&#xff0c;同时也具有较高的使用度与知名度。今天&…

物业|基于SprinBoot+vue的物业管理系统(源码+数据库+文档)

物业管理系统 基于SprinBootvue的物业管理系统 一、前言 二、系统设计 三、系统功能设计 系统登录实现 后台模块实现 管理员模块实现 物业管理模块实现 业主模块实现 维修员模块实现 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、…

【数学建模国赛思路预约】2024高教社杯全国大学生数学建模竞赛助攻——思路、可运行代码、成品参考

2024年全国大学生数学建模大赛马上就要开始了&#xff0c;大家有没有准备好呢&#xff0c;今年将会和之前一样&#xff0c;将会在比赛赛中时期为大家提供比赛各题的相关解题思路、可运行代码参考以及成品论文。 一、分享计划 1、 赛中分享内容包括&#xff08;2023国赛为例&am…

Windows cmd 输入 Python 弹出应用商城

文章目录 1 使用场景1.1 打开 "运行" 窗口1.2 输入 Python&#xff0c;弹出 Windows 应用商城 2 解决办法2.1 打开 "管理应用执行别名"2.2 取消勾选2.3 验证 1 使用场景 1.1 打开 “运行” 窗口 快捷键&#xff1a;Win r&#xff0c;并输入 cmd 1.2 输入…

Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架

尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格&#xff0c;并且拿了很多大厂offer。 其中 SpringCloud 工业级底座 &#xff0c;是大家的面试核心&…