RocketMQ源码学习笔记:Producer发送消息流程

news2024/9/21 14:45:00

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、验证消息
  • 3、查找路由
  • 4、选择消息发送队列
    • 4.1、选择队列的策略
    • 4.2、源码阅读
      • 4.2.1、轮询规避
      • 4.2.2、故障延迟规避
        • 4.2.2.1、计算规避时间
        • 4.2.2.2、选择队列
      • 4.2.3、ThreadLocal的使用
  • 5、发送消息
    • 5.1、客户端建立的时间


1、Overview

消息发送主要可以分成下面四个步骤。

  1. 验证消息
  2. 查找路由
  3. 选择队列
  4. 消息发送

之后从源码查看四个步骤的具体内容。

我们建立一个DefaultMQProducer之后,调用DefaultMQProducer#send()方法就可发送信息。

查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl(),我们从这里开始看源码。

2、验证消息

发送前必然验证一下消息。

主要是检验消息的状态,一些必要的值不能为空等。

this.makeSureStateOK();
// 1、检查消息
Validators.checkMessage(msg, this.defaultMQProducer);

公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。

这部分没有太多内容。


3、查找路由

所谓的路由是指可用的Broker的信息,包括地址,具体的消息队列等。

下面这一句获取到路由。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

DefaultMQProducer内部缓存这路由信息,维护在ConcurrentHashMap<String, TopicPublishInfo>

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

tryToFindTopicPublishInfo()中会先检查路由信息是否存在,不存在还需要从NameServer中获取路由列表。

this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

更新的时候会加上一个ReentrantLock,更新结束后释放。

获取到路由信息后开始选择队列发送消息。


4、选择消息发送队列

4.1、选择队列的策略

得到路由信息后就开始选择消息队列发送信息。

选择队列有两种策略

  • 轮询规避:轮询选择队列。如果上次发送消息失败,那就消息需要重新发送,这时就需要规避掉上次发送失败的队列,寻找下一个队列发送。
  • 故障延迟策略:在选择队列发送时根据以往发送时长判断该队列的Broker是否可用。对于发送失败的BrokerProducer会规避该Broker一段时间。

这是发送消息的流程图。

假设我们的Broker是集群,有两个Broker。消息会选择其中一个Broker发送消息,如果失败就重试,直到发送成功或者超过重试次数。
在这里插入图片描述


4.2、源码阅读

这里会探索源码如何实现这两种队列选择策略。

选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl -> this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
在这里插入图片描述
从入口进入查看代码,最终在MQFaultStrategy#selectOneMessageQueue,代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。

在这里插入图片描述

4.2.1、轮询规避

这是轮询规避的源码。
在这里插入图片描述
其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了,所以记录着上一次失败的broker以在这次选择Broker时规避他。

所以lastBrokerName==null时该消息是第一次发送,不需要规避,直接随机选择一个队列发送。

如果上一次发送失败,则开始轮询选择一个队列,保证这个新选出的队列和上一个不同后就可以返回。


4.2.2、故障延迟规避

4.2.2.1、计算规避时间

故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。

计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法;如果出现异常在catch快也会调用这个方法计算规避时间。
在这里插入图片描述

进入这个方法,代码如下。

需要注意isolation=true时表示消息发送出现异常,这时便认为延迟时长是30000ms

同时也可以看到sendLatencyFaultEnable==true表示开启故障规避策略,这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。
在这里插入图片描述

规避时间的计算比较简单,阿里根据自己的经验设置了一个对照表来计算时间,如下图所示。比如延迟是550ms以内的Broker不用规避;延迟在550~1000ms的需要规避30s

在这里插入图片描述

这里跳过计算规避时间的代码细节,进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

代码如下,它其实就是将不可用的Broker维护在faultItemTable中,并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。
在这里插入图片描述

4.2.2.2、选择队列

我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue,下图是相关代码。

在这里插入图片描述
它的大概流程是,轮询队列,如果可用就返回。实在找不到可用的就随机选择一个Broker发送。

它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用,里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。


4.2.3、ThreadLocal的使用

在选择队列时,无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。

获取下标的方式用到了ThreadLocal。如下图所示,sendWhichQueue本质上就是一个ThreadLocal<Integer>对象。

请添加图片描述
生产者发送信息时可能会有多个线程同时发信息。

这些线程发送信息时应该各自维护一个消息队列的下标,这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。

另外这些线程发送信息时可能会指定消息队列的id,所以线程各自维护一个消息队列的下标是很有必要的。

