RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?

news2025/1/24 5:03:19

RocketMQ

问题

背景是这样: 最近有个项目用MQ做消息流转,在RocketMQ集群模式下,一个消费者实例,订阅了两个Topic A、B

Topic A:存储的是批量业务消息。
Topic B:存储的是单个业务消息。

有个小伙伴问我:A 先存入了100万消息,消费端线程开始消费 Topic A 数据,消费速度较慢;之后 B 也存入了1万消息,A 的数据积压会阻塞 B 的消费吗?

先说结论:B 消息消费会有一定的延迟,但不会绝对阻塞。

分析

本文 RocketMQ 版本:4.4.0。

从一张类图入手。

项目中,生产者 DefaultMQProducer 和消费者 DefaultMQPushConsumer 启动的时候,都会调用 MQClientInstance#start 方法来启动 MQClientInstance 组件。

MQClientInstance#start 主要工作:

  1. 启动NettyRemotingClient客户端,用于与NameServer和Broker进行通信。

  2. 定时更新Topic所对应路由信息、清除离线broker、向所有在线broker发送心跳包等。

  3. 启动拉取消息服务线程(PullMessageService),异步拉取消息。

  4. 启动负载均衡服务线程(RebalanceService),用于实现消息队列(Message Queue)的负载均衡。

在RocketMQ中,有ProcessQueue和MessageQueue两个概念。

  1. MessageQueue表示一个消息队列,它包含主题、队列ID、Broker地址信息。

  2. ProcessQueue 表示消费者正在消费的消息队列,是一个消费进度的抽象概念。在消费者消费消息时,将消息从MessageQueue中取出,并将消息保存在ProcessQueue中进行消费。包含了消费者消费的消息列表、消息偏移量以及一些状态信息等。

消费端 ProcessQueue 与 Broker 中的 MessageQueue 是一一对应的。

PullMessageService#run

在 PullMessageService 类中定义了 pullRequestQueue 属性,用于存储拉取消息请求(PullRequest)的队列。

PullRequest 拉取消息请求的数据结构定义了:

  • 消费者组。

  • 消费主题。

  • 消费队列。

  • 消费偏移量等。

PullMessageService#run 方法会从队列中取出 PullRequest 并执行,通过不断的发送 PullRequest来持续的拉取消息。

public class PullMessageService extends ServiceThread {
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
  
  @Override
    public void run() {
      PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
    }
}

PullMessageService#pullMessage 方法找到消费组里面的消费实现类,执行对应的逻辑。

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

DefaultMQPushConsumerImpl#pullMessage主要功能:

  1. 消费者服务状态校验,判断消费者是否是运行态;

  2. 流控校验,判断ProcessMessage 队列缓存消息数是否超过阀值。

  3. 并发消费和顺序消费相关的校验,并发消费限制最大跨度偏移量(offset),顺序消费则判断是否锁定,未锁定设置消费点位。

  4. 创建匿名内部类PullCallback,后续拉取消息返回响应后会回调onSuccess完成消息的消费;

  5. 调用PullAPIWrapper#pullKernelImpl拉取消息。

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  public void pullMessage(final PullRequest pullRequest) {
    //校验
    //判断
    ...
    
    //创建匿名内部类
    PullCallback pullCallback = new PullCallback() {
      @Override
      public void onSuccess(PullResult pullResult) {
        switch (pullResult.getPullStatus()) {
          case FOUND:
            有新消息,封装请求提交到线程池,
            自定义消费者消费数据
            
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
              pullResult.getMsgFoundList(),
              processQueue,
              pullRequest.getMessageQueue(),
              dispatchToConsume);
          case NO_NEW_MSG:
          case NO_MATCHED_MSG:
            没有新消息或未匹配到消息时,继续往队列放入拉取消息请求(PullRequest),循环调用。
          ...
      }
    }
  }
}

PullStatus状态定义值,消息的中间状态,枚举:

  1. FOUND:拉取到消息。

  2. NO_NEW_MSG:没有新消息。

  3. NO_MATCHED_MSG:没有匹配的消息。

  4. OFFSET_ILLEGAL:非法的偏移量,可能设置的太大或大小。

