真香,聊聊 RocketMQ 5.0 的 POP 消费模式!

news2024/11/27 10:38:37

大家好,我是君哥。

大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。

不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下几个方面:

  1. 消息积压了,增加消费者不一定能解决。PUSH 模式如下图:

上面的图中,消费组中的消费者每个消费者消费两个 MessageQueue,这种情况下,增加消费者是可以提高消费能力的。

但是下面这张图,每个消费者消费一个 MessageQueue,因为同一个 MessageQueue 只能被同一个消费组中的一个消费者消费,所以增加消费者并不能提高消费能力。

  1. 客户端的处理逻辑比较多,比如负载均衡、offset 管理、消费失败后的处理(比如失败消息发送回 Broker),这些逻辑都在客户端。

  2. 如果再支持其他语言,客户端会变得越来越重。

  3. 消费者机器 hang 住,可能会导致消息积压,如下图:

通过客户端负责均衡,MessageQueue0 这个队列分配给了 Consumer0 进行独占消费,如果 Consumer0 这个消费者 hang 住了,但是服务没有挂,不能从 Name Server 中下线,因为 Consumer0 拉取到的消息不能消费,也就不能给 Broker 发送更新 Offset 的请求,最终导致消息积压。这种情况只能手动让 Consumer0 下线或者让 Consumer0 重启。

RocketMQ 5.0 为了解决 PUSH Consumer 上面的问题,引入了 POP Consumer。

1 POP 客户端

POP 模式的客户端引入的背景是 RocketMQ 5.0 为了更好地拥抱云原生,客户端要改造成无状态的轻量级客户端,RocketMQ 4.x 中客户端具有的负载均衡、权限管理、消费管理等功能都从客户端移动到了 Proxy。

POP 消费模式如下图:

四个消费者都可以消费 Broker1 和 Broker2 上面的所有队列,这样即使某一个消费者 hang 住了,其他消费者也可以消费,并不会造成消息积压。

同时,从上图中可以看到,POP 客户端还有一个优势,增加消费者数量是可以提高消费能力的,不受 MessageQueue 数量和消费者数量的限制。

跟 PUSH 模式相比,POP 模式拉取到消息后,会设置一个 POP_CK 属性,代码如下:

//MQClientAPIImpl.java
if (requestHeader instanceof PopMessageRequestHeader) {
 if (startOffsetInfo == null) {
  // we should set the check point info to extraInfo field , if the command is popMsg
  // find pop ck offset
  String key = messageExt.getTopic() + messageExt.getQueueId();
  if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
   map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
    messageExt.getTopic(), brokerName, messageExt.getQueueId()));

  }
  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
 } else {
  String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
  String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
  int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
  Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);

  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
   ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
    responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
  );
  //...
 }
}

可以看到,POP_CK 属性包含了 brokerName、Topic、QueueId、offset 等参数,通过这个属性可以唯一标识一条消息了。

从上面的代码还可以看到,responseHeader 中有一个 invisibleTime 属性,这个属性的作用是消费者通过 POP 模式拉取到一条消息后,这段时间(invisibleTime)内这条消息在 Broker 端是不可见的,消费者再次拉取就不会重复拉取到。但是如果过了这段时间,消费者还没有给 Broker 返回 ACK,这条消息会变为可见,再次被消费者拉取到

消费完成后,向 Broker 发送 ACK 消息,见下面代码:

public void ackMessageAsync(
 final String addr,
 final long timeOut,
 final AckCallback ackCallback,
 final AckMessageRequestHeader requestHeader //
) throws RemotingException, MQBrokerException, InterruptedException {
 final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
 this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {

  @Override
  public void onComplete(ResponseFuture responseFuture) {
   RemotingCommand response = responseFuture.getResponseCommand();
   if (response != null) {
    try {
     AckResult ackResult = new AckResult();
     if (ResponseCode.SUCCESS == response.getCode()) {
      ackResult.setStatus(AckStatus.OK);
     } //...
     assert ackResult != null;
     ackCallback.onSuccess(ackResult);
    } //...
   } else {
    //...
   }

  }
 });
}

2. Broker

从上面的介绍可以看到,每个消费者都可以从 Broker 的所有 MessageQueue 上拉取消息,那如果多个消费者都从一个 MessageQueue 上面拉取,有没有可能会重复消费呢?

Broker 收到消息拉取请求,从 MessageStore 拉取消息时,首先会给 MessageQueue 进行加锁,加锁成功后,才会拉取消息,这是其他客户端来拉取时就会加锁失败。