这个场景就很适合ThreadLocal,选择消息队列时用ThreadLocal来维护下标。



5、发送消息

5.1、客户端建立的时间

客户端发送消息时,建立HTTP连接是在send()方法中而不是在start()方法中。

站在设计者的角度需要考虑到,开发者在start()方法后可能还需要过一段时间才会真正发送信息,甚至不发信息。

那么建立HTTP连接放在start()就比较浪费资源,所以建立HTTP连接放在了send()方法中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1927708.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

正运动控制器:EtherCAT总线初始化

1、EtherCAT总线初始化的目的 运动控制器的EtherCAT 总线接口可用于连接 EtherCAT 伺服驱动器和 EtherCAT 扩展模块&#xff0c;无论连接什么模块&#xff0c; EtherCAT 总线都需要编写一段 EtherCAT 总线初始化程序来进行电机和 EtherCAT 扩展模块的使能。使能之后的应用与脉…

QT多线程下,信号槽分别在什么线程中执行,如何控制?

可以通过connect的第五个参数进行控制信号槽执行时所在的线程 connect有几种连接方式&#xff0c;直接连接、队列连接和 自动连接 直接连接&#xff08;Qt::DirectConnection&#xff09;&#xff1a;信号槽在信号发出者所在的线程中执行 队列连接&#xff08;Qt::QueuedConn…

LeetCode-返回链表倒数第K个节点、链表的回文结构,相交链表

一、返回链表倒数第k个节点 . - 力扣&#xff08;LeetCode&#xff09; 本体思路参展寻找中间节点的方法&#xff0c;寻找中间节点是定义快慢指针&#xff0c;快指针每次走两步&#xff0c;慢指针每次走一步&#xff0c;当快指针为空或者快指针的下一个节点是空时&#xff0c;…

vue实现提交时对不同板块的表单内容进行校验

需求 1、需要对第一个红色框框板块内所有带星号的地方进行校验&#xff0c;并将提示语显示到对应的输入框下面&#xff0c;如图&#xff1a; 2、第二个红色框框板块中&#xff0c;点击 “添加相关人员” 能实现对多人的添加功能&#xff0c;并且能绑定相对应的校验规则 3、在…

linux进行redis的安装并使用RDB进行数据迁移

现在有两台电脑&#xff0c;分别是A&#xff0c;B&#xff0c;现在我要把A电脑上的redis的数据迁移到B电脑上&#xff0c;B电脑上是没有安装redis的 1.找到A电脑的redis的版本 1.先启动A电脑的redis&#xff0c;一般来说&#xff0c;都是直接在linux的控制台输入&#xff1a;re…

数据结构与算法(1):递归函数的设计技巧

1.前言 哈喽小伙伴们大家好哦~从今天开始笔者就要开始正式学习数据结构与算法了&#xff0c;在这里写知识博客既是做一些学习笔记&#xff0c;又相当于给大家做知识分享咯&#xff0c;希望大家一起加油哦&#xff01; 2.正文 2.1递归的引入 在正式讲解递归之前&#xff0c;…

创建鸿蒙手机模拟器(HarmonyOS Emulator)

文 | Promise Sun 一.前提条件&#xff1a; 鸿蒙项目开发需要使用模拟器进行开发测试&#xff0c;但目前想在DevEco Studio开发工具中使用模拟器就必须到华为官网进行报名申请&#xff0c;参加“鸿蒙模拟器&#xff08;HarmonyOS Emulator&#xff09;Beta活动申请”。 申请审…

中间件的理解

内容来源于学习网站整理。【一看就会】什么是前端开发的中间件&#xff1f;_哔哩哔哩_bilibili 每日八股文~白话说mq&#xff0c;消息中间件_哔哩哔哩_bilibili 例如&#xff1a; 1&#xff09;两个人打电话&#xff0c;中间的通信网络就是中间件。 2&#xff09;菜鸟驿站&…

SpringBoot以及swagger的基本使用

1、SpringBoot是什么&#xff1f; 一种快速开发、启动Spring的框架、脚手架 遵循“约定优于配置”的思想&#xff0c;使得能够快速创建和配置Spring应用 2、SpringBoot的核心特性 自动配置&#xff0c;一些依赖、默认配置都预设好了&#xff0c;减少了配置量起步依赖&#x…

ROS2-Navigation2初体验:Gazebo“打不开”

