基于电商场景的高并发RocketMQ实战-Consumer端队列负载均衡分配机制、并发消费以及消费进度提交

news2025/4/23 11:32:27

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

Consumer 端队列负载均衡分配机制

topic 是有一堆的 queue,而且分布在不同的 broker 上

并且在消费时,将多个 queue 分配给多个 consumer,每一个 consumer 会分配到一部分的 queue 进行消费

每个 consumer 会获取到 Topic 下包含的 queue 的信息 以及 每个 consumer group 下包含多少的 consumer ,那么 consumer 都使用相同的算法去做一次分配

  • Topic 下包含的 queue 的信息可以在 Broker 中获取
  • 每个 consumer group 下包含多少了 consumer 的信息也可以在 Broker 获取,因为每个 consumer 启动后,都会将 Broker 中进行注册

Consumer 分配队列:

Consumer 端队列的分配是通过 RebalanceService 这个组件实现的,拉取 Topic 的 queue 信息,拉取 consumer group 信息,根据算法分配 queue,确认自己需要拉取哪些 queue 的消息

RebalanceService 这个组件是在 Broker 中的,主要负责实现消息队列的动态负载均衡和自动分配,确保消息队列在消费者组内均匀分配,并在消费者组发生变化时进行动态调整,通过动态负载均衡和自动分配消息队列,保证了消费者组在消费消息时的 高效性和可靠性

那么分配好队列之后,Consumer 就知道自己分配了哪些 queue 了,Consumer 就可以去 Broker 中对应的 queue 进行数据的拉取,这里 Consumer 消息的拉取在 RocketMQ 中有两种实现(DefaultMQPushConsumer、DefaultMQPullConsumer, 但是在底层全部都是通过 pull 拉取消息进行消费的):

  • push 模式:服务端有数据后推送给客户端,实时性很高,但是增加了服务端工作量
  • pull 模式:客户端主动去服务端拉取数据,会导致数据接收不及时

RocketMQ 的长轮询:

RocketMQ 中使用了 长轮询 的方式,兼顾了 push 和 pull 两种模式的优点

长轮询: 长轮询本质上也是轮询,服务端在没有数据的时候并不是马上返回数据,而是会先将请求挂起,此时有一个长轮询后台线程每隔 5s 会去检查 queue 中是否有新的消息,如果有则去唤醒客户端请求,否则如果超过 15s 就会判断客户端请求超时

Consumer 端并发消费以及消费进度提交

Consumer 去 Broker 中拉取消息的线程只有一个,拉取到消息之后会将消息存放在 ProcessQueue 中,每一个 ConsumeQueue 都会对应一个 ProcessQueue

消息被拉取到会放在 ProcessQueue 中,等待线程池进行 并发消息 ,线程池处理消息时,就会调用到我们在创建生产者时注册的监听器中的 consumeMessage 方法,在这里会执行我们自己定义的业务逻辑,之后会返回状态码:SUCCESS 或 RECONSUME_LATER 等等,如果消费成功,线程会去 ProcessQueue 中删除对应的消息,并且会记录 consumer group 对于 queue 的消费进度 ,以实通过异步提交到 broker 中去,流程图如下:

在这里插入图片描述

Consumer 处理失败时的延迟消费机制:

在 consumer 消费消息失败的时候,线程池会将消费失败的消息发送到 Broker 中,在 Broker 中,对失败的消息进行一个 Topic 的改写为:RETRY_Topic_%,会根据之前的 Topic 名称进行改写,改写后呢,作为一个 延迟消息 重新写入 Commitlog 和 ConsumeQueue 中,再通过专门处理延迟消息的后台线程监听延迟消息是否到达延迟时间,当时间到达之后,会将改写后的 Topic 再重新改写为原来的 Topic 名称并写入 Commitlog,之后等待被消费者再次消费即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1345856.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于NXP I.MX8 + Codesys的工业软PLC解决方案

全新i.MX 8M Plus是一个混合人工智能SoC,将先进的嵌入式SoC与最新的人工智能/机器学习硬件NPU技术相结合,通过神经网络加速器,为边缘计算提供强大的机器学习能力,是i.MX 8M Plus一个最为突出的优势。WEC-IMX8P核心板特别适合在机器…

Flood Fill算法总结

算法思想 从一个起点开始,每一次随机选择一个新加进来的格子,看一下它周围能否扩展新的格子。如果能扩展,那么就扩展进来,直到不能扩展新的格子为止。当然需要判重,同样一个格子只能覆盖一次,这样能够保证时…

Python 下载与安装

1、下载 打开Python官网:Welcome to Python.org 点击下图所示的【Downloads】按钮进入下载页面。 ​ 进入下载页面后下拉至下图位置,选择版本,点击下载按钮下载。 页面会跳转至下一页下载页面,下拉到下图位置,选择…

什么是SEO?

什么是SEO? SEO代表“搜索引擎优化”。这是通过非付费(也称为“自然”)搜索引擎结果来提高网站流量的质量和数量以及品牌曝光率的做法。 尽管有首字母缩略词,但 SEO 既关乎搜索引擎本身,也关乎人。这是关于了解人们在…

fastjosn利用分析

fastjosn一般是使用TemplatesImpl链来进行攻击的,在上面其实已经分析过fastjson在反序列化的时候会调用满足条件的getter方法,因此就会调用TemplatesImpl类的getOutputProperties方法,然后通过getOutputProperties,调用newTransfo…

00-Git 详解

