【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析

news2024/11/25 7:43:04

透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析

  • DefaultMQPushConsumerImpl拉取消息
  • consumeMessageService的并发消费和顺序消费
    • 并发消费
    • 顺序消费
      • concurrently 创建 ConsumeRequest
        • concurrently ConsumeRequest#run 消费主体逻辑
          • 消费结束之后清除数据
      • orderly 创建 ConsumeRequest
        • orderly ConsumeRequest#run 消费主体逻辑
          • 顺序处理机制
          • 关于 offset 提交
  • 消息消费的失败

DefaultMQPushConsumerImpl拉取消息

首先,DefaultMQPushConsumerImpl 是一个实现了 RocketMQ 的消费者客户端接口的类。该类的主要作用是从 RocketMQ 的 Broker 获取消息并进行消费。

主要可以通过pullMessage方法进行获取对应的操作,如下图所示。
在这里插入图片描述

在消费消息时,DefaultMQPushConsumerImpl 会将获取到的消息放入一个processQueue中,processQueue包含了一个TreeMap数据结构,它按照消息的 commitLogOffset 顺序来排列。
在这里插入图片描述

DefaultMQPushConsumerImpl 通过定时的方式,从 Broker 上拉取消息。具体来说,它会调用DefaultMQPushConsumerImpl 自身定义的PullMessageService类,该类会定时的从消息服务器中拉取消息。

源码如下所示。
在这里插入图片描述
一旦消息拉取成功,PushConsumer 会将消息交给 processQueue 中的一个队列进行处理,这个队列对应同一个消息主题的同一个消息队列。

processQueue 中的每个消息都会根据消息的commitLogOffset排列位置。这个位置决定了消息被消费的顺序。也就是说,processQueue 存放的顺序决定了消息消费的顺序。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

consumeMessageService的并发消费和顺序消费

consumeMessageService 是一个用于消费消息的服务方法,它可以实现消息的并发消费和顺序消费。当使用 consumeMessageService 时,需要考虑业务的实际需求以及消息处理的性质,权衡使用并发消费和顺序消费。

并发消费

并发消费是指多个消费者同时消费同一批消息以提高处理速度,需要注意消息幂等性以避免重复消费。

DefaultMQPushConsumer的consumeMessageBatchMaxSize参数默认值为1,表示默认批量消费的消息数量是1个。在并发消费方式下,若一个队列中拉取到32条消息,则会创建32个ConsumeRequest对象,每个ConsumeRequest对象对应1条消息,提交到线程池中运行。

顺序消费

顺序消费则是按照消息产生的顺序逐个消费,适合处理需要顺序进行的业务逻辑,如订单处理,但实现可能带来性能瓶颈,需谨慎设计。指同一时刻,一个 queue 只有一个线程在消费。只让一个线程消费,由加锁来实现,而顺序则由 TreeMap 来实现

一个队列中拉取到32条消息,则只会创建一个ConsumeRequest对象,该对象会被提交到线程池中,在ConsumeRequest.run方法中会按照消息的offset顺序一条一条地消费,直到TreeMap为空

concurrently 创建 ConsumeRequest

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

消费者在消费消息时,根据批量消费的大小来决定是将任务提交到线程池中一次性消费,还是将任务分成多次提交到线程池中进行消费。

首先判断msgs中消息的数量是否小于等于一个批量消费数量consumeBatchSize,如果小于等于,那么将所有消息封装成一个ConsumeRequest对象并提交到consumeExecutor线程池中,其中dispatchToConsume表示是否立即分发给消费者消费。

如果消息数量大于批量消费数量,那么将消息分段提交到线程池中进行消费。首先通过两层循环,将msgs中的消息按照consumeBatchSize分成若干个小的MessageExt列表,每个小的MessageExt列表封装成一个ConsumeRequest对象并提交到consumeExecutor线程池中。

如果线程池提交任务出现拒绝执行异常,说明该线程池已经满了,这时候需要将当前小的MessageExt列表继续循环并依次每次取出一个消息封装成ConsumeRequest对象进行提交,直到所有的小的MessageExt列表被完整地提交到线程池中。若还有未提交的列表,则将该ConsumeRequest对象提交到一个新的线程池中进行定时的重复提交。

