科普文:微服务之Spring Cloud Alibaba消息队列组件RocketMQ如何保证发送消息不丢失

news2025/1/22 17:00:14

概叙

本文分析了 RocketMQ 同步发送、异步发送和单向发送三种方式的原理、优缺点以及使用场景,并且分析了每种方式涉及到的核心源码。

科普文:微服务之Spring Cloud Alibaba版本选择-CSDN博客

通过上文的介绍可以知道同步发送方式可以保证消息发送时不丢,但是性能相对其他两种方式差一些。

RocketMQ 是一款优秀的开源消息中间件,作为 Java程序员可以多去阅读它的源码,吸收其中比较好的代码思维。

RocketMQ 保证消息不丢失?

RocketMQ 保证消息不丢失主要通过以下几个方面实现:

  1. 同步发送:通过send方法发送消息,并等待服务器响应,确保消息已经成功投递。

  2. 事务消息:通过发送 half 消息,并完成事务提交,确保消息接收方完整处理后才将消息标记为成功。

  3. 消息持久化:RocketMQ 会将消息持久化到磁盘,以防止服务器宕机导致的数据丢失。

  4. 主从架构:RocketMQ 采用主从架构,可以配置备份策略确保数据不丢失。

  5. 生产者本地缓存:生产者发送失败时,可以开启本地缓存,并配置重试策略,在网络恢复时重发。

其实所有MQ基本都是这样保证消息不丢失:1.发送消息成功ack;2.服务端接收和存储消息成功(高可用集群);3.消费消息成功ack。

RocketMQ通过刷盘机制、消息拉取机制和ACK机制等多种方式来确保消息投递的可靠性,防止消息丢失。

1.刷盘机制

RocketMQ中的消息分为内存消息和磁盘消息,内存消息在Broker内存中进行读写,磁盘消息则保存在磁盘上。RocketMQ支持同步刷盘和异步刷盘两种方式,通过刷盘机制可以确保消息在Broker宕机时不会丢失。在同步刷盘模式下,消息写入磁盘时,会等待磁盘的写入完成才返回写入成功的响应。在异步刷盘模式下,消息写入磁盘后立即返回写入成功的响应,但不等待磁盘写入完成。

2.ACK 机制

在 RocketMQ 中,Producer 发送消息后,Broker 会返回 ACK 确认信号,表示消息已成功发送。如果 Broker 未收到 ACK 确认信号,则会尝试重新发送消息,直到收到确认。

RocketMQ 采用主从复制机制,每个消息队列都有一个主节点和多个从节点。主节点负责消息的写入和读取,从节点负责备份数据。当主节点宕机时,从节点会自动接管主节点的工作,确保消息不会丢失。

3.消息存储机制

RocketMQ默认采用双写模式存储消息,即将消息同时写入内存和磁盘,然后异步将内存中的消息刷盘到磁盘中。这种方式确保了消息的可靠性,即使系统宕机,也尽可能地避免消息丢失。

此外,RocketMQ还提供了多种机制来保证消息不丢失,例如事务消息、延迟消息、顺序消息等,可以根据业务需求选择和使用。

值得注意的是,为了保证消息的可靠性,RocketMQ发送消息的速度可能受到一定的限制,需要在消息可靠性和性能之间做出权衡。

RocketMQ 保证消息不丢失工作原理(同步发送)

参考:【RocketMQ】RocketMQ怎么保证消息不丢失_rocketmq如何保证消息不丢-CSDN博客

在同步发送模式下,RocketMQ 默认采用同步刷盘方式,当生产者将消息发送到 Broker 后,会等待 Broker 的响应(默认超时 5分钟),Broker 接收消息后,会将其写入内存缓存,并进行刷盘操作。

因此,如果 Broker 响应成功,代表消息一定成功写入磁盘。

高清详细原理图

MQ主要包含了4个组件 nameserver broker producer consumer

然后如何保证消息不丢失又需要对三个消息阶段进行保证

Producer发送消息阶段

1通过采用同步发送消息到broker,等待broker接收到消息过后返回的一个确认消息,虽然效率低,但是时丢失几率最小的方式,异步1和单向消息发送丢失的几率比同步消息丢失的几率大。

2发送消息失败或超时则进行重试。

3broker提供多master模式【即使某台broker宕机了,换一台broker进行投递,保持高可用】

===》采用同步消息和失败重试和多master模式

Broker处理消息阶段

手段四:提供同步刷盘的策略【等待刷盘成功才会返回producer成功】

当数据写入到内存中之后立刻刷盘(同步的将内存中的数据持久化到磁盘上),

手段五:提供主从模式,同时主从支持同步双写

主从broker都同步刷盘成功,才返回producer一个确认消息。

===》采用同步刷盘+broker主从模式,支持同步双写

Consumer消费消息阶段

consumer默认提供的是At least Once机制

手段6 broker队列中的消息消费成功,才返回一个确认消息给broker。

手段7 当消息消费失败了,进行消费消息重试机制(保证幂等就行了。)

===》采用先消费,在返回一个确认消息+消息重试。

详细原理说明

Producer发送消息阶段

