RocketMQ 源码分析——Consumer

news2024/11/23 17:17:42

文章目录

  • 消费者启动流程
  • 消费者模式
    • 集群消费
    • 广播消费
  • Consumer负载均衡
    • 集群模式
    • 广播模式
  • 并发消费流程
    • 获取topic配置信息
    • 获取Group的ConsumerList
    • 获取Queue的消费Offset
    • 拉取Queue的消息
    • 更新Queue的消费Offset
  • 顺序消费流程
  • 消费存在的问题
    • 消费卡死
    • 启动之后较长时间才消费

消费者启动流程

image.png

image.png

image.png

DefaultMQPushConsumerImpl类是核心类

image.png

image.png

消费者模式

集群消费

image.png

一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3个Queue,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1个Queue消息。

而由Producer发送消息的时候是轮询所有的Queue,所以消息会平均散落在不同的Queue上,可以认为Queue上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度 (Consumer Offset) 的存储会持久化到Broker

广播消费

image.png

消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

这种模式下,消费进度 (Consumer Offset) 会存储持久化到实例本地

Consumer负载均衡

集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

默认的分配算法是AllocateMessageQueueAveragely,还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式

如下图:

image.png

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

image.png

广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。

并发消费流程

一般我们在消费时使用回调函数的方式,使用得最多的是并发消费,消费者客户端代码如下:

image.png

在RocketMQ的消费时,整体流程如下:

image.png

获取topic配置信息

在消费者启动之后,第一步都要从NameServer中获取Topic相关信息。这一步设计到组件之间的交互,RocketMQ使用功能号来设计的。

image.png

image.png

image.png

最终在MQClientAPIImpl类中完成调用

image.png

获取Group的ConsumerList

在消费消息前,需要获取当前分组已经消费的相关信息:ConsumerList

image.png

如果是广播消费模式,则不需要从服务器获取消费进度(广播消费模式把进度在本地<消费端>进行存储)

image.png

而广播消费模式,则需要从服务器获取消费进度相关信息,具体如下:

image.png

image.png

获取Queue的消费Offset

在分配完消费者对应的Queue之后,如果是集群模式的话,需要获取这个消费者对应Queue的消费Offset,便于后续拉取未消费完的消息。
image.png

进入RebalancePushImpl类

image.pngimage.png

image.png

image.png

image.png

拉取Queue的消息

image.png

image.png

最终进入DefaultMQPushConsumerImpl类的pullMessage方法

image.png

image.png

image.png

更新Queue的消费Offset

因为RocketMQ的推模式是基于拉模式实现的,因为拉消息是一批批拉,所以不能做到拉一批提交一次偏移量,所以这里使用定时任务进行偏移量的更新。

image.png

image.png

image.png

顺序消费流程

顺序消费代码:image.png顺序消费的流程和并发消费流程整体差不多,唯一的多的就是使用锁机制来确保一个队列同时只能被一个消费者消费,从而确保消费的顺序性

ConsumeMessageOrderlyService类

image.png

这里有一个定时任务,是每个20秒运行一次(周期性的去续锁,锁的有效期是60S)

image.png

image.png

image.png

消费存在的问题

消费卡死

消费的流程中,尤其是针对顺序消息,会有卡死的现象,由于顺序消息中需要到Broker中加锁,如果消费者某一个挂了,那么在Broker层是维护了60s的时间才能释放锁,所以在这段时间只能,消费者是消费不了的,在等待锁。

另外如果还有Broker层面也挂了,如果是主从结构,获取锁都是走的Master节点,如果Master节点挂了,走Slave消费,但是slave节点上没有锁,所以顺序消息如果发生了这样的情况,也是会有卡死的现象。

image.png

image.png

image.png

启动之后较长时间才消费

在并发消费的时候,当我们启动了非常多的消费者,维护了非常多的topic的时候、或者queue比较多的时候,可以看到消费的流程的交互是要启动多线程,也要做相当多的事情,所以会感觉要启动较长的时间才能消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1024712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

