RocketMQ 如何保证全链路消息不丢失?

news2024/11/13 11:01:56

目录

1. RocketMQ 消息丢失的原因有哪些

2. 如何保证 RocketMQ 全链路消息不丢失

2.1 保证生产者发送消息到 MQ,消息不丢失

2.2 保证消息写入 Broker 后不丢失

2.3 保证 Broker 集群时,消息不丢失

2.4 保证消费者消费消息不丢失

3. 如果整个 MQ 服务都挂了呢,怎么保证消息零丢失


1. RocketMQ 消息丢失的原因有哪些

​所有 MQ 产品消息丢失的元凶无非就两个:网络+缓存

这两个原因落地到具体的场景:

1.生产者发送消息到 MQ,消息丢失

2.消息写入 MQ,消息丢失

    ① 消息写入内存后非正常关机;

    ② 消息写入磁盘后,磁盘坏了;

    ③ master 数据备份到 slave 由于网络原因导致备份失败;

3.消息者消费 MQ ,消息丢失

2. 如何保证 RocketMQ 全链路消息不丢失

2.1 保证生产者发送消息到 MQ,消息不丢失

【方案一】同步发送 + 消息重试机制

Producer 发送消息的三种方式:

1.单向发送:消息发送出去就不管了.

2.同步发送: 同步等待 Broker 响应.

3.异步发送: 异步处理 Broker 通知.

同步发送+消息重试:生产者向 mq 发送消息,mq 收到消息成功就会回复 ack,如果生产者没收到 ack,就会重试再次发送消息到 mq,通过这种机制来保证消息不丢失。(重试的消息会被加入到重试队列中,重试一定次数还未成功发送就会被加入到死信队列中)

PS:同步发送有个缺点,生产者在等待服务端回复ack的过程,他干不了别的事,所以比较综合的方式就是采用异步发送的方式,异步发送它会注册一个 sendCallback 回调监听器,相当于找了个小弟给你干活,可以在小弟的这个回调方法里面写逻辑,如果消息发送失败了,让小弟去重试就行。

【方案二】RocketMQ 的事务消息机制

⭐ RocketMQ 事务消息机制的思想:

        首先,事务消息的本质就是,生产者这边处理一个本地事务,消费者这边处理一个本地事务,这两个事务要保证一个原子性,但这是一个分布式事务,直接保证这两个事务的一致性很难做到; 而 RocketMQ 的事务机制是怎么保证这两个本地事务的一致性的呢,它是通过保证生产者处理本地事务和往 RocketMQ 发消息这两个操作是原子性的,然后在消费者这一端,只要生产者消息成功发送到了 MQ,MQ 就可以通过重试的机制将消息发送给消费者,哪怕当时消费者的事务处理失败了,那么经过多次的重试,最终消费的事务也会执行成功。所以他的本质是通过保证事务消息的一半,从而来保证整体业务逻辑的事务性

⭐ RocketMQ 事务消息机制的执行流程:

【参照上图】

1.生产者向 MQ 发送半消息(half),MQ 回复半消息.   

    这个消息不会被下游消费者消费,用于判断 MQ 服务是否正常.

2.生产者处理自己的本地事务.

3.生产者返回本地事务状态给 MQ.

    总共有三种状态:

    ① 本地事务执行成功 commit 状态,MQ 可以将消息向下游消费者推送.

    ② 本地事务执行失败 rollback 状态,MQ 直接将消息丢弃.

    ③ 本地事务执行时间比较长,unknow 状态,MQ 拿到这个消息后,等一段时间,就会向生产者发起一个响应,来确认生产者的本地事务有没有完成,生产者就会检查本地事务的状态,成功就返回 commit,失败就返回 rollback。这个发起确认生产者状态的操作默认可以重试 15 次,如果超过 15 次, 生产者的本地事务还没有执行成功,MQ 就会 rollback,丢弃消息.

⭐ Rocket 事务消息机制的应用场景:

如上图,在电商项目中,当用户下单后,等待支付,15 分钟内要完成支付这么一个场景,就可以使用 RocketMQ 事务消息机制来做!!

