Rocketmq面试(四)RocketMQ 的推模式和拉模式有什么区别?

news2025/2/13 10:29:11

一、PUSH模式

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
   
    // 设置NameServer地址 
    consumer.setNamesrvAddr("localhost:9876");
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // 启动Consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

消费者会定义一个消息监听器,并把这个监听器注册到DefaultMQPushConsumer,同时也会注册到其实现类,当拉取消息的时候,就会使用这个监听器来处理消息,那这个监听器什么时候调用呢?

消费者真正拉取请求的类是DefaultMQPush-ConsumerImpl,这个类的pullMessage方法调用了PullApiWrapper的pullKernelImpl方法,这个方法有一个参数是回调函数Pull-Callback,当PULL状态是PullStatus.FOU-ND代表拉取成功。

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //省略部分逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        //省略部分逻辑
                    break;
                //省略其他case
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        //省略
    }
};

这个处理逻辑调用了ConsumerMessageService的submitConsumeRequest方法,看一下并发消费的处理逻辑

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        //分批处理,跟上面逻辑一致
}

ConsumerRequest是一个线程类,run方法里面调用了消费者定义的消息处理方法

public void run() {
    //省略逻辑
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    //省略逻辑
    try {
        //调用消费方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        //省略逻辑
    }
    //省略逻辑
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

1.在MessageListenerConcurrently中定义消费者处理逻辑,在消费者启动的时候注册到DefultMQpushConsumer,和DefaultMQ-PushConsumerImpl

2.消费者启动时,启动消费拉取线程PullMessageService,里面死循环不停的从broker拉取消息,这里调用了DefaultMQPushConsumerImpl类的pullMessage方法

3.DefaultMQPushConsumerImpl类的pullMessage方法调用了PullAPIWrapper的pullKernelImpl方法真正的发送PULL请求,并传入PullCallback的回调函数

4.拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;

5.ConsumerRequest处理消息时调用了消费端定义的消费逻辑,也就是MessageListerConcurrently的consumeMessage方法。

pull模式

官方代码

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
    while (running) {
        List<MessageExt> messageExts = litePullConsumer.poll();
        System.out.printf("%s%n", messageExts);
    }
} finally {
    litePullConsumer.shutdown();
}

可以我们看到,PULL模式需要在处理逻辑里面不停的去拉取,比如上面代码中写了一个死循环,那PULL模式中poll韩式是怎么实现的呢?

跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe

    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            if (topic == null || topic.equals("")) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
            if (serviceState == ServiceState.RUNNING) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception e) {
            throw new MQClientException("subscribe exception", e);
        }
    }

这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。在消费者启动时,会启动 RebalanceService 这个线程,可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;

  2. 消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;

  3. 消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;

  4. 消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。

总结

通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:

  1. PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑

  2. PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/639583.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于STM32的重力感应售货机系统设计

一、项目介绍 随着智能物联网技术的不断发展&#xff0c;人们的生活方式和消费习惯也正在发生改变。如今越来越多的人习惯于在线购物、自助购物等新型消费模式&#xff0c;因此智能零售自助柜应运而生。 本项目设计开发一款基于STM32主控芯片的智能零售自助柜&#xff0c;通过…

哪吒汽车,莫做“普信男”

作者 | 魏启扬 来源 | 洞见新研社 今年初&#xff0c;哪吒汽车创始人方运舟和张勇联合发表新年致辞&#xff0c;文末总结说 “2023-2025年&#xff0c;必将是一场艰难的挑战&#xff0c;也是哪吒汽车的生死存亡之战。” 哪吒汽车或许过于敏感了&#xff0c;就今年以来的市场表…

Tensorflow两步安装(超简单)

一、查看python版本&#xff0c;下载对应tensorflow文件 1.Anaconda已安装&#xff0c;找到Anaconda3文件夹&#xff0c;双击打开anaconda prompt&#xff0c;输入python&#xff0c;查看python版本 可以看到我的版本是3.9的 2.进入下面的网站&#xff0c;选择你需要的cpu或g…

【appium】appium自动化入门之API(下)——两万字API长文,建议收藏

目录 Appium API 前言 1.contexts &#xff08;返回当前会话中的上下文&#xff0c;使用后可以识别 H5 页面的控件&#xff09; 2.current_context &#xff08;返回当前会话的当前上下文 &#xff09; 3. context &#xff08;返回当前会话的当前上下文&#xff09; 4.find_e…

Django-搭建sysinfo获取系统信息

文章目录 前言一、项目搭建二、主机信息监控三、Celery定时任务和异步任务 前言 本篇基于&#xff1a;https://github.com/hypersport/sysinfo#readme 使用Django&#xff0c;搭建sysinfo&#xff0c;Linux中,sysinfo是用来获取系统相关信息的结构体 一、项目搭建 &#xff0…

CV方向如何找到适合自己的研究创新点?

做CV的论文创新的一些思路与方向。分别是无事生非&#xff0c;后浪推前浪&#xff0c;推陈出新&#xff0c;出奇制胜。 无事生非 在原始的数据集上加一些噪声&#xff0c;例如随机遮挡&#xff0c;或者调整饱和度亮度什么的&#xff0c;主要是根据具体的任务来增加噪声或扰动&a…

大模型LLM-微调经验分享总结