输入ros2 launch nav2_bringup tb3_simulation_launch.py headless:False后只能打开RVIZ而无法打开Gazebo的问题&#xff0c;多次尝试解决后发现只是多等待一会儿即可&#xff0c;在此给同样学习Navigation2的朋友们提个醒 。 Getting Started — Nav2 1.0.0 documentation 1…

Mindspore框架CycleGAN模型实现图像风格迁移|(二)实例数据集(苹果2橘子)

Mindspore框架&#xff1a;CycleGAN模型实现图像风格迁移算法 Mindspore框架CycleGAN模型实现图像风格迁移|&#xff08;一&#xff09;CycleGAN神经网络模型构建Mindspore框架CycleGAN模型实现图像风格迁移|&#xff08;二&#xff09;实例数据集&#xff08;苹果2橘子&#…

补充性文件

第一 二章 1&#xff0c;关系型数据库是什么&#xff1f;其中的关系是指什么&#xff1f; 答&#xff1a; 关系型数据库是一些相关的表和其他数据库对象的集合。数据模型符合满足一定条件的二维表格式。 2&#xff0c;E-R模型&#xff1f; 实体为表。用矩形表示。属性为字…

嵌入式物联网在工业中的应用——案例分析

作者主页: 知孤云出岫 目录 嵌入式物联网在工业中的应用——案例分析引言1. 智能工厂1.1 实时监控与数据采集 2. 智能物流2.1 库存管理 3. 智能维护3.1 设备故障预测 4. 智能交通4.1 交通流量监测 总结 嵌入式物联网在工业中的应用——案例分析 引言 嵌入式物联网&#xff08;…

回车不搜索直接页面刷新问题解决

使用技术栈&#xff1a;vue3、elementUiPlus 问题&#xff1a;回车触发方法&#xff0c;会刷新整个页面&#xff0c;不执行搜索 解决方法&#xff1a;在搜索的表单中增加submit.native.prevent submit.native.prevent

项目管理:不懂跟进,项目白做

在职场上&#xff0c;工作的本质其实就是信息的传递与处理。而信息的及时传递&#xff0c;也就是我们常说的及时跟进&#xff0c;往往被许多项目经理和职场人忽视。 他们或许在暗地里埋头苦干&#xff0c;却忽略了明面上的沟通与汇报&#xff0c;最终导致合作方和内部团队都对…

利用AI辅助制作ppt封面

如何利用AI辅助制作一个炫酷的PPT封面 标题使用镂空字背景替换为动态视频 标题使用镂空字 1.首先&#xff0c;新建一个空白的ppt页面&#xff0c;插入一张你认为符合主题的图片&#xff0c;占满整个可视页面。 2.其次&#xff0c;插入一个矩形&#xff0c;右键选择设置形状格式…

【SpringBoot】SpringCache轻松启用Redis缓存

目录&#xff1a; 1.前言 2.常用注解 3.启用缓存 1.前言 Spring Cache是Spring提供的一种缓存抽象机制&#xff0c;旨在通过简化缓存操作来提高系统性能和响应速度。Spring Cache可以将方法的返回值缓存起来&#xff0c;当下次调用方法时如果从缓存中查询到了数据&#xf…

JDK,JRE,JVM三者之间的关系

Java程序不是直接在操作系统之上运行&#xff0c;而是运行在JVM&#xff08;java虚拟机&#xff09;之上。 Java源代码&#xff08;.java文件&#xff09;经编译器编译成字节码&#xff08;.class文件&#xff09;&#xff0c;JVM本质上就是一个负责解释执行Java字节码的程序。…

结合实体类型信息(2)——基于本体的知识图谱补全深度学习方法

1 引言 1.1 问题 目前KGC和KGE提案的两个主要缺点是:(1)它们没有利用本体信息;(二)对训练时未见的事实和新鲜事物不能预测的。 1.2 解决方案 一种新的知识图嵌入初始化方法。 1.3 结合的信息 知识库中的实体向量表示&#xff0b;编码后的本体信息——>增强 KGC 2基…

鸿蒙跨平台框架ArkUI-X 小试牛刀视频播放

团队介绍 作者:徐庆 团队:坚果派 公众号:“大前端之旅” 润开鸿生态技术专家,华为HDE,CSDN博客专家,CSDN超级个体,CSDN特邀嘉宾,InfoQ签约作者,OpenHarmony布道师,电子发烧友专家博客,51CTO博客专家,擅长HarmonyOS/OpenHarmony应用开发、熟悉服务卡片开发。欢迎合…