【定时任务做法】

        1. 这个场景最简单的做法就是,用户下完单之后,起一个 15 分钟的定时任务,然后等待 15 分钟结束后,再去检查一下用户有没有完成支付,完成支付了就 commit,没有完成支付就 rollback。

        2. 但是这样存在一个问题,如果用户下完订单 1 分钟就完成支付了呢?所以这样就没办法立即感知用户的支付状态,就办法立即给用户发货。(例如火车票,机票抢票)

        3. 所以就不能等到 15 分钟后再去检查订单状态,就得每隔一分钟就做一个定时任务,而且还要维护任务的状态,处理任务失败的情况,这就显得很麻烦。

【RocketMQ 事务消息机制的做法】

使用 RocketMQ 事务消息的机制来做就显得更优雅。执行流程如下:

  1. 订单创建后,生产者发送一条事务消息给 RocketMQ,表示创建订单.
  2. 生产者执行本地事务(在 MySQL 中记录订单信息)。
  3. 向支付系统申请预支付订单.
  4. 主动向 MQ 返回 Unknow 状态. (即使下单成功)
  5. RocketMQ 在接收到 Unknow 状态后,会定期回查生产者的事务状态.
  6. 生产者在回查时检查本地事务状态,如果订单已支付,则提交事务消息;如果订单未支付且超过15分钟,则回滚事务消息并取消订单.

所以定时任务中复杂的检查过程就得以优化了,但是我们要在 RocketMQ 中配置对应的参数,例如:回查的次数、回查的间隔、事务超时的时间

TransactionMQProducer producer 
     = new TransactionMQProducer("transaction_producer_group");

// 设置事务回查参数
producer.setTransactionCheckMax(10);        // 每条消息最大回查 10 次
producer.setTransactionCheckInterval(30000); // 回查间隔 30 秒
producer.setTransactionTimeout(60000);      // 事务超时 60 秒

PS:事务超时时间:消息从发送到生产者开始,在指定时间内没有提交或回滚事务,则认为事务重试,就进行回查。

当然,有的朋友可能会问了,支付系统那边订单支付成功后,会有一个回调事件来通知你事务执行成功了,万一支付系统他这个消息不发呢,发失败了怎么办? 万一呢是不是,所以主动去查 + 被动去推,这样才是最稳妥的做法。

2.2 保证消息写入 Broker 后不丢失

当我们把消息发送到 Broker 后,它会将消息存到缓存里面,缓存会将这些数据过段时间写入磁盘,那缓存也会存在断电丢失的可能,那如何保证消息不丢失呢 ?

PS:有些朋友就会说了,我不用这个缓存,直接写磁盘,不就没问题了??

应用能控制的只有用户态的缓存,而内核态还涉及了一个缓存(PageCache),这个缓存只有操作系统内部可以调用。所以这个缓存没办法不用,这是操作系统强制使用的。所以此处的缓存丢失消息的可能针对的是内核态的缓存。

【同步刷盘机制】

虽然应用程序控制不了内核态的缓存,但是操作系统提供了一个刷盘的接口,而应用程序就可以调这个刷盘的操作去申请一次刷盘,将这个 PageCache 刷到磁盘里面去。

同步刷盘:同步刷盘也不能绝对的保证数据不丢失!所谓的同步,字面意思是来一个消息,就调用一次刷盘,但是当消息非常多的时候,那么这个调用刷盘的频率就会非常高,那么操作系统就扛不住了,而操作系统就是因为写磁盘太慢才设计了一个 PageCache 缓存。所以所谓的同步刷盘并不是来一个消息就调用一次刷盘操作,RocketMQ 是每隔 10 ms 定时调用一次刷盘,只不过这个定时间隔比较短,它可以减少消息丢失的可能性。本质上来说,它还是存在消息丢失的可能。

【异步刷盘】

RocketMQ 还提供了异步刷盘的机制,所谓的异步刷盘不只是消息积累一批,然后再一次性写入磁盘。RocketMQ 对异步刷盘也是做了处理的

异步刷盘:异步刷盘可以配置是使用堆内存,还是使用堆外内存(直接内存)。

