消息中间件之RocketMQ源码分析(六)

news2024/12/27 14:35:17

Consumer消费方式

RocketMQ的消费方式包含Pull和Push两种

  • Pull方式。
    用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
    缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
    org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
    在这里插入图片描述
    1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
    2.遍历全部Queue,拉取每个Queue可以消费的消息
    3.如果拉取到消息,则执行用户编写的消费代码
    4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
  • Push方式。
    代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
    在这里插入图片描述
    1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
    启动PullMessageService的拉取服务
    在这里插入图片描述

在这里插入图片描述
PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
在这里插入图片描述
消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
在这里插入图片描述
在这里插入图片描述

拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
在这里插入图片描述
并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
在这里插入图片描述
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
在这里插入图片描述
队列锁定
在这里插入图片描述
订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
在这里插入图片描述
封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
在这里插入图片描述

在这里插入图片描述
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
在这里插入图片描述
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败

  • Pull和Push方式的比较
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1434523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【发票识别】新增针对图片发票的识别(升级中)

说明 为了完善发票识别的功能,目前发票识别支持发票图片格式的识别,增加可用性。 体验 体验地址:https://invoice.behappyto.cn/invoice-service/ 体验地址上面有示例的发票,可以下载上传识别或者复制url地址进行识别。 技术栈…

数据结构.二叉树

一、树的基本概念 二、树的常考性质 三、二叉树的基本概念 四、二叉树的顺序存储 五、二叉树的链式存储 六、二叉树的遍历

深入剖析 Cortex-M4 微控制器在嵌入式系统中的特性和优势

Cortex-M4 微控制器是 ARM Cortex-M 架构中的一种类型,它具有许多功能和特性,使其在嵌入式系统中具有显著的优势。本文将深入剖析 Cortex-M4 微控制器的特性和优势,并提供示例代码来演示其用法。 ✅作者简介:热爱科研的嵌入式开发…

TreeSet 集合

TreeSet 集合 1. 概述2. 方法3. 遍历方式4. 两种排序方式4.1 默认排序规则/自然排序4.1.1 概述4.1.2 compareTo()方法4.1.3 代码示例14.1.4 代码示例2 4.2 比较器排序4.2.1 概述4.2.2 compare()方法4.2.3 代码示例14.2.4 代码示例2 4.3 排序方式的对比 5. 注意事项 文章中的部分…

5 款提升 UI 设计效率的软件工具

你知道如何选择正确的UI设计软件吗?你知道设计漂亮的用户界面和带来良好用户体验的应用程序需要什么界面设计软件吗?基于APP界面的不同功能,所选择的APP界面设计软件也会有所不同。然而,并不是说所有的APP界面设计软件都非常精通&…

ShardingSphere 5.x 系列【3】分库分表中间件技术选型

有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 3.1.0 本系列ShardingSphere 版本 5.4.0 源码地址:https://gitee.com/pearl-organization/study-sharding-sphere-demo 文章目录 1. 前言2. My Cat3. ShardingSphe…

Chronos靶机渗透

Chronos靶机 一.信息收集1.靶机IP地址确认2.目录扫描3.常见漏洞扫描5.web网站探测1.网页2.源代码 二.网站渗透1.命令执行2.抓包---burp suite3.反弹shell 三.提权1.node.js原核污染第一个flag 2.sudo提权第二个flag 一.信息收集 1.靶机IP地址确认 ┌──(root㉿kali)-[/] └─…

Codeforces Round 914 (Div. 2)(D1/D2)--ST表

Codeforces Round 914 (Div. 2)(D1/D2)–ST表 D1. Set To Max (Easy Version) 题意: 给出长度为n的数组a和b,可以对a进行任意次数操作,操作方式为选择任意区间将区间内值全部变成该区间的最大值, 是否有可能使得数组a等于数组b…

centos跟新时间为网络时间

安装ntp yum install ntp -y 从本地获取网络时间并更新 ntpdate pool.ntp.org 时间设置成功