发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。

手段一:提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

如下示例代码为一个完整的同步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 SyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定 Topic,Tag和消息体
    Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
    // 5、发送同步消息
    SendResult sendResult = producer.send(msg);
    // 6、通过 sendResult 判断消息是否成功送达
    System.out.printf("message send result:" + sendResult);
    // 7、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的同步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。

  • MQClientAPIImpl#sendMessage:底层消息发送实现。

  • NettyRemotingClient#invokeSync:通过 Netty 实现网络通信。

  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

如下示例代码为一个完整的异步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 AsyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
    // 5、发送异步消息,SendCallback是处理异步回调的方法
    producer.send(msg, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {  // 成功回调
        System.out.println("message send success: " + sendResult);
      }
      @Override
      public void onException(Throwable throwable) {  // 失败回调
        System.out.println("message send fail: " + throwable);
      }
    });
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的异步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。

  • MQClientAPIImpl#sendMessage:底层消息发送实现。

  • NettyRemotingClient#invokeAsync:通过 Netty 实现网络通信。

  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

如下示例代码为一个完整的单向发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 OneWayGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
    // 5、发送单向消息
    producer.sendOneway(msg);
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的单向发送主要涉及以下几个关键类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。

  • MQClientAPIImpl#sendMessage:底层消息发送实现。

  • NettyRemotingClient#invokeOneway:通过 Netty 实现网络通信。

  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)

3种方式对比

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式(但是效率比较低)。

手段二:发送消息如果失败或者超时,则重新发送。

发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常或超时的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。

手段三:broker提供多master模式

即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用(Broker都挂了,哪来的重试机制),因此利用多主模式,当某台broker宕机了,换一台broker进行投递,保持高可用。

总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。

Broker处理消息阶段

手段四:提供同步刷盘的策略【等待刷盘成功才会返回producer成功】

public enum FlushDiskType { SYNC_FLUSH, //同步刷盘 ASYNC_FLUSH//异步刷盘(默认) }

我们知道,当消息投递到broker之后,会先存到page cache【页面缓存】,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘才返回producer一个成功的消息,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。同步刷盘的策略【等待刷盘成功才会返回给producer一个成功的消息】

解释:

同步刷盘:当数据写入到内存中之后立刻刷盘(同步的将内存中的数据持久化到磁盘上),在保证刷盘成功的前提下响应一个消息给Producer。

异步刷盘:数据写入内存后,直接响应一个消息给Producer。异步将内存中的数据持久化到磁盘上。

手段五:提供主从模式,同时主从支持同步双写

即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。 因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。

此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,从而保证休息不丢失。

总结

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。

RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略(slave也可以进行刷盘)+主从双写的方式解决丢失消息的可能。

Consumer消费消息阶段


手段六:consumer默认提供的是At least Once机制

        从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也算是消息的丢失。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。

何为At least Once?

Consumer先pull【主动拉取Broker中的信息】 消息到本地,消费完成后,才向服务器返回ack(消费成功的消息--acknowledge)。

通常消费消息的ack机制一般分为两种思路:

1、先提交后消费;

2、先消费,消费成功后再提交【这个更稳当】;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等(通过给每个消息携带一个唯一标识信息,去数据库进行判断。或者在producer的时候就存储一个唯一标识(消息),消费成功删除redis中的消息确保不被重复消费。)来解决重复消费问题。

手段七:消费消息重试机制