1. 使用堆内存,这些消息就会在 JVM 的堆中进行分配,当需要消息刷盘的时候, RocketMQ 的后台线程会主动调用刷盘操作将消息写入磁盘;但是这个过程有 GC 的介入,就免不了 STW,这就会影响消息从 RocketMQ 写入堆内存的速度(STW 直接暂停)以及消息从堆内存刷到磁盘的速度(STW 影响后台线程,导致刷盘速度下降)。

2. 使用直接内存,消息写入内存以及消息的刷盘就不会受 JVM 垃圾回收的影响,它可以更高效的利用操作系统的内存和 I/O 调度机制。使用直接内存,就不再是 RocketMQ 主动调用刷盘了,而是操作系统接管了,它会通过脏页的机制先给内存中修改了但尚未刷盘的消息打上一个脏页的标记,一旦这个脏页的内存超过整个内存的阈值了,它内部就会调用一次刷盘操作。当然使用直接内存虽然性能更优,但是它会导致内存泄漏,因为没有 GC 的介入,长时间的内存泄露,可能会导致 OOM。

PS:异步刷盘使用直接内存,RocketMQ 不再主动调用刷盘,但并不代表着它无法控制直接内存写入磁盘的时机了,它可以利用 mmap 内存映射机制 (使用 FileChannel.map 方法将磁盘文件的一部分映射到直接内存,即直接缓冲区),当需要保证数据一致性的时候,主动去同步脏页(调用 MappedByteBuffer.force() 强制将缓冲区的内容写入磁盘)。

当然有些朋友可能会问,应用程序直接决定消息写入磁盘的位置,如果我将消息写入到磁盘的启动 Cache 上,那操作系统不就崩了吗 ??

所谓的内存映射,它映射的区域仅限于它所映射的文件部分,不能越界访问其他内存区域系统关键区域。即使应用程序尝试访问或修改不属于自己的内存区域,操作系统的权限控制机制也会阻止这种操作。

2.3 保证 Broker 集群时,消息不丢失

【方案一】普通集群,指定角色,各司其职

普通集群可以使用同步同步的主节点来 "保证" 消息不丢失。

ASYNC_MASTER:异步同步的主节点

SYNC_MASTER:同步同步的主节点

SLAVE:从节点 (master 节点挂了后,slave 节点不会切换)

所谓同步同步的主节点,就是当生产者发送消息到 Broker 的时候,Broker 集群的主节点会先将数据同步到 slave,同步完之后才会给生产返回响应。

而异步同步的主节点,当生产者发送消息到 Broker 后,主节点会先返回响应给生产者,然后异步的发起一个线程往 slave 同步数据。

【方案二】Dledger 高可用集群,自行选举,多数同意

1.生产者将消息写到 Leader

2 Leader 将消息写到自己的 commitLog 日志里面,

3. 然后给生产者返回一个响应,

4. 随着心跳,给其他的节点发起消息同步,

5. 如果有多数的节点完成了消息同步,那么最终这个消息就会写到磁盘里,并记为 commit 状态。

但是这种集群下,它仍然是会存在消息丢失的可能性的,只不过这个可能性非常的小(在工程上忽略不计)。

当 Leader 节点将消息记录到 commitLog 里面后,还没来得及同步消息,它就挂了,这个时候,Dledger 集群就会选出一个日志最新的节点作为 Leader,而挂掉的 Leader 节点中还未提交的消息,当服务重启的时候,就会被主动丢弃,然后以新的 Leader 中的消息为准。

2.4 保证消费者消费消息不丢失

此处消费者端由于有消息重试机制,所以通常是不会丢失的,更多的是要考虑消息幂等性问题(由于网络抖动,消费可能被消费多次)。

虽然说有消息重试机制,但并不代表着消息的绝对不丢失,在某些情况下,还是会存在消息丢失的,当 MQ 发送消息给消费者,如果消费者在消息还未实际处理完成之前就返回了消费成功的响应,就存在消息丢失的可能。所以要保证消费者消费消息不丢失,最好还是实际处理完消息后,再返回消费成功的响应。

