RocketMq源码分析(八)--消息消费流程

news2024/12/28 21:23:00

文章目录

  • 一、消息消费实现
  • 二、消息消费过程
    • 1、消息拉取
    • 2、消息消费
      • 1)提交消费请求
      • 2)消费消息

一、消息消费实现

  消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
在这里插入图片描述

二、消息消费过程

1、消息拉取

  1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成

//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
    private void pullMessage(final PullRequest pullRequest) {
        //从MQClientInstance中获取内部实现类MQConsumerInner
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            //强转换成PUSH消息消费服务,然后消费消息
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

  2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码
       PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
               // ......

	             //从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,
	               boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
	               // 然后把消息提交到消费线程池中执行,
	               // 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来
	               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
	                   pullResult.getMsgFoundList(),
	                   processQueue,
	                   pullRequest.getMessageQueue(),
	                   dispatchToConsume);

  // ......
                    }
                }
            }

// ......
        };

2、消息消费

1)提交消费请求

  pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                //异步线程池中执行
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                //提交异常,延迟5S再提交
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            //超过最大数量,分批
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

  submitConsumeRequest方法中,

  • 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
  • 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
  • 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交

2)消费消息

  ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
        @Override
        public void run() {

            //在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            //类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            //恢复重试消息主题名
            // RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会
            //首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

            ConsumeMessageContext consumeMessageContext = null;
            //执行钩子
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }

            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                //内部消息监听器消费消息
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue), e);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }

            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }

            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            //后置钩子
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            //同步消息消费状态和offset
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }

  ConsumeRequest线程中,执行步骤如下

  • 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
  • 恢复重试消息的topic和namespace
  • 如果存在钩子函数,则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
  • 调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)
  • 如果存在后置钩子,则执行后置钩子函数
  • 消息消费结果处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1144255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3-vite-ts-pinia

Vue3 vite Ts pinia 实战 源码 electron 仓库地址&#xff1a;https://gitee.com/szxio/vue3-vite-ts-pinia 视频地址&#xff1a;小满Vue3&#xff08;课程导读&#xff09;_哔哩哔哩_bilibili 课件地址&#xff1a;Vue3_小满zs的博客-CSDN博客 初始化Vue3项目 方式一 …

分布式数据库Apache Doris简易体验

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

公司电脑禁用U盘的方法

公司电脑禁用U盘的方法 安企神U盘管理系统下载使用 在这个复杂的数据时代&#xff0c;保护公司数据的安全性至关重要。其中&#xff0c;防止未经授权的数据泄露是其中的一个关键环节。U盘作为一种常用的数据传输工具&#xff0c;也成为了潜在的安全风险。因此&#xff0c;公司…

DOM节点学习

喜欢的东西太贵了&#xff0c;我一咬牙&#xff0c;狠下心决定不喜欢了&#xff01; 【文档节点--DOM有哪些节点】 仔细看下面文档的html标签的不同 1.li标签没换行 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"&…

【代码随想录】算法训练计划04

1、24. 两两交换链表中的节点 题目&#xff1a; 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 思路&#xff1a; 链表这种题…

自己动手搭建一个传奇是什么体验?下面是我搭建的详细教程,大家跟着教程做,不光是学会了技术,平时还可以帮朋友搭建

传奇游戏是一代人的回忆&#xff0c;它曾经风靡一时&#xff0c;让无数玩家沉迷其中。这款游戏以其独特的玩法、丰富的故事背景和深刻的角色刻画&#xff0c;吸引了一大批忠实粉丝。 在传奇游戏中&#xff0c;玩家可以体验到各种不同的职业和角色&#xff0c;每个角色都有自己…

计算机毕业设计 基于SpringBoot高校竞赛管理系统的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

javascript数据类型

目录 原始数据类型 引用数据类型 类型检测 类型转换 总结 原始数据类型 JavaScript 中有六种原始数据类型&#xff0c;它们是&#xff1a; Undefined&#xff08;未定义&#xff09;: 表示一个未被赋值的变量。Null&#xff08;空值&#xff09;: 表示一个空对象指针。B…

jetson nano刷机更新Jetpack