ConsumeMessageConcurrentlyService#submitConsumeRequest 消费线程入口是这里,封装一个consumerRequest对象来执行业务 初始化消费者线程池,会根据配置来创建消费线程数。

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
  //线程池,用于执行自定义消费逻辑
  private final ThreadPoolExecutor consumeExecutor;

  @Override
  public void submitConsumeRequest(
    提交任务,如定义了消费消息最大值时,会根据设置的值进行分割,然后提交到线程池待执行。
  )
  
  class ConsumeRequest implements Runnable {
    @Override
    public void run() {
      //找到自定义消息监听类
      MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
      ...
      //执行对应的实现类,消费数据
      status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
      ...
    }
  }
}

回到PullAPIWrapper#pullKernelImpl拉取消息方法中,最终会往Broker发送请求。

NettyRemotingAbstract#invokeAsyncImpl

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback){
    //获取信号量,最大为65535个
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
      final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
      //缓存正在执行的请求
      this.responseTable.put(opaque, responseFuture);
      
      channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                //设置响应状态
                responseFuture.setSendRequestOK(true);
                return;
            }
            requestFail(opaque);
            log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
        }
      });
    }
}

上面代码中创建了ResponseFuture对象,放在了responseTable Map对象中,后续会获取后回调对应的方法。

也就是回调: MQClientAPIImpl#pullMessageAsync

private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback) {
  this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
          PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
          assert pullResult != null;
          //最终会回调PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发
          pullCallback.onSuccess(pullResult);
        }
    }
  });
}

拉取到的消息会调用PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发。

RebalanceService#run

定时任务,重分配消息队列。