concurrently ConsumeRequest#run 消费主体逻辑

消息消费者消费消息的地方,listener.consumeMessage方法会被消费者调用,将消息列表和消息处理上下文传入。

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • msgs是需要消费的消息列表,这里使用了Collections.unmodifiableList方法来创建一个不可修改的消息列表,这是为了保证消息的安全性,防止消息在消费过程中被意外或恶意修改。

  • context是消息处理的上下文,可能包含消费者的订阅信息、消费进度等信息,可根据业务需要进行扩展和使用。

  • consumeMessage方法返回消费结果,通常是一个枚举类型,表示消费结果的状态,如消费成功、消费失败等。消费结果会影响消息处理的下一步流程。

消费结束之后清除数据

主要用于移除已经消费完成的消息。直接从 msgTreeMap 中删除消息,并返回 msgTreeMap 中第一条消息的 queue offset 值。

org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage

public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}

具体来说,它接收一个 MessageExt 类型的消息列表msgs,通过遍历msgs,查找msgTreeMap中相应的消息,将找到的消息删除并计数,更新msgCount和msgSize这两个计数器。代码中也使用了重入锁lockTreeMap来保证线程安全。函数将返回result,表示下一步应该消费的消息的offset,如果没有可消费的消息,则返回-1。

orderly 创建 ConsumeRequest

在消息消费过程中,判断是否需要立即将消息分发给消费者进行消费。

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

首先判断参数dispathToConsume为true,如果为true,表示需要立即分发给消费者消费;否则就不需要进行分发,因为可能等待其他条件触发再进行消费。

如果需要立即分发,那么将该消息的消息队列和消息处理队列封装成ConsumeRequest对象,并将该对象提交到consumeExecutor线程池中进行执行。每个消费者线程从consumeExecutor线程池中取出ConsumeRequest对象并进行消费。

orderly ConsumeRequest#run 消费主体逻辑

先简单介绍一下 RocketMQ 消息消费的流程:消费者将消息从 Broker 中拉取到本地的 ProcessQueue 中,然后在 ProcessQueue 中进行消息消费。

// 获取锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    for (boolean continueConsume = true; continueConsume; ) {
        // 从 TreeMap 中获得消息
        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
        if (!msgs.isEmpty()) {
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } else {
            continueConsume = false;
        }
    }
    ...
}

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

首先实例化了 MessageQueueLock,用于保证多线程环境下的线程同步和互斥。在代码的第一行中,获取到了当前 MessageQueue 的锁对象 objLock。这个锁对象是在 mqLockTable 中获取的,mqLockTable 存储了每个 MessageQueue 的锁对象,用于对不同的 MessageQueue 进行互斥控制。

在代码的后面,使用 synchronized 对 objLock 进行加锁,并进入到了循环中。在循环中,调用 processQueue.takeMessags() 方法从 ProcessQueue 中获取消息,返回的是一个消息列表。如果消息列表不为空,则调用 messageListener.consumeMessage() 方法来进行消息消费。

如果消息列表为空,说明当前的 ProcessQueue 中没有更多的消息,结束当前的循环,并退出 synchronized 块,释放了 objLock 的锁,等待下一次的消费请求。

整个逻辑是通过锁机制来实现对 ProcessQueue 进行互斥控制的,保证了多个消费者之间的消费的安全性。同时,使用了循环来进行多次消费。

顺序处理机制

take消息时,将消息从 msgTreeMap 取出,并放入 consumingMsgOrderlyTreeMap。消费完成后,清空 consumingMsgOrderlyTreeMap。将 offset 设为 this.consumingMsgOrderlyTreeMap.lastKey() + 1,表示已经消费的消息的下一条消息的 offset。

// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit

public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}
关于 offset 提交

offset 是消费者从 broker 拉取的下一条消息的偏移量

消息消费的失败

  • 顺序消费:如果处理某条消息失败且重试次数小于阈值,从 consumingMsgOrderlyTreeMap 中取出这条消息并重新放入 msgTreeMap;如果重试次数超过阈值,则将消息发送回 broker 并根据重试次数决定发送消息到 SCHDULE_TOPIC_XXXX 或死信队列

  • 并发消费:如果处理消息时失败,则将消息发送回 broker。如果发送失败,将会继续消费消息,直到成功消费并提交给 broker。