//PopMessageProcessor.java
String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
if (!queueLockManager.tryLock(lockKey)) {
 restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
 return restNum;
}

Broker 从 MessageStore 拉取到消息后,会定义一个 CheckPoint 放入缓存,代码如下:

//PopMessageProcessor.java
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
 PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
 Channel channel, long popTime,
 ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
 StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
 String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
  requestHeader.getConsumerGroup()) : requestHeader.getTopic();
 String lockKey =
  topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
 //...
 offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
 GetMessageResult getMessageTmpResult = null;
 try {
  //...

  restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
  if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {

   if (isOrder) {
    //...
   } else {
    appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
   }
  } //...
 } //...
 return restNum;
}

Broker 收到消费者发来的 ACK 后,会把 CheckPoint 从缓存中移除。

如果 Broker 一直没有收到 ACK,则会把 CheckPoint 从缓存中移除,同时把 CheckPoint 发送给 MessageStore,由 MessageStore 发送到重试队列。代码如下:

boolean removeCk = !this.serving;
 // ck will be timeout
 if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
  removeCk = true;
 }

 // the time stayed is too long
 if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
  removeCk = true;
 }

 // double check
 if (removeCk) {
  // put buffer ak to store
  if (pointWrapper.getReviveQueueOffset() < 0) {
   putCkToStore(pointWrapper, false);
  }
 }
}

3 总结

POP 客户端有很多的优势,总结如下:

  1. 无状态,更好地拥抱云原生;

  2. 计算相关的功能下移到 Proxy,更加轻量级;

  3. 消费能力扩展不受 MessageQueue 数量的限制;

  4. 消费者 hang 住,并不会导致消息积压。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/551315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity 过场工具(Cutscene)设计(四) ——组件化设计

Unity 过场工具(Cutscene)设计&#xff08;四&#xff09; ——组件化设计 写到这一篇文章前就开始在考虑如何才能说清楚自己的设计思路&#xff0c;因为后续涉及到编辑器和Runtime框架的实际设计和实现过程&#xff0c;两者之间是互相有设计因果关系的。为了阐述自己的核心设计…

从0.5开始开发一个导购网站

提醒&#xff1a;文中没有具体如何修改的代码&#xff0c;只是提供了修改的思路。 为什么是从0.5开始呢&#xff1f; 因为这里借助了一个大佬的开源项目Springboot项目仿天猫商城: Springboot项目仿天猫商城 前台jsp页面 大佬的代码简洁&#xff0c;没有什么多余的功能&…

系统调用与API

系统调用介绍 什么是系统调用 为了让应用程序有能力访问系统资源&#xff0c;也为了让程序借助操作系统做一些由操作系统支持的行为&#xff0c;每个操作系统都会提供一套接口&#xff0c;以供应用程序使用。系统调用涵盖的功能很广&#xff0c;有程序运行所必需的支持&#xf…

leetCode刷题记录2

文章目录 hot100题560. 和为 K 的子数组581. 最短无序连续子数组 ▲617. 合并二叉树 hot100题 560. 和为 K 的子数组 560. 和为 K 的子数组 先暴力&#xff0c;过了再说 public int subarraySum(int[] nums, int k) {int ans 0;for (int i 0; i < nums.length; i) {in…

保姆级教程Windows11下安装RocketMQ

一、RocketMQ介绍 RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念&#xff0c;如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。 二、Rock…

vector类详解【c++】

&#x1f600;博主主页 &#x1f600;博主码云 目录 &#x1f3c5;vector简介&#x1f3c5;vector使用&#x1f3c6;vector的定义&#x1f3c6;vector iterator 的使用&#x1f3c6;vector 空间函数&#x1f3c6;vector的扩容问题&#x1f3c6;vector 增删查改&#x1f3c6;vec…

Python tkintertools 模块介绍(新版)

&#x1f680;tkintertools&#x1f680; The tkintertools module is an auxiliary module of the tkinter module tkintertools 模块是 tkinter 模块的辅助模块 Installation/模块安装 Stable version/稳定版本 Version/版本 : 2.6.1Release Date/发布日期 : 2023/05/21 p…

Edge 浏览器:隐藏功能揭秘与高效插件推荐

文章目录 一、前言二、Edge 的各种奇淫巧计2.1 开启 Edge 分屏功能2.2 启动 Edge 浏览器后直接恢复上次关闭前的页面2.3 解决 Edge 浏览器无法同步账号内容2.4 开启垂直标签页&#xff08;推荐&#xff09;2.5 设置标签分组&#xff08;推荐&#xff09;2.6 设置标签睡眠时间&a…