操作系统(5-7分)

内容概述 进程管理 进程的状态 前驱图 同步和互斥 PV操作&#xff08;难点&#xff09; PV操作由P操作原语和V操作原语组成&#xff08;原语是不可中断的过程&#xff09;&#xff0c;对信号量进行操作&#xff0c;具体定义如下&#xff1a; P&#xff08;S&#xff09;&#…

渗透测试信息收集方法和工具分享

文章目录 一、域名收集1.OneForAll2.子域名挖掘机3.subdomainsBurte4.ssl证书查询 二、获取真实ip1.17CE2.站长之家ping检测3.如何寻找真实IP4.纯真ip数据库工具5.c段&#xff0c;旁站查询 三、端口扫描1.端口扫描站长工具2.masscan(全端口扫描)nmap扫描3.scanport4.端口表5.利…

API(八)cosocket常用SDK

一 同步且非阻塞的底层SDK&#xff1a;cosocket 说明&#xff1a; 本篇章只是对cosocket常用话API的汇总,并没有实际案例加以辅证场景&#xff1a; 许多单机版的中间件都是基于cosocket做的二次开发 OpenResty 的核心和精髓 cosocket ① coscoket常用的指令 个人建议&am…

java jax-ws webservice编程,入门教程,包含服务端与客户端,编码

java jax-ws webservice 就是服务端程序提供接口,客户端,通过服务端提供的jar包(或者打包的类文件),通过jax-ws直接调用服务端暴露的接口来进行操作. 服务端 共三个文件 接口 package com.jaxwsdemo.serveice;import javax.jws.WebMethod; import javax.jws.WebParam; impo…

【java基础】Java常见的创建对象方式

背景&#xff1a; 对于好多程序员来说&#xff0c;你问他&#xff0c;如何创建对象&#xff0c;他可能就只知道new个对象不就行了&#xff0c;但是当我们需要看一些框架的源码的时候&#xff0c;经常发现他们不是这样创建对象&#xff0c;然后回过头来&#xff0c;我们就得补充…

2023.9.18 网络层 IP 协议详解

目录 IP协议 IPv4 32位 源IP 地址 / 32位 目的IP 地址 IP 地址管理 特殊 IP 路由选择 IP协议 IPv4 32位 源IP 地址 / 32位 目的IP 地址 基本知识&#xff1a; 在 IP 报头中一般表示为 32位 二进制整数日常生活中的 IP 一般将 32位 二进制整数&#xff0c;也就是 4字节 的二…

Vue的模板语法(下)

一.事件处理 事件修饰符 Vue通过由点(.)表示的指令后缀来调用修饰符&#xff0c; .stop&#xff0c; .prevent&#xff0c;.capture&#xff0c;.self&#xff0c;.once .stop&#xff1a;阻止事件冒泡。当一个元素触发了事件&#xff0c;并且该元素包含嵌套的父元素时&#…

【大虾送书第十期】从不了解用户画像,到用画像数据赋能业务看这一本书就够了

目录 &#x1f36d;写在前面 &#x1f36d;内容简介 &#x1f36d;设计图和原型图 &#x1f36d;参考目录 &#x1f36d;文末福利 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &#x1f990;专栏地址&#xff1a;免费送书活动专栏地址 写在前面 在大数据时代&#xff0…

小程序游戏开发和app游戏开发有什么不同呢?

小程序游戏开发和App游戏开发有一些重要的区别&#xff0c;这些区别涉及到平台、技术、发布和用户体验等方面。以下是它们之间的主要不同之处&#xff1a; 平台&#xff1a; 小程序游戏开发是为特定的小程序平台设计的&#xff0c;如微信小程序、支付宝小程序等。这些小程序通常…

Android使用Chrome浏览器进行抓包

Android使用Chrome浏览器进行抓包 这里记录一个用Android真机抓包的方法。 打开Chrome浏览器&#xff0c;打开网址&#xff1a;chrome://inspect/#devices 找到对应 App 点击 inspect&#xff0c;进行网络请求&#xff0c;就能看到抓包的数据啦 数据线连上android设备&#x…