模型越大对显卡的要求越高&#xff0c;目前主流对大模型进行微调方法有三种&#xff1a;Freeze方法、P-Tuning方法和Lora方法。笔者也通过这三种方法&#xff0c;在信息抽取任务上&#xff0c;对ChatGLM-6B大模型进行模型微调。liucongg/ChatGLM-Finetuning: 基于ChatGLM-6B模型…

I/O设备详解

目录 一. 什么是IO设备 二. IO设备分类 2.1按照使用特性分类 2.2按照传输速率分配 2.3按照信息交换的单位分类 三. IO设备的构成 3.1 IO的机械部件 3.2 IO的电子部件 3.2.1设备控制器&#xff08;IO控制器功能简介&#xff09; 3.2.2设备控制器&#xff08;IO控制器&…

【C++】红黑树的模拟实现

文章目录 一、红黑树的概念二、红黑树的性质三、红黑树节点的定义四、红黑树结构五、红黑树的插入操作六、红黑树的调整1.叔叔存在且为红2.叔叔不存在或者存在且为黑3.插入完整代码4.总结 七、红黑树的验证八、红黑树的删除九、红黑树与AVL树的比较十、红黑树的应用十一、红黑树…

d2l_第四章学习_Softmax Regression

x.1 Classification 分类问题 x.1.1 Classification和Regression的区别 注意&#xff0c;广义上来讲&#xff0c;Classification/Softmax Regression 和 Linear Regression 都属于线性模型。但人们口语上更习惯用Classification表示Softmax Regression&#xff0c;而用Regres…

C++特殊类的设计与类型转换

特殊类的设计与类型转换 特殊类的设计请设计一个类&#xff0c;只能在堆上创建对象请设计一个类&#xff0c;只能在栈上创建对象请设计一个类&#xff0c;只能创建一个对象(单例模式) C的类型转换 特殊类的设计 请设计一个类&#xff0c;只能在堆上创建对象 通过new创建的类就…

Baumer工业相机堡盟工业相机如何使用BGAPISDK对两个万兆网相机进行硬件触发同步(C++)

Baumer工业相机堡盟工业相机如何使用BGAPISDK对两个万兆网相机进行硬件触发同步&#xff08;C&#xff09; Baumer工业相机Baumer工业相机BGAPISDK和触发同步的技术背景Baumer工业相机使用BGAPISDK进行双相机主从相机触发1.引用合适的类文件2.使用BGAPISDK设置主相机硬件触发从…

C++中内存泄漏,内存溢出区别

C/C中内存泄露和内存溢出的区别 注&#xff1a;泄露为没有释放内存&#xff0c;溢出为分配空间不够&#xff0c;数据溢出了 内存溢出&#xff08;out of memory&#xff09;是指程序在申请内存时&#xff0c;没有足够的内存空间供其使用。 内存泄漏&#xff08;memory leak&…

【ROS_Driver驱动真实UR机械臂】

【ROS_Driver驱动真实UR机械臂】 1. 前言2. 安装fmauch_universal_robot和驱动3. 仿真3.1 启动gazebo3.2 启动move it规划3.3 启动rviz 4. 运行机械臂4.1 启动rviz4.2 启动示教器程序4.3 启动moveit4.4 启动rviz 5. 一些说明补充5.1 ur_calibration 提取标定信息5.2 自带程序5.…

从原理到实践:使用Mediacodec编码H265并实现解码H265码流

H265 H265&#xff0c;也称为HEVC&#xff08;High Efficiency Video Coding&#xff09;&#xff0c;是一种高效视频编码格式。它是H264&#xff08;AVC&#xff09;的后继者&#xff0c;也是ITU-T和ISO/IEC联合开发的标准。相比H264&#xff0c;H265可以在同样的视频质量下&…

【数据库原理与应用 - 第三章】数据库设计

数据库设计的步骤 需求分析阶段概念模型设计阶段 —— E-R图逻辑模型设计阶段 —— 关系模型物理结构设计阶段 数据库实施阶段数据库运行和维护阶段 目录 数据库设计的步骤 一、需求分析 1、主要任务 2、对象模型 二、数据库概念结构设计 1、概念数据模型 E-R图 1、概念…

Mybatis《学习笔记(22版尚硅谷)》

Mybatis简介 MyBatis历史 MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投Google Code旗下&#xff0c;iBatis3.x正式更名为MyBatis。代码于2013年11月迁移到GithubiBatis一词来源于“intern…

C语言中的for循环语句

表达式1&#xff1a;设置初始条件&#xff0c;只执行一次&#xff0c;可以为多个变量设置初始值 表达式2&#xff1a;循环条件表达式&#xff0c;判断是否集训循环 表达式3&#xff1a;执行循环体后再执行 例如&#xff0c;使用for循环打印1-10的数字首先定义整形变量a0&…

【V4L2】 v4l2框架分析之v4l2_device

&#x1f440;&#x1f449;本系列文章基于linux内核版本4.1.15分析media子系统下的V4L2。先分析组成V4L2的核心数据结构以及各组成元素的含义和作用。相关文章&#xff1a; ❤&#xff08;1&#xff09;《【V4L2】v4l2框架分析之video_device》 ❤&#xff08;2&#xff09;《…

30分钟了解并学会git的使用(绝对干货)

概述&#xff1a; 在programmer行业有一句流传许久的话&#xff1a;不会用git的程序员&#xff0c;不是真的程序员&#xff01;&#xff01;&#xff01; 足以看出来git在业界的认可度有多高&#xff0c;所以我们混这行的都应该学会这个由Linux之父林纳斯开发 的第二大发明&a…