网络管理 - 简单网络管理协议 SNMP

文章目录 1 概述1.1 结构1.2 操作 2 SNMP2.1 报文格式2.2 五大报文类型2.3 三大组件 3 扩展3.1 网工软考真题 1 概述 #mermaid-svg-xmaaQjpp1bT1axfw {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-xmaaQjpp1bT1axf…

实验篇(7.2) 01. 实验环境介绍 远程访问 ❀ Fortinet网络安全专家 NSE4

【简介】学习NSE4&#xff0c;如果只看文章而不动手做实验&#xff0c;就象耍流氓。为了有效的巩固学习到的内容&#xff0c;建议经常动手做实验。实验不怕出错&#xff0c;身经百战后&#xff0c;再在生产环境部署和配置FortiGate防火墙&#xff0c;就会做到胸有成竹。 虚拟实…

【网络协议详解】——RIP协议(学习笔记)

目录 &#x1f552; 1. IP路由协议概述&#x1f558; 1.1 路由表&#x1f558; 1.2 路由的度量尺度/度量值&#x1f558; 1.3 路由管理距离 &#x1f552; 2. RIP协议&#x1f558; 2.1 概述&#x1f558; 2.2 工作原理 &#x1f552; 3. 报文格式&#x1f558; 3.1 RIP 协议报…

【自动化测试】第一次项目实施

测试项目简介&#xff1a;基于python语言 跨平台的测试自动化工具&#xff0c;适用于后台、原生或混合型客户端应用的测试。它支持 Android、iOS、Web、后台、云服务和 Windows 端的 UI 自动化测试。 上手快&#xff0c;操作简单&#xff0c;只要有一点python基础&#xff0c…

5. 多线程并发锁

本文介绍了多线程并发下为了避免临界资源被抢占而出现的错误&#xff0c;引入了锁和原子操作 来解决。 一、问题分析 创建10个线程&#xff0c;每个线程实现往总进程加1万个数。则总进程会达到10万 #include<stdio.h> #include <unistd.h> #include<pthread.h…

路径规划算法:基于头脑风暴算法的路径规划算法- 附代码

路径规划算法&#xff1a;基于头脑风暴的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于头脑风暴的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化算法头脑…

强化学习-初步认识

前言 强化学习这个概念是2017年Alpha Go战胜了当时世界排名第一的柯洁而被大众知道&#xff0c;后面随着强化学习在各大游戏比如王者荣耀中被应用&#xff0c;而被越来越多人熟知。王者荣耀AI团队&#xff0c;甚至在顶级期刊AAAI上发表过强化学习在王者荣耀中应用的论文。 什么…

BEVDet4D 论文学习

1. 解决了什么问题&#xff1f; 单帧数据包含的信息很有限&#xff0c;制约了目前基于视觉的多相机 3D 目标检测方法的性能&#xff0c;尤其是关于速度预测任务&#xff0c;要远落后于基于 LiDAR 和 radar 的方法。 2. 提出了什么方法&#xff1f; BEVDet4D 将 BEVDet 方法从…

C++ Vecter

C Vecter &#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;C &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 本博客主要内容讲解了C中vector的介绍以及相关的一些接口的使用 …

Prometheus+Grafana监控系统

一、简介 1、Prometheus简介 官网&#xff1a;https://prometheus.io 项目代码&#xff1a;https://github.com/prometheus Prometheus&#xff08;普罗米修斯&#xff09;是一个最初在SoundCloud上构建的监控系统。自2012年成为社区开源项目&#xff0c;拥有非常活跃的开发人员…

第二章 Electron自定义界面(最大化、最小化、关闭、图标等等)

一、介绍 &#x1f606; &#x1f601; &#x1f609; Electron是一个使用 JavaScript、HTML 和 CSS 构建桌面应用程序的框架。 嵌入 Chromium 和 Node.js 到 二进制的 Electron 允许您保持一个 JavaScript 代码代码库并创建 在Windows上运行的跨平台应用 macOS和Linux——不需…

linux--systemd、systemctl

linux--systemd、systemctl 1 介绍1.1 发展sysvinitupstart主角 systemd 登场 1.2 简介 2 优点兼容性启动速度systemd 提供按需启动能力采用 linux 的 cgroups 跟踪和管理进程的生命周期启动挂载点和自动挂载的管理实现事务性依赖关系管理日志服务systemd journal 的优点如下&a…