解密RocketMq的运行机制,带你玩转分布式消息通信

news2024/11/15 8:00:54

一、 MQ背景&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解耦提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
  • 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
• 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
• 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
• 支持18个级别的延迟消息(rabbitmq和kafka不支持)
• 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
• 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
• 支持重复消费(rabbitmq不支持,kafka支持)

Rocketmq、kafka、Rabbitmq的详细对比,请参照下表格:


2c3433c0972d00ef1cba86a640fe6aaa.jpeg

二、RocketMQ集群概述

1. RocketMQ集群部署结构

921d7fcc91d705a95d94ae06a45fd197.jpeg

1) Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2) Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

3) Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4) Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。

三、 Rocketmq如何支持分布式事务消息

场景

A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),B和MQ保证事务一致(通过重试),从而达到最终事务一致性。

原理:大事务 = 小事务 + 异步

1. MQ与DB一致性原理(两方事务)

流程图

d3fc964fc6ae87a91af04e3583605008.jpeg

上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。

MQ消息、DB操作一致性方案:

1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。

2)执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。

3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。

4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。

说明

上面以DB为例,其实此处可以是任何业务或者数据源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的状态,在MQ服务器内部是一个数字。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者rocketmq集群挂了的情况下。当rocketmq集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用Rocketmq解决分布式事务问题,建议选择同步刷盘模式。

2. 多系统之间数据一致性(多方事务)

6bf75708f5345c0b31e3b8b74acd1611.jpeg

当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过Rocketmq的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

以上图交易系统为例

1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMq事务性消息保证一致性

2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。

3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMq的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。 PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。 回滚任务失败,通过MQ的重发来重试。

以上是交易系统和其他系统之间保持最终一致性的解决方案。

3.案例分析

1) 单机环境下的事务示意图

如下为A给B转账的例子。


ea1a2229d8909c39873174d8483ed261.jpeg

以上过程在代码层面甚至可以简化到在一个事物中执行两条sql语句。

2) 分布式环境下事务

和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事物来实现。此时可以通过一下方式实现,将转账操作分成两个操作。

a) A账户


b5fe0ff49f6803b7fe79501fad14632d.jpeg

b) MQ消息
A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。

c) B账户

dc2e1247dae3343ee5278fe3f3d2cc88.jpeg

四、 顺序消息

1. 顺序消息缺陷

发送顺序消息无法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

2. 原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。

注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

3. 扩展

可以通过实现发送消息的对列选择器方法,实现部分顺序消息。

举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为对列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费

五、 最佳实践

1. Producer

1) Topic

一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags 在broker做消息过滤。

2) key

每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key 尽可能唯一,这样可以避免潜在的哈希冲突。

// 订单Id

String orderId= "20034568923546";

message.setKeys(orderId);

3) 日志

消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。

4) send

send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。

SEND_OK:消息发送成功

FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

2. Consumer

1) 幂等

RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等。

2) 日志

消费时记录日志,以便后续定位问题。

3) 批量消费

尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1143791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Faster R-CNN(2016.1)

文章目录 摘要引言过去计算proposals的算法我们提出的 相关工作Object ProposalsDeep Networks for Object Detection Faster R-CNNRegion Proposal NetworksAnchorsTranslation-Invariant AnchorsMulti-Scale Anchors as Regression References多尺度预测有两种流行的方法我们…

云计算未来展望:边缘计算、量子计算与AI

文章目录 边缘计算:数据处理的新时代应用领域挑战与机遇 量子计算:超越传统计算的新范式量子比特应用前景挑战与机遇 人工智能:云计算的动力云中的AI应用领域挑战与机遇 结语 🎉欢迎来到云计算技术应用专栏~云计算未来展望&#x…

Vue学习之样式汇总

Vue学习之样式汇总 一 二者左右排版 案例 说明:头部一左一右排版,内容一左一右两个排版,公告栏文字超过点点点显示 代码实现 说明: (1)头部实现一左一右排版需要使用一下两个样式 display: flex;justify-…

4、QtCharts 做心电图