3. 如果整个 MQ 服务都挂了呢,怎么保证消息零丢失

当整个 MQ 服务都挂了,那么就需要有一个降级机制来临时存储未成功发送的消息,确保系统在恢复后能够尽快地将这些消息重新写入 MQ。可以选择 Redis 、本地文件,或者内中的集合作为降级缓存。然后新起一个线程不断的尝试将降级缓存里的消息写入到 MQ,这样至少在 MQ 服务重启时,消息可以尽快的写入到 MQ 里去。

【总结】

RocketMQ 如何保证全链路消息不丢失:

1. 生产者发送消息到 MQ,消息不丢失

  • 同步发送 + 多次尝试   -- 降低吞吐
  • 事务消息机制  -- 多次网络请求

2. Broker 收到消息后消息不丢失

  • 设置同步刷盘   -- I/O 负担
  • 搭建 Dledger   -- 不断地 RPC 心跳,网络负担

3.  消费者消费消息不丢失

  • 同步处理消息,再提交 offset   -- 无法通过异步提高吞吐

4. 整个 MQ 集群挂了,如何保证消息零丢失

  • 增加临时的降级存储

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学习测试9-接口测试 3-jmeter

jmeter启动 测试计划 1 创建线程组 2 创建http请求 数据类型 from表单数据可以通过剪切板直接粘贴 JSON数据需要从括号开始复制 3 查看结果树 4 http cookie管理器,可以记住登录状态 内部不用设置 5 断言 系统返回的信息进行判断 系统返回“新增会议信息成功” …

SerDes系列之如何选择AC耦合电容

交流耦合电容用于隔离PCB互连时的直流分量(Common-mode voltage),同时传递交流分量(Voltage swing),其作用类似于一个高通滤波器。 但是,如果电容容值选取不当,使用过程中会产生信号…

Linux中的环境变量

一、基本概念 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数。 如:我们在编写C/C代码的时候,在链接的时候,从来不知道我们的所链接的动态静态库在哪里,但是照样可以链接成功&#xff…

Guava LocalCache源码分析:LocalCache的get、put、expand、refresh、remove、clear、cleanUp

Guava LocalCache源码分析:LocalCache的get、put、expand 前言一、get二、put三、expand 前言 上篇文章,详细描写了Guava LocalCache怎样如ConcurrentHashMap对缓存数据进行了分段存储。本章主要针对LocalCache重要的几个接口进行说明。 一、get CanIg…

[笔记]Fluke3563 振动分析仪

参考文档:Fluke 3563 Analysis Vibration Sensor system | Fluke 1.四大机械故障损伤原因 2.振动特征 福禄克做的示意图很棒: 不平衡对应转动轴的一倍频,不对中是2倍频,然后3~6倍频会有未紧固故障,更高频的位置是齿轮…

怎么压缩视频文件?简单的压缩视频方法分享

视频已成为我们日常生活中不可或缺的一部分。但随着视频质量的提高,文件大小也逐渐成为我们分享的阻碍。如何有效压缩视频文件,使其既能保持清晰,又能轻松分享?今天,给大家分享五种实用的视频压缩方法,快来…

ubuntu上模拟串口通信

前言 有时候写了一些串口相关的程序,需要调试的时候,又没有硬件,或者需要等其他模块完成才能一起联调。这样搭建环境费时费力,很多问题等到最后联调才发现就已经很晚了。 本文提供一种在ubuntu环境下模拟串口,直接就可…

性价比高的宠物空气净化器什么牌子好?热门养宠空气净化器分享

作为一名有6年经验的铲屎官,许多新手铲屎官可能听说过宠物空气净化器,但了解不多。实际上,宠物空气净化器是养猫家庭必备的小家电之一。它的大面积进风口能有效吸附空气中的浮毛和皮屑,专门的除臭技术可以去除猫咪带来的异味。宠物…

Python 视频水印批量添加器

功能如下可以 一、选择水印位置 二、批量添加水印 三、可添加文本或图片 # -*- 编码:utf-8 -*- import cv2 import os import numpy as np from moviepy.editor import VideoFileClip from concurrent.futures import ThreadPoolExecutor import tkinter as tk fro…