        当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。

总结

consumer端要保证消费消息的可靠性,主要通过At least Once+消费重试机制保证。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1973812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git分支介绍

git branch 查看当前分支情况 可以看见当前只有一个分支叫main,也就是默认分支,可以理解为树的主干,git早期版本中默认分支叫master 命令行创建一个新分支 git branch [分支名]在创建之后,如果需要切换到新分支需要git switc…

用VBA在Word文档中快速查找到黄色底纹内容

一、效果展示 二、代码 Sub 快速查找定位到黄色底纹内容()With Selection.Find.ClearFormatting.Wrap wdFindContinue.Font.Shading.BackgroundPatternColorIndex wdYellow.ExecuteEnd With End Sub

C++面向对象高级编程(I)侯捷 自我总结版

关键词: 类的防卫式声明,模版,内联函数,构造函数以及重载,const,pass by reference(有&符号),friend,操作符重载,返回local object的不能retu…

【LightGlue】lightglue performance

【LightGlue】lightglue performance 1. 图表分析2. 数据点解释3. LightGlue的细节4. 结论 这张图表展示了不同特征匹配方法在图像对每秒处理速度和相对位姿精度(Relative Pose Accuracy)之间的关系。 1. 图表分析 X轴(横轴)&…

使用 abortNavigation 阻止导航

title: 使用 abortNavigation 阻止导航 date: 2024/8/3 updated: 2024/8/3 author: cmdragon excerpt: 摘要:在Nuxt3中,abortNavigation是一个辅助函数,用于路由中间件内阻止不符合条件的页面访问,实现权限控制、错误处理及动态…

3条职场情商法则,让你少走弯路

人生的幸福和工作息息相关,它占据了大量时间,而时间是人生最珍贵的资源。但当年轻人从校园走进职场时,却常常面临困境。因为职场和学校是完全不同的场所,游戏规则大相径庭。 人们会惊奇地发现,曾经的学霸可能在职场中…

Qt 5.14安装(配置MSVC 2017)

Qt 5.14安装(配置MSVC 2017) 记录一下自己安装配置Qt的步骤。 我需要安装Qt,并使用MSVC编译,所以才写了这篇文章。 一、环境 操作系统:windows 11 (64位) Qt:Qt 5.14.2 vs&…

Linux 命令,touch说明与使用

1:touch命令功用: 对一个或多个文件,将访问时间和修改时间以及日期进行更新,既在 Linux 和 Unix 系统中,改变文件的访问和修改时间,touch对于强制其他的命令以某一方式处理文件时是有用的。如make的操作 和某些find命令…

LED显示屏的秘密:揭秘模拟信号与模拟电路

在我们日常生活中,LED显示屏无处不在,从商场的广告牌到舞台的背景墙,它们用绚丽多彩的画面吸引着我们的眼球。但你是否好奇,这些神奇的屏幕背后,是如何处理和显示信息的呢?今天,就让我们一起揭开…

计算机毕业设计hadoop+hive+spark医院数据分析大屏 医疗数据分析 医疗可视化 医院大数据 医院爬虫 医疗爬虫 数据仓库 大数据毕业设计

绪论 研究背景和意义 近年来,随着第五次科技革命的到来,科学、信息技术等迅猛发展,医疗信息方面的数据激增,全国各大医院每日也产生大量的数据,包括门诊每日接诊数据、患者信息数据、医院资源消耗数据等。数据的高速产…

Radamsa:一款高性能通用模糊测试工具

关于Radamsa Radamsa是一款高性能的通用模糊测试工具,广大研究人员可以将其当作一个应用程序稳定性测试的测试用例生成工具。 工具运行机制 该工具使用简单,支持自定义脚本开发,可以用于测试程序对格式错误和潜在恶意输入的承受能力。它的工…

MapCrafter - 定制精美的地图海报! | 限时免费

MapCrafter: 打造个性化城市地图海报的终极工具! 在您的 iPhone、iPad、Mac 或 VisionPro 上轻松制作美丽的城市地图海报,展示您的城市情感与创意。 https://apps.apple.com/cn/app/mapcrafter/id6557037905 为什么选择 MapCrafter? • 城市…

【中项】系统集成项目管理工程师-第9章 项目管理概论-9.7 项目管理原则

前言:系统集成项目管理工程师专业,现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试,全称为“全国计算机与软件专业技术资格(水平)考试”&…

CTF-web基础 TCP/UDP协议

传输层协议由TCP/UDP协议组成,来控制信息的传输,二者有什么区别呢,TCP比较靠谱,但是UDP速度比较快一点。 TCP协议 Transmission Control protocol, 三次握手:先给服务器传输询问要发消息,然后…

Python语言编制批处理文件管理器

在软件开发和系统管理中,批处理文件(.bat)是一种常见且有用的工具。它们可以自动化重复性任务,简化复杂的操作流程。然而,随着批处理文件数量的增加,管理和执行这些文件可能变得麻烦。今天,我们…

uniapp HarmonyOS项目实战

1. uniapp HarmonyOS项目实战 Dcloud发布了uniapp兼容鸿蒙的文档:Uniapp开发鸿蒙应用 1.1. 说明 (1)鸿蒙开发只支持Vue3,不支持Vue2、不支持plus、但支持nvue (2)nvue编译到鸿蒙后非原生渲染,…

AUTOSAR实战教程 - 模式管理BswM与其他各模块的交互

近日驻厂某OEM,幸得大块的个人时间, 把BswM这一块的内容从ETAS/ISOLAR工具配置到代码实现做了一个全方位的CT. 2024,希望孜孜内卷的汽车人升职加薪! 博主近期写的一首小诗,也一并送给大家,懂的都懂: 在看不到阳光的冬天/ 我染了风寒/ 白天点灯/ 晚上吃药/ 躺在被窝里才敢…

2024上海初中生古诗文大会暑假备考:单选题真题和独家解析

今天是2024年8月3日,现在距离2024年初中生古诗文大会初选还有3个月(11月3日正式开赛),我们继续来看10道选择题真题和详细解析。为帮助孩子自测和练习,题目的答案和解析统一附后。 本专题持续分享,欢迎到我…

C 语言二分查找法

二分查找定义 二分查找法用于查找一个有序数组中某个目标值是否存在,或者接近目标值的元素;相比把 整个数组遍历一次的0(n)复杂度,二分查找可以把复杂度降低到0(logzn): 原理讲解 原来中间的值mid (left right)/ 2,但…

用NSAT-1000实现S参数的自动化测试

在射频技术领域,S参数的准确测量对于保证产品质量至关重要。本文将带您了解NSAT-1000测试系统实现S参数自动化测试的流程。 S参数的测试原理 S参数测试原理是基于网络分析仪,网络分析仪发出一个已知频率和幅度的射频信号,通过网分测量信号得到…