发送 ConsumeRequest 的时机有两个,一是在拉取到消息后,二是在出现异常后延迟提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/684862.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

黑马程序员前端 Vue3 小兔鲜电商项目——(十)订单页

文章目录 路由配置和基础数据渲染模板代码配置路由封装接口渲染数据 切换地址-打开弹框交互切换地址-地址切换交互生成订单支付页组件封装订单接口绑定事件 路由配置和基础数据渲染 模板代码 新建 src\views\Checkout\index.vue 文件&#xff0c;添加以下代码&#xff1a; &…

容器管理中关于CGroup的那些事

前言 在一个docker宿主机上可以启动多个容器&#xff0c;默认情况下&#xff0c;docker并没有限制其中运行的容器使用硬件资源。 但如果在实际环境中&#xff0c;容器的负载过高&#xff0c;会占用宿主机大量的资源。这里的资源主要指的CPU&#xff0c;内存&#xff0c;和IO带…

Python Pandas 筛选数据以及字符串替换

str.replace使用示例 假设有一个DataFrame df&#xff0c;其中有一个列名为text&#xff0c;包含一些文本字符串&#xff1a; import pandas as pd data {text: [hello world, foo bar, hello there]} df pd.DataFrame(data) 我们可以使用str.replace方法来替换字符串。比…

操作系统——Linux 进程控制

一、实验题目 Linux 进程控制 二、实验目的 通过进程的创建、撤销和运行加深对进程概念和进程并发执行的理解&#xff0c;明确进程和程序之间的区别。 三、实验内容&#xff08;实验原理/运用的理论知识、算法/程序流程图、步骤和方法、关键代码&#xff09; &#xff08;…

开源网安S-SDLC解决方案,为银行打造主动防御的安全体系

​某银行是全国上市最早的一批股份制商业银行&#xff0c;总部位于深圳&#xff0c;在全国拥有上百家分行、上千家营业机构&#xff0c;资产总额达数千亿元。近年来&#xff0c;该银行围绕数据化、智能化、生态化&#xff0c;全力打造“数字银行”&#xff0c;助力建设“数字中…

第十六届CISCN复现----MISC

1.被加密的生产流量 下载附件&#xff0c;发现是一个文件名为modus的压缩包&#xff0c;解压后是一个pcap文件&#xff0c;用wireshark打开 文件名modus&#xff0c;已经提示了工控流量&#xff0c;很多情况下都是和TCP协议结合起来的 工控CTF之协议分析1——Modbus_ctf modb…

基于java+swing+mysql学生信息管理系统V2.0

基于javaswingmysql学生信息管理系统V2.0 一、系统介绍二、功能展示1.项目骨架2.数据库表3.项目内容4.登陆5.学生信息查询6、学生信息添加7、学生信息修改8、学生信息删除 四、其它1.其他系统实现五.获取源码 一、系统介绍 项目类型&#xff1a;Java SE项目&#xff08;awtswi…

Gorm Many To Many

写cmdb的时候要去做一些软件资源的落库&#xff0c;发布要使用到的应用属性。应用有哪些属性&#xff1f; 应用有它的type类型&#xff0c;是api还是admin&#xff0c;还是job或者task。它的语言是go java.....&#xff0c;它的own也就是属于哪个开发的&#xff0c;这是它的属…

设备管理模块实现

文章目录 1 .导航树模块的实现2. 查询定位功能的实现3. 资源管理功能的实现4. 电缆段入沟功能实现 1 .导航树模块的实现 导航树的各节点是通过Ajax 技术异步加载的&#xff0c;系统初始化时导航树只会加载初始的城市节点&#xff0c;用户根据自身需要选择相应的父节点加载其逻…

Flink安装与编程实践