【深度学习】FaceChain-SuDe,免训练,AI换脸

https://arxiv.org/abs/2403.06775 FaceChain-SuDe: Building Derived Class to Inherit Category Attributes for One-shot Subject-Driven Generation 摘要 最近,基于主体驱动的生成技术由于其个性化文本到图像生成的能力,受到了广泛关注。典型的研…

PostgreSQL使用(二)

说明:本文介绍PostgreSQL的DML语言; 插入数据 -- 1.全字段插入,字段名可以省略 insert into tb_student values (1, 张三, 1990-01-01, 88.88);-- 2.部分字段插入,字段名必须写全 insert into tb_student (id, name) values (2,…

分享3个好用的启动盘u盘制作工具

对于经常需要安装维护电脑的同学,制作一个可启动的U盘是非常有必要的。小编今天就和大家分享三款优秀的U盘启动盘制作工具:Ventoy、UltraISO和Rufus。 1. Ventoy Ventoy是一款开源的启动U盘制作工具,它支持将ISO、WIM、IMG、VHD(x)和EFI等类…

SpringMVC 控制层框架-上

一、SpringMVC简介 1. 介绍 Spring Web MVC 是基于Servlet API构建的原始Web框架,从一开始就包含在Spring Framework 中。在控制层框架经历Srust、WebWork、Strust2等诸多产品的历代更迭之后,目前业界普遍选择了SpringMVC 作为Java EE项目表述层开发的首…

如何通过网络快速搜寻到自己的STM32设备

目录 一、问题概述 二、解决思路 三、代码实现 1.创建任务 2.UDP广播接收 一、问题概述 以前一直用RS232串口修改设备配置信息,但是现场施工人员的232线太细,经常容易断掉,这次准备用网口去修改,遇到了一个问题,…

WINUI或WPF灵活使用样式、控件模板、自定义控件、用户控件

在WINUI与WPF 中,控件模板(ControlTemplate)、样式(Style)、自定义控件(CustomControl)和用户控件(UserControl)都是构建复杂和灵活用户界面的重要工具,但它们…

vue3 中 lottie-web 封装组件

用到的JSON文件在“我的资源”里&#xff0c;下面这个链接直达 下面的代码中用到的JSON数据源 Lottie.vue <script setup> import { ref, onMounted } from vue import lottie from lottie-web// 设置组件参数 const props defineProps({renderer: {type: String,def…

手把手带你白嫖10年服务器

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 手把手带你白嫖10年服务器 如何获取如何使用成果个人网站 个人邮箱服务 重要的话重要说&#xff…

Ubuntu部署K8S集群-图文并茂(超详细)

Ubuntu部署K8S集群 1. 模版机系统环境准备1.1 安装Ubuntu1.2 设置静态IP地址 2. 主机准备2.1 使用模板机创建主机2.2 主机配置2.2.1 修改静态IP2.2.2 修改主机名2.2.3 主机名-IP地址解析2.2.4 时间同步2.2.5 内核转发、网桥过滤配置2.2.6 安装ipset和ipvsadm2.2.7 关闭SWAP分区…

【射频器件供应】Flann Microwave

国家 United Kingdom 地址 Flann Microwave Ltd Dunmere Road Bodmin, Cornwall PL31 2QL United Kingdom Flann Microwave于1956年成立于泰晤士河畔金斯顿萨里。在过去的四十年里&#xff0c;Flann Microwave一直是市场领先的天线设计公司&#xff0c;其精密微波器件和测试频…

鸿蒙开发:Universal Keystore Kit(密钥管理服务)【获取密钥属性(ArkTS)】

获取密钥属性(ArkTS) HUKS提供了接口供业务获取指定密钥的相关属性。在获取指定密钥属性前&#xff0c;需要确保已在HUKS中生成或导入持久化存储的密钥。 开发步骤 指定待查询的密钥别名keyAlias&#xff0c;密钥别名最大长度为64字节。调用接口[getKeyItemProperties]&…