public class RebalanceService extends ServiceThread {
@Override
    public void run() {
        while (!this.isStopped()) {
            //20s轮询
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

最终会调用RebalanceImpl#updateProcessQueueTableInRebalance 重新分配 topic 队列。

updateProcessQueueTableInRebalance方法主要功能:

  1. 获取当前消费者的订阅信息,包括订阅的主题、消费者组等。

  2. 获取当前消费者的消费进度,包括已消费的消息队列和消息偏移量等。

  3. 获取当前主题的所有消息队列。

  4. 根据消费者组和消息队列数量进行负载均衡,将消息队列分配给消费者。

  5. 更新消费者的消费进度,将已消费的消息队列和消息偏移量更新为新分配的消息队列。

  6. 往pullRequestQueue队列中写入PullRequest请求数据。

最终也会生成拉取消息请求,放入队列中,等待PullMessageService线程执行。

答案?

至此,我们简单分析了消息的拉取、消费流程,那么回到我们之前的问题来。

假设单消费者实例定义了4个消费线程,那么线程池会创建4个核心线程用来执行任务。

生产者往Topic A先写入100万消息后,拉取线程会从这8个队列(Topic A和Topic B各4个)拉取消息,此时拉取到A的数据会放入消费者线程池等待消费者线程消费。

pull线程会持续的拉取,所以会持续的往线程池队列尾部写入Msg任务,按照我们分析的这种场景,Topic B消息虽然后面写入了,但是Topic B消息拉取后的数据放在了队列的中部或者后尾部的位置。

假如Thread 拿到消息后处理较慢,则会导致线程池队列的数据出现积压,也就是会最终会对B数据的消费产生影响,但不是绝对阻塞。

结论:

1. 会存在一定的消费延迟,但不是绝对的阻塞。也不是必须等A消费完,才会消费B的数据。

上测试代码

Topic A的消费类,线程睡眠1s,模拟正常业务处理耗时。

@Slf4j
@MQConsumeService(topic = "A")
public class A extends AbstractMQMsgProcessor {
    @Override
    protected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("A message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);
        return true;
    }
}


Topic B的消费类。

@Slf4j
@MQConsumeService(topic = "B")
public class B extends AbstractMQMsgProcessor {
    @Override
    protected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {
        String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("B message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);
        return true;
    }
}

模拟先写入A类消息5000条,再写入B类消息100条。

@GetMapping(value = "/send")
public void reportList() throws Exception {
    for (int i = 0; i<5000; i++) {
        processor.aysncSend("A""*""你好" + i);
    }
    Thread.sleep(10000);
    for (int i = 0; i<100; i++) {
        processor.aysncSend("B""*""hello" + i);

    }
}

16:20左右A类消息发送完毕,开始消费。

16:21左右B类消息发送完毕,此时待消费线程池队列中任务数在持续增加,可以看到在持续拉取A类消息,放入队列中待消费。

16:31分B类消息被消费,因为消费者线程是批量消费消息的,所以此时B类的10条消息都是由 ConsumeMessageThread_2 执行的。

后面A、B类消息会持续消费,最终在 16:46分所有消息消费完成。

总结

本文分析的版本是4.x,假如是最新版本,结论可能不一样,RockeMQ 5.0有了全新的消费模式-POP,对原有的消费模型的更新。

5.0之前的客户端架构中,拉取到消息之后会先将消息缓存到 ProcessQueue 中,当需要消费时,会从 ProcessQueue 中取出对应的消息进行消费,当消费成功之后再将消息从 ProcessQueue 中 remove 走。其中重试消息的发送,位点的更新在这个过程中穿插。

新版本在ProcessQueue 中维护了2个队列,有兴趣的同学可以去了解下,这里就不展开了。

参考资料

  1. https://xie.infoq.cn/article/68d0ef479b65ae4431f10f67e

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/493326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于C++的职工管理系统

1、管理系统需求 职工管理系统可以用来管理公司内所有员工的信息 本教程主要利用C++来实现一个基于多态的职工管理系统 公司中职工分为三类:普通员工、经理、老板,显示信息时,需要显示职工编号、职工姓名、职工岗位、以及职责 普通员工职责:完成经理交给的任务 经理职责:完成…

分布式系统概念和设计-分布式文件系统服务体系结构和实践经验

分布式系统概念和设计 文件系统的特点 负责文件的组织&#xff0c;存储&#xff0c;检索&#xff0c;命名&#xff0c;共享和保护 文件包含数据和属性 数据&#xff1a;包含一系列数据项——8比特的字节&#xff0c;读写操作可访问任何一部分数据属性&#xff1a;用一个记录表…

一文详解 SCTP 协议

SCTP(Stream Control Transmission Protocol)流控制传输协议,由 RFC2960 定义。SCTP的设计目的是提供一种可靠的、面向消息的数据传输服务,以便于支持多点通信以及满足传输的可靠性需求。SCTP 目前广泛应用于VoIP、移动通信和云计算等领域。 SCTP 主要特点SCTP 消息结构SCTP …

Android9.0 原生系统SystemUI下拉状态栏和通知栏视图之锁屏通知布局

1.前言 在9.0的系统rom定制化开发中,对于系统原生systemui的锁屏界面的功能也是非常重要的,所以在锁屏页面布局中,也是有通知栏布局的,所以接下来对于息屏亮屏 通知栏布局的相关流程分析,看下亮屏后锁屏页面做了哪些功能 2.原生系统SystemUI下拉状态栏和通知栏视图之锁…

应用层开发想转Android framework开发要从何开始

前言 现如今&#xff0c;由于市面上应用App的更新逐渐变少&#xff0c;很多Android移动应用开发者都开始转型做系统开发&#xff0c;这比开发应用有趣多了&#xff0c;因为你可以探索系统模块的运行原理&#xff0c;从框架层面去了解它。 在应用层&#xff0c;你只需要配置好…

JAVA-异常

文章目录 1.异常的体系1.3异常的分类 2.异常的处理2.2异常的抛出throw2.3异常的捕获2.3.1异常声明throws2.3.2 try-catch捕获并处理2.3.3 finally 2.4 异常的处理流程 3.自定义异常类 1.异常的体系 Throwable&#xff1a;是异常体系的顶层类&#xff0c;其派生出两个重要的子类…

前端框架篇学习--选择命令式还是声明式

命令式与声明式定义 大白话&#xff1a;假期回家了&#xff0c;我想吃老妈的大盘鸡&#xff0c;然后老妈就去采购食材&#xff0c;剁鸡块&#xff0c;卤鸡肉&#xff0c;切土豆&#xff0c;然后爆炒起来&#xff0c;想方设法给我做好吃的大盘鸡。老妈上菜的餐桌&#xff0c;我…

SQL语句学习笔记(对库、表、字段、的操作)

查看mysql的状态 status 启动、停止 mySQL服务 图像界面方法&#xff1a; dos窗口执行&#xff1a;services.msc 控制面板–>管理工具–>服务 命令行方法&#xff1a; 启动&#xff1a; net start mysql80 停止&#xff1a; net stop mysql80 启动与环境变量 添加环境…

UnityShaderBook中消融dissolve详解

消融这个效果算得上游戏开发中用的比较多的一个效果&#xff0c;表现游戏对象消失的时候经常用到&#xff0c;这个效果实现也非常简单&#xff0c;因此在《UnityShader入门精要》中也就短短几句话讲完了&#xff0c;这里我想针对书中的效果详细讲解一下。 Shader源代码&#x…

(浙大陈越版)数据结构 第二章 线性结构 2.2 堆栈

目录 2.2.1 什么是堆栈 堆栈 什么是堆栈 例子&#xff1a;计算机如何进行表达式求值&#xff1f;如&#xff1a;56/2-3*4 后缀表达式 堆栈的抽象数据类型描述 2.2.2 堆栈的顺序存储实现 例子&#xff1a;用一个数组实现两个堆栈&#xff0c;要求能最大利用数组空间&…

路径之谜(DFS)-2016年蓝桥杯国赛

路径之谜-2016年国赛 1、题目描述2、解题思路3、代码实现1、题目描述 小明冒充 X 星球的骑士,进入了一个奇怪的城堡。 城堡里边什么都没有,只有方形石头铺成的地面。 假设城堡地面是 nn* 个方格。如下图所示。 按习俗,骑士要从西北角走到东南角。可以横向或纵向移动,但不能…

【Java虚拟机】JVM常见诊断命令和调试工具

1.JVM常用命令行参数jps和jinfo实操 准备测试代码 /*** author lixiang* date 2023/5/4 20:53*/ public class JVMTest {public static void main(String[] args) throws InterruptedException {Thread.sleep(1000000);} }&#xff08;1&#xff09;命令jps&#xff1a;全称 …

【exgcd】牛客练习赛 D-青蛙兔子的约会

D-青蛙兔子的约会_牛客练习赛111 (nowcoder.com) 题意&#xff1a; 思路&#xff1a; 感觉和那个青蛙的约会比较像 就是列了个方程&#xff1a; a*xb*yn 考虑用exgcd解方程 然后看x在[L,R]有没有解 做法就是先把x的最小整数解求出来&#xff0c;然后考虑它的通解 xx0b/…

2023年五一数学建模 B 题过程与结果

文章目录 第一问第二问数据时序分析Auto-ARIMA第二问求解解的情况A->Q:D-> AQ-V总快递数 第三问第四问遗传算法求解 第五问SARIMA 模型拟合季节性规律 第一问 见 2023 年 五一杯 B 题过程 代码&#xff08;第一问&#xff09; 第二问 第二问考虑是一个时序预测问题&a…

图像生成论文阅读:Latent Diffusion算法笔记

标题&#xff1a;High-Resolution Image Synthesis with Latent Diffusion Models 会议&#xff1a;CVPR2022 论文地址&#xff1a;https://ieeexplore.ieee.org/document/9878449/ 官方代码&#xff1a;https://github.com/CompVis/latent-diffusion 作者单位&#xff1a;慕尼…

数字乡村建设与示范项目可行性研究报告(word可编辑)

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除 5.1 “三平台”&#xff1a;建设支撑农业发展农村治理惠民服务的三大应用平 台 5.3.1 建设智慧农业综合服务平台 夯实数字农业基础&#xff0c;推进重要农产品全产业链大数据…

redis服务搭建,C++实现redis客户端,redis远程可视化工具

目录 redis简介redis服务搭建redis常用命令C实现redis客户端redis远程可视化工具:Another Redis DeskTop Manager redis简介 官方网址&#xff1a;https://redis.io/ 开源地址&#xff1a;https://github.com/redis 中文文档&#xff1a;http://www.redis.cn/documentation.ht…

造轮子系列】面试官问:你能手写Vuex吗(一)?

大厂面试题分享 面试题库 前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 web前端面试题库 VS java后端面试题库大全 Vuex 是 Vue.js 的状态管理模式&#xff0c;它主要解决了组件之间共享状态时的问题。在本文…

网络原理之传输层

网络原理&#xff0c;进一步了解网络是如何工作的~~ 按照网络协议这几个层次来展开分为五点&#xff1a; 应用层&#xff08;重点介绍&#xff09;传输层&#xff08;重点介绍&#xff09;网络层&#xff08;跳过&#xff09;数据链路层&#xff08;跳过&#xff09;物理层&a…

JavaScript,

JS-引入方式JS-基础语法 书写语法变量数据类型&#xff0c;运算符&#xff0c;控制语句 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"…