系列文章目录 Ubuntu常见基本问题 Hadoop3.1.3安装&#xff08;单机、伪分布&#xff09; Hadoop集群搭建 HBase2.2.2安装&#xff08;单机、伪分布&#xff09; Zookeeper集群搭建 HBase集群搭建 Spark安装和编程实践&#xff08;Spark2.4.0&#xff09; Spark集群搭建 文章目…

mongoDB相关知识

目录 常用操作删除数据库 启动问题集如何远程访问mongDB数据库由于widows安全策略&#xff0c;linux访问不到windows的mongDB 常用操作 删除数据库 windows下mongDB通过下面命令行进入 D:\mongodb\mongodb-win32-x86_64-2008plus-ssl-3.6.23-8-gc2609ed3ed\bin>mongod.exe…

Unity开发前的一些建议1_设置脚本的编码格式,设置IDE的编码格式

Unity开发前的一些建议1_设置脚本的编码格式&#xff0c;设置IDE的编码格式 乱码之后是是不可以撤回的哦。 这么做的理由&#xff0c;Unity右侧的Inspector面板看代码是UTF-8格式的。可以在Inspector中速览代码&#xff0c;且如果修改IDE&#xff0c;UTF-8比其他编码格式用的…

K8S复习

本文原文出自本人自己复习时整理&#xff0c;原文非常系统&#xff0c;建议拜师#yyds干货盘点# 手把手教你玩转 Kubernete 集群搭建(03)_wzlinux的博客-CSDN博客 1.docker的优势 在某一段时期内&#xff0c;大家一提到 Docker&#xff0c;就和容器等价起来&#xff0c;认为 Doc…

【架构】后端服务架构高性能设计方法

文章目录 前言1、无锁化1.1、串行无锁1.2、结构无锁 2、零拷贝2.1、内存映射2.2、零拷贝 3、序列化3.1、分类3.2、性能指标3.3、选型考量 4、池子化4.1、内存池4.2、线程池4.3、连接池4.4、对象池 5、并发化5.1、请求并发5.2、冗余请求 6、异步化6.1、调用异步化6.2、流程异步化…

【跟晓月学数据库】使用MySQLdump 对数据导入导出

前言 大家好&#xff0c;我是沐风晓月&#xff0c;今天给大家介绍MySQLdump的数据导出导入&#xff0c;希望对你有用。 &#x1f3e0;个人主页&#xff1a;我是沐风晓月 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是沐风晓月&#xff0c;阿里云社区专家博主&…

vue3+antd-design-vue+vite项目总结

代码热更新能力失效&#xff0c;每次都需要手动刷新&#xff0c;开发体验极差 1、先看看是否开启了热更新 2、再看看引入模块文件名是否正确。当前的项目部分人可以更新&#xff0c;部分不能&#xff0c;所以和1没什么关系&#xff0c;网上搜索发现vite对文件名大小写十分敏感&…

2-3查找树

2-3查找树 为了保证查找树的平衡性&#xff0c;我们需要一些灵活性&#xff0c;因此在这里我们允许树中的一个结点保存多个键。确切的说&#xff0c;我 们将一棵标准的二叉查找树中的结点称为2-结点(含有一个键和两条链)&#xff0c;而现在我们引入3-结点&#xff0c;它含有两…

Java版本企业招投标采购管理系统源码 +支持二开+spring cloud

一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标立项申请入口、用户可以保存为草稿、提交。 3、采购立项列表 功能点&#xff1a;对草稿进行编辑&#x…

如何访问NetApp E系列存储的CLI命令行

NetApp存储的E系列&#xff08;e-series&#xff09;是收购LSI存储而来的&#xff0c;所以这个产品的install base&#xff0c;也就是安装量其实是很大的&#xff0c;因为早期LSI的商业模式就是OEM&#xff0c;给很多的IT公司做过OEM&#xff0c;比较典型的就是IBM的早期的DS存…

我想搭建一个商城?有哪些流程?

近年来&#xff0c;我国电子商务发展迅速。淘宝、京东、亚马逊等一大批电子商务巨头受到越来越多消费者的青睐。互联网普及率大大提高&#xff0c;消费者也逐渐形成了网上购物的习惯。在支付体验、物流服务和售后服务不断提升的过程中&#xff0c;越来越多的消费者依赖网络购物…