简单实验 java spring cloud 自定义负载均衡

1.概要 1.1 说明 这个是在前一个测试上的修改,所以这里只体现修改的内容。前一个测试的地址:检查实验 spring cloud nacos nacos-server-2.3.0-CSDN博客 1.2 记忆要点 1.2.1 引入对象 Autowired DiscoveryClient discoveryClient; 1.2.2 获取服务实…

简单说说redis分布式锁

什么是分布式锁 分布式锁(多服务共享锁)在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问/操作。 为什么需要分布式锁 在单体应用服务里,不同的客户端操作同一个资源,我们可以通过操作系统提供…

Java SPI 代码示例

Java Service Provider Interface 是JDK自带的服务提供者接口又叫服务发现机制更是一种面向接口的设计思想。即JDK本身提供接口类, 第三方实现其接口,并作为jar包或其他方式注入到其中, 在运行时会被JDK ServiceLoader 发现并加载&#xff0c…

锐捷VSU和M-LAG介绍

参考网站 堆叠、级联和集群的概念 什么是堆叠? 框式集群典型配置 RG-S6230[RG-S6501-48VS8CQ]系列交换机 RGOS 12.5(4)B1005版本 配置指南 总结 根据以上的几篇文章总结如下: 级联:简单,交换机相连就叫级联,跟搭…

SaaS 电商设计 (八) 直接就能用的一套商品池完整的设计方案(建议收藏)

目录 一.前言1.1 在哪些业务场景里使用1.2 一些名词搞懂他1.3 结合业务思考一下-业务or产品的意图 二.方案设计2.1 业务主流程2.2 一步步带你分析B端如何配置2.3 数据流2.3.1 ES 数据表建设2.3.2 核心商品池流程2.3.2.1 商品池B端维护流程2.3.2.2 商品池版本更新逻辑 2.4 核心代…

PySpark(三)RDD持久化、共享变量、Spark内核制度,Spark Shuffle

目录 RDD持久化 RDD 的数据是过程数据 RDD 缓存 RDD CheckPoint 共享变量 广播变量 累加器 Spark 内核调度 DAG DAG 的宽窄依赖和阶段划分 内存迭代计算 Spark是怎么做内存计算的? DAG的作用?Stage阶段划分的作用? Spark为什么比MapReduce快? Spar…

Java项目服务器部署

Java项目服务器部署 Tomocat Java项目进行云服务器部署 如果有需要比赛部署的同学也可以联系我,我后续还会对于这个专栏继续展开 1、云服务器选购 1.1 阿里云选购(宝塔面板) 1.2 端口放行 这里说的就是端口放行,后面一些访问比…

打造直播带货商城APP:源码开发技术全解析

直播带货商城APP的创新模式吸引了用户,提升销售业绩,已经成为了近期开发者讨论的热门话题。今天,小编将深入讲解如何打造一款功能强大的直播带货商城APP,着重分析源码开发技术,为开发者提供全方位的指导。 一、前期准…

电商推荐系统

此篇博客主要记录一下商品推荐系统的主要实现过程。 一、获取用户对商品的偏好值 代码实现 package zb.grms;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Doub…

docker自定义镜像并使用

写在前面 本文看下如何自定义镜像。 ik包从这里 下载。 1:自定义带有ik的es镜像 先看下目录结构: /opt/program/mychinese [rootlocalhost mychinese]# ll total 16 -rw-r--r-- 1 root root 1153 Feb 5 04:18 docker-compose.yaml -rw-rw-r-- 1 el…

2024智慧城市新纪元:引领未来,重塑都市生活

随着科技的飞速发展和数字化转型的不断深入,2024年智慧城市领域迎来了全新的发展格局。 这一年,智慧城市的建设更加注重人性化、可持续性和创新性,为城市居民带来了前所未有的便捷与舒适。以下将重点关注智慧城市的几个核心内容,…