Mybatis框架学习

什么是mybatis&#xff1f; mybatis是一款用于持久层的、轻量级的半自动化ORM框架&#xff0c;封装了所有jdbc操作以及设置查询参数和获取结果集的操作&#xff0c;支持自定义sql、存储过程和高级映射 mybatis用来干什么&#xff1f; 用于处理java和数据库的交互 使用mybat…

容器中的nginx暴露一个端口部署多个功能的网站

随着容器的应用越来越多&#xff0c;将nginx部署在容器中也是常有之事。可能事先创建容器时只暴露了一个端口给浏览器连接&#xff0c;后面又想根据添加多个应用&#xff0c;根据URL的不同来访问不同的应用。比如在暴露了主机的83端口给nginx容器的80端口&#xff0c;原来只有一…

华为云云耀云服务器L实例评测|宝塔一站式安装数据库MySQL+Redis教程

目录 前言 一、传统服务器安装数据库 1.安装MySQL 2.安装Redis 二、云耀云服务器L安装MySQL 1.云耀云服务器L实例购买 2.远程登录并重置密码 3.云耀云服务器L初始化宝塔面板 4.宝塔面板安装数据库 5.MySQL第三方登录 三、云耀云服务器L安装Redis 1.宝塔面板安装Redis 2.Redis密…

JumpServer未授权访问漏洞 CVE-2023-42442

JumpServer未授权访问漏洞 CVE-2023-42442 一、漏洞描述二、漏洞影响三、网络测绘四、漏洞复现poc通过burp发送请求包小龙POC检测 五、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接…

【正则表达式】

正则表达式 1 本节目标2 正则表达式概述2.1 什么是正则表达式2.2 正则表达式的特点 3 正则表达式在JavaScript中的使用3.1 创建正则表达式3.2 测试正则表达式test 4 正则表达式中的特殊字符4.1 正则表达式的组成4.2 边界符4.3 字符类4.4 量词符4.5 括号总结4.6 预定义类 5 正则…

如何分清PMP和NPDP?一篇文足以

先简单介绍一下两个证书&#xff1a; PMP&#xff1a;项目管理证书&#xff0c;项目经理 英文全称是Project Management Professional&#xff0c;中文全称叫做项目管理专业人士资格认证。 它是由美国项目管理协会&#xff08;PMI&#xff09;在全球范围内推出的针对项目经理…

mojo安装

docker安装mojo 官网 https://developer.modular.com/login 很奇怪登录页面不显示 类似于网站劫持 docker 安装mojo带jupyterlab的方式 https://hub.docker.com/r/lmq886/mojojupyterlab 拉取镜像 docker pull lmq886/mojojupyterlab docker pull lmq886/mojojupyterlab:1.2 启…

没有任何销售经验怎么管理销售团队?

本文将为大家讲解&#xff1a;1、什么是销售管理&#xff1f;2、销售管理的流程是什么&#xff1f;3、如何进行销售管理&#xff1f;4、crm客户管理系统对于销售管理有什么样的作用&#xff1f;5、2023年最全crm客户管理系统推荐。 一、什么是销售管理&#xff1f; 根据美国营…

【数据结构与算法】一文带你学透——算法概述

前言 本期我们所要学习的内容是数据结构与算法中的算法的相关内容&#xff0c;通过上期我们学的数据结构想必大家都会了吧&#xff0c;在学习完毕之后算法&#xff0c;我想你已经可以编写出比较优秀的代码了&#xff0c;著名计算机科学家沃思曾提出一个公式 程序数据结构算法。…

18.备忘录模式(Memento)

意图&#xff1a;在不破坏封装性的前提下&#xff0c;捕获一个对象的内部状态&#xff0c;并在该对象之外保存这个状态&#xff0c;这样就可以在以后将该对象恢复到原先保存的状态。 上下文&#xff1a;某些对象的状态在转换过程中&#xff0c;可能由于某种需要&#xff0c;要求…