Git 应用 一、Git概述 1.1 什么是Git git 是一个代码协同管理工具,也称之为代码版本控制工具,代码版本控制或管理的工具用的最多的: svn、 git。 SVN 是采用的 同步机制,即本地的代码版本和服务器的版本保持一致(提…

3D视觉-结构光测量-网格结构光测量

网格结构光测量 网格结构光测量也常常被称为面结构光测量,它首先需要用投影器件产生符合条件的网格状投射光,并投射到待测物表面,由于网格上的光条具有两个方向,因此该方法可以通过两个测量方向分析待测物表面的三维坐标信息&…

键盘字符(#键)显示错误

当屏幕上显示的键与键盘上按下的键不同时,尤其是 # 键。大多数情况下,此错误是由于 raspbian 和 NOOBS 软件的默认英国键盘配置所致。 解决方案: 要解决此问题,您需要将配置更改为您自己的键盘或语言的配置。这可以通过转到树莓派…

Docker单点部署Seata(2.0.0) + Nacos(v2.3.0) + Mysql(5.7)

文章目录 一、部署Nacos二、部署Mysql三、Seata准备工作1. 记住nacos、mysql、宿主机的ip2. 建立数据库3. Nacos远程配置文件 四、部署Seata五、初步检验Seata部署情况六、微服务使用Seata1.引入依赖2. application.yml配置 七、遇到的坑1. Nacos显示Seata服务的ip为容器内网ip…

springboot 双数据源配置

1:pom <!--SpringBoot启动依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</group…

[MySQL] MySQL数据库创建存储过程

一、mysql存储过程的相关知识 1.1 存储过程的概念 存储过程是一组为了完成特定功能的SQL语句集合。类似于于其他编程语言中的函数&#xff0c;定义一个函数方法&#xff0c;可以随时起到传参调用的功能。 存储过程在使用过程中是将常用或者复杂的工作预先使用SQL语句写好并用…

c++ / day01

1. 整理思维导图 2. 定义自己的命名空间myspace&#xff0c;并在myspace中定义一个字符串&#xff0c;实现求字符串大小的函数。 代码 #include <iostream>using namespace std;namespace myns {unsigned long long strlen(string s){return s.length();}}int main() {…

Linux文件fd剖析

学习之前&#xff0c;首先要认识什么是文件&#xff1f; 空文件也是要在内存中占据空间的&#xff0c;因为它还有属性数据。文件 属性 内容文件操作 对内容 对属性 或者对内容和属性的操作标定一个文件的时候&#xff0c;必须使用&#xff1a;路径文件名&#xff0c;文件具…

操作系统:linux(在虚拟机上详细步骤安装)Centos

文章目录 前言&#xff1a;一、如何在自己的电脑上安装centos?二、在虚拟机上安装centos2.1安装步骤&#xff1a; 前言&#xff1a; 操作系统有&#xff1a;windows server 不开源的收费的、linux 开源的免费的&#xff0c;精简安装&#xff08;没有UI)。国产的操作系统有&am…

<JavaEE> TCP 的通信机制(一) -- 确认应答 和 超时重传

目录 TCP的通信机制的核心特性 一、确认应答 1&#xff09;什么是确认应答&#xff1f; 2&#xff09;如何“确认”&#xff1f; 3&#xff09;如何“应答”&#xff1f; 二、超时重传 1&#xff09;丢包的概念 2&#xff09;什么是超时重传&#xff1f; 3&#xff09…

ORACLE P6 v23.12 最新虚拟机(VM)全套系统环境分享

引言 根据上周的计划&#xff0c;我简单制作了两套基于ORACLE Primavera P6 最新发布的23.12版本预构建了虚拟机环境&#xff0c;里面包含了全套P6 最新版应用服务 此虚拟机仅用于演示、培训和测试目的。如您在生产环境中使用此虚拟机&#xff0c;请先与Oracle Primavera销售代…

C#中使用as关键字将对象转换为指定类型

目录 一、定义 二、示例 三、生成 使用as关键字可以将对象转换为指定类型&#xff0c;与is关键字不同&#xff0c;is关键字用于检查对象是否与给定类型兼容&#xff0c;如果兼容则返回true&#xff0c;如果不兼容则返回false。而as关键字会直接进行类型转换&#xff0c;如果…

算法逆袭之路(1)

11.29 开始跟进算法题进度! 每天刷4题左右 ,一周之内一定要是统一类型 而且一定稍作总结, 了解他们的内在思路究竟是怎样的!! 12.24 一定要每天早中晚都要复习一下 早中午每段一两道, 而且一定要是同一个类型, 不然刷起来都没有意义 12.26/27&#xff1a; 斐波那契数 爬…

探秘交互设计:深入了解五大核心维度!

交互式设计是用户体验&#xff08;UX&#xff09;设计的重要组成部分。本文将解释什么是交互设计&#xff0c;并分享一些有用的交互设计模型&#xff0c;并简要描述交互设计师通常做什么。 如何解释交互设计 交互式设计可以用一个简单的术语来理解&#xff1a;它是用户和产品…

Kubernetes集群部署Rook Ceph实现文件存储,对象存储,块存储

Kubernetes集群部署Rook Ceph部署Ceph集群 1. Rook Ceph介绍 Rook Ceph是Rook项目中的一个存储方案&#xff0c;专门针对Ceph存储系统进行了优化和封装。Ceph是一个高度可扩展的分布式存储系统&#xff0c;提供了对象存储、块存储和文件系统的功能&#xff0c;广泛应用于提供…