只是记录个人在使用英伟达jetson Nano的经历,由于头一次尝试,所以特此记录需要的问题和经验。 一,英伟达刷机教程(jetson nano 版本) 本次我是直接刷机到TF卡,然后TF卡作为启动盘进行启动,我看网上有带EMMC版本的,好像可以直接把系统镜像安装到EMMC里面。但是有个问题…

【每日一题】2558. 从数量最多的堆取走礼物-2023.10.28

题目&#xff1a; 2558. 从数量最多的堆取走礼物 给你一个整数数组 gifts &#xff0c;表示各堆礼物的数量。每一秒&#xff0c;你需要执行以下操作&#xff1a; 选择礼物数量最多的那一堆。如果不止一堆都符合礼物数量最多&#xff0c;从中选择任一堆即可。选中的那一堆留下…

ssm164学院学生论坛的设计与实现+vue

项目名称&#xff1a;ssm164学院学生论坛的设计与实现vue 点击这里进入源码目录 声明&#xff1a; 适用范围&#xff1a; 本文档适用于广泛的学术和教育用途&#xff0c;包括但不限于个人学习、毕业设计和课程设计。免责声明&#xff1a; 特此声明&#xff0c;本文仅供参考学…

赴日IT培训 日本IT行业为啥吃香?

确实现在有许多小伙伴尝到了赴日IT的甜头&#xff0c;可是去日本从事IT行业真的很简单吗&#xff1f;为什么日本的IT行业这么缺人呢&#xff1f;那今天小编就跟大家聊一聊日本的IT行业。 咱们先来说说日本的IT行业为什么缺人&#xff1f;其实不只是IT行业&#xff0c;可以说日…

Azure云工作站上做Machine Learning模型开发 - 全流程演示

目录 本文内容先决条件从“笔记本”开始设置用于原型制作的新环境&#xff08;可选&#xff09;创建笔记本开发训练脚本迭代检查结果 关注TechLead&#xff0c;分享AI全维度知识。作者拥有10年互联网服务架构、AI产品研发经验、团队管理经验&#xff0c;同济本复旦硕&#xff0…

53. 寻宝(第七期模拟笔试)(最小生成树练习)

本题链接&#xff1a;卡码网KamaCoder 题目&#xff1a; 样例&#xff1a; 输入 7 11 1 2 1 1 3 1 1 5 2 2 6 1 2 4 2 2 3 2 3 4 1 4 5 1 5 6 2 5 7 1 6 7 1 输出 6 思路&#xff1a; 由题意&#xff0c;这里是需要遍历完全部的顶点&#xff0c;求遍历完全部点的花费最短距离…

java基础 特殊文件

1.Properties属性文件&#xff1a; 1.1使用Properties读取属性文件里的键值对数据&#xff1a; package specialFile;import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.Enumeration; import java.util.Propert…

【C++】C++入门(中)--引用

目录 一 引用概念 二 引用特性 三 常引用 四 引用使用场景 1 做参数 2. 做返回值 1 例一 2 例二 3 例三 4 例四 五 传值, 传引用效率比较 六 值和引用的作为返回值类型的性能比较 七 引用和指针的区别 一 引用概念 引用不是新定义一个变量&#xff0c;而是给已存…

视频智能视觉分析真的遥不可及吗?有没有那种下载就能用的视频分析服务?

我一直有一个感觉&#xff0c;就是市面上很难找到那么一个带视频算法的软件&#xff0c;能让我们很直观地看到视频分析的效果&#xff0c;大部分都要内置在某种算力硬件上&#xff0c;或者对GPU要求比较严格&#xff0c;很难做到像以前我们做的视频直播软件那样&#xff0c;下载…

Springboot+vue地方废品回收机构管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue地方废品回收机构管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&…

牛客网刷题-(7)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

RK3568平台 sys虚拟文件系统添加节点

一.常见的linux文件系统 1. EXT2: EXT2是最早的Linux文件系统之一&#xff0c;它被广泛应用于Linux操作系统中。它支持大小为16TB的分区和最大文件大小为2TB。由于其简单性和高可靠性&#xff0c;在很长一段时间内仍被许多用户所选择。 2. EXT3: 2001年&#xff0c;Linux社区…