文章目录 ui界面核心代码全部代码 ui界面 核心代码 void Dialog::slot_timer() {qreal xOffset0.f;//x的偏移量,推进的距离qreal dIncrease10;//增加量//数据for(int i0;i<10;i){m_xdIncrease;xOffsetdIncrease;m_splineSerise->append(m_x,qrand()%10);//根据实际情况删…

IDEA新建maven项目,使用mybatis操作数据库完整过程

IDEA新建maven项目&#xff0c;使用mybatis操作数据库完整过程 一、IDEA新建maven项目二、配置mybatis三、创建表对应实体类四、创建mapper接口五、使用mybatis操作数据库 前提&#xff1a; 这个教程是在maven项目中使用mybatis进行数据库操作&#xff0c;不是在spring boot项目…

51单片机复位电容计算与分析(附带Proteus电路图)

因为iC x (dU/dt).在上电瞬间&#xff0c;U从0变化到U,所以这一瞬间就是通的&#xff0c;然后这就是一个直流回路&#xff0c;因为电容C直流中是断路的&#xff0c;所以就不通了。 然后来分析一下这个电容的电压到底是能不能达到单片机需要的复位电压。 这是一个线性电容&…

听力检测为什么要在标准化的隔声屏蔽系统中进行?

作者兰明&#xff0c;医学硕士&#xff0c;听力学博士&#xff0c;听觉健康门诊主任 美国国家研究委员会;;行为、认知和感官科学委员会联合出版的听力损失确定社会保障福利的资格一书中关于测试环境的要求如下&#xff1a; 行动建议4-4 测试环境 听力学评估是在受控的声学环境中…

基于springboot实现休闲娱乐代理售票平台系统项目【项目源码+论文说明】

基于springboot实现休闲娱乐代理售票系统演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把休闲娱乐代理售票管理与现在网络相结合&#xff0c;利用java技术建设休闲娱乐代理售票系统&#xff0c;实现休闲娱乐代理售票的信息化。则对于进一步提高休闲娱乐代理售票管理发…

随写 - GPT使用时机

感慨科技的进步&#xff0c;还记得15年的时候初中&#xff0c;需要写一篇什么读后感&#xff0c;东抄西凑一篇500字的语句不通顺的文章交上去&#xff0c;那时候什么文库啥的都不需要会员&#xff0c;直接复制就行了。 现在问一问GPT什么都出来了。 1.EDA设计 由于课程结束&am…

基于springboot实现校园疫情防控系统项目【项目源码+论文说明】

基于springboot实现校园疫情防控系统演示 摘要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&am…

2023-10-28 LeetCode每日一题(从数量最多的堆取走礼物)

2023-10-28每日一题 一、题目编号 2558. 从数量最多的堆取走礼物二、题目链接 点击跳转到题目位置 三、题目描述 给你一个整数数组 gifts &#xff0c;表示各堆礼物的数量。每一秒&#xff0c;你需要执行以下操作&#xff1a; 选择礼物数量最多的那一堆。如果不止一堆都符…

Java练习题2022-3

从键盘上输入一个数值字符串&#xff08;表示非负整数&#xff0c;所以该字符串不带正负号和小数点&#xff09;&#xff0c;输出这个字符串中的数字字符重新组合的最小数。例如“654321”输出的为“123456”&#xff1b;“001254”输出为“100245”&#xff1b;“00000”输出为…

Python学习笔记--类的定义和调用

二、类的定义和调用 1、怎么理解类&#xff1f; 类是什么&#xff1f; 个人认为理解类&#xff0c;最简单的方式就是&#xff1a;类是一个变量和函数的集合。 可以看下下面的这张图。 这张图很好的诠释了类&#xff0c;就是把变量和函数包装在一起。 当然我们包装也不是毫…

SpringSecurity详解,实现自定义登录接口

目录 1 SpringSecurity概述1.1 权限框架1.1.1 Apache Shiro1.1.2 SpringSecurity 1.2 授权和认证1.3 SpringSecurity的功能 2 认证原理及流程2.1 项目引入SpringSecurity2.2 认证流程详解 3 自定义登录接口3.1 理论讲解3.2 代码实战3.3 接口测试 1 SpringSecurity概述 1.1 权限…

【计算机视觉】相机

文章目录 一、原始的相机&#xff1a;针孔相机&#xff08;Pinhole Camera&#xff09;二、针孔相机的数学模型三、真实相机四、透镜的缺陷 我的《计算机视觉》系列参考UC Berkeley的CS180课程&#xff0c;PPT可以在课程主页看到。 成像原理 一、原始的相机&#xff1a;针孔相机…

C++11的std::function和bind绑定器

可调用对象 在C中&#xff0c;存在“可调用对象”这么一个概念。准确来说&#xff0c;可调用对象有如下几种定义&#xff1a; 1、是一个函数指针 2、是一个具有operator()成员函数的类对象&#xff08;仿函数&#xff09; 3、是一个可转换为函数指针的类对象 4、是一个类成员&a…

基于SSM和VUE的留守儿童信息管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

数据链路层和DNS之间的那些事~

数据链路层&#xff0c;考虑的是两个节点之间的传输。这里面的典型协议也很多&#xff0c;最知名的就是“以太网”。我们本篇主要介绍的就是以太网协议。这个协议规定了数据链路层&#xff0c;也规定了物理层的内容。 目录 以太网帧格式 帧头 载荷 帧尾 DNS 从输入URL到…

中软国际:战略携手三大伙伴,三线出击收割AI红利

【科技明说 &#xff5c; 重磅专题】 2023年&#xff0c;当我看到中软国际成立AIGC研究院的消息后&#xff0c;认为基于解放号平台全面能力&#xff0c;去推进政企数智化服务&#xff0c;在很大程度上还是需要生态伙伴的技术力量。 为什么呢&#xff1f;这里简单说一下中软国际…

【Bug—eNSP】华为eNsp路由器设备启动一直是0解决方案!

问题描述 在上机实验时&#xff0c;打开ensp软件&#xff0c;添加AR设备时启动异常&#xff0c;最开始错误代码是40&#xff0c;最后通过重新安装&#xff0c;又出现了新的问题&#xff0c;启动AR设备一直是0&#xff0c;而且界面卡住。 解决方法 打开VirtualBox&#xff0c;将…