RabbitMQ 消息丢失的场景,如何保证消息不丢失?

news2025/1/10 2:14:47

一.RabbitMQ消息丢失的三种情况

  • 第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

  • 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了

  • 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

二.RabbitMQ消息丢失解决方案

1.针对生产者

方案1 :开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback

// 这里再次重发这条消息

}

// 提交事务
channel.txCommit

缺点:

RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

方案2:使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirm
channel.confirm();
//发送成功回调
public void ack(String messageId){
  
}

// 发送失败回调
public void nack(String messageId){
    //重发该消息
}

2.针对RabbitMQ

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

  • Exchange 设置持久化

  • Queue 设置持久化

  • Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

  • 单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

  • 普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

  • 镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

  • 同步至所有的

  • 同步最多N个机器

  • 只同步至符合指定名称的nodes

命令处理HA策略模版:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像

rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

3.针对消费者

方案一:ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

使用rabbitmq提供的ack机制,服务端首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。才把消息从内存删除。

这样就解决了,即使一个消费者出了问题,但不会同步消息给服务端,会有其他的消费端去消费,保证了消息不丢的case。

三.总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

  • 生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

  • mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

  • 消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/675364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软考A计划-系统集成项目管理工程师--一般常识-中

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 👉关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

【深度学习推荐系统 理论篇】一、Wide Deep Learning for Recommender Systems

前言 在搜广推业务做了3年工程,最近终于有空整理下,完整的梳理下自己的知识架构(预计分为理论篇/工程篇) Wide & Deep论文链接:https://arxiv.org/abs/1606.07792 另外王喆老师《深度学习推荐系统》中&#xff…

安装 Nginx 服务

一.安装 Nginx 服务 1.关闭防火墙 开机自启起 安全机制 systemctl stop firewalld systemctl disable firewalld setenforce 0 2、安装依赖包 yum -y install pcre-devel zlib-devel gcc gcc-c make 3、创建运行用户 useradd -M -s /sbin/nologin nginx 4、编译安装 cd …

总结908

学习目标: 月目标:6月(线性代数强化9讲,背诵15篇短文,考研核心词过三遍) 周目标:线性代数强化3讲,英语背3篇文章并回诵,检测 每日必复习(5分钟)…

永磁同步直线电机学习笔记——直线电机的数学模型

永磁直线电机数学模型的建立,是进行后续控制仿真和实验的前提。为了实现永磁同步直线电机的矢量控制,需要把永磁同步直线电机假想成永磁同步旋转电机,借鉴旋转电机的电流分析方式,实现dq轴电流控制的解耦,并且把永磁同…

为什么常见电路板GND与外壳GND之间接一个电阻一个电容

集电极开路是指集电极电路中出现了断路的情况,导致电路无法正常工作。在集电极开路的情况下,电路中的电流无法通过集电极流过,导致电路无法正常放大信号或者控制其他器件的工作。 集电极开路的原因有很多,可能是器件本身的故障、…

C++进阶—继承(下)菱形(虚拟)继承分析虚拟继承存储对象模型

目录 0. 前言 1. 普通多继承下,基类和派生类复制转换底层细节(切片) 2. 多继承下的复杂菱形继承 3. 菱形虚拟继承(虚基类)重点 3.1 菱形非虚拟继承对象存储模型 3.2 菱形虚拟继承对象存储模型 3.3 虚拟继承对象存储模型 3.4 多对象继承关系分析其虚基类&…

Redis原理 - 内存策略

原文首更地址,阅读效果更佳! Redis 本身是一个典型的 key-value 内存存储数据库,因此所有的 key、value 都保存在之前学习过的 Dict 结构中。不过在其 database 结构体中,有两个 Dict :一个用来记录 key-value&#xf…

【计算机网络详解】——软件定义网络SDN(学习笔记)

目录 🕒 1. 概念🕒 2. OpenFlow 协议 🕒 1. 概念 软件定义网络(Software Defined Network,SDN)的概念最早由斯坦福大学的Nick McKeown教授于2009年提出。SDN最初只是学术界讨论的一种新型网络体系结构。SD…

基于JAVA实现的简易学生信息管理系统(附源码)

一、前言 最近在学习JAVA,这几天跟着网上的视频学完基础知识之后,做了一个学生信息管理系统,写的比较普通,没太大亮点,希望可以给初学者一些参考经验,另外,如有不恰当的地方还请各位指正&am…

论文解读:End-to-End Object Detection with Transformers

发表时间:2020 论文地址:https://arxiv.org/pdf/2005.12872.pdf 项目地址:https://github.com/facebookresearch/detr 提出了一种将对象检测视为集合预测问题的新方法。我们的方法简化了检测流程,有效地消除了许多手工设计的组件…

解决跨域问题的两种方案

说明:跨域是指,在A向B发送请求时,如果A和B的协议、端口号和域名有一个不相同。跨域问题是指,浏览器出于安全,会阻止跨域的异步请求(如Ajax),而在分布式的开发环境下,跨域…

ChatGPT在媒体与娱乐领域的沉浸式场景:虚拟主持人和创意助手的新应用探索

第一章:引言 在当今数字化时代,人工智能技术在媒体与娱乐领域的应用日益广泛。ChatGPT作为一种先进的自然语言处理模型,具备强大的对话生成能力和创造力,为媒体与娱乐产业带来了新的创意和可能性。本文将探讨ChatGPT在媒体与娱乐…

学了那么长时间的编程,C语言的各种操作符都搞不懂?点开这里有详细的介绍—>

目录 前言 一、原码、反码、补码的基础概念 1.原码 2.反码 3.补码 二、原码、反码、补码的计算方法 1.原码 2.反码 3.补码 三、算术操作符 四、移位操作符 1. 左移操作符 移位规则: 2. 右移操作符 移位规则: (1) …

MySQL 中Relay Log打满磁盘问题的排查方案

MySQL 中Relay Log打满磁盘问题的排查方案 引言: MySQL Relay Log(中继日志)是MySQL复制过程中的一个重要组件,它用于将主数据库的二进制日志事件传递给从数据库。然而,当中继日志不断增长并最终占满磁盘空间时&…

【openGauss数据库审计项配置审计日志维护】--略有小成

【openGauss数据库审计项配置&审计日志维护】--略有小成 🔻 一、openGauss数据库审计🔰 1.1 关于openGauss审计功能🔰 1.2 openGauss审计功能开启🔰 1.3 配置具体的审计项 🔻 二、查看审计结果🔻 三、维…

day13_类中成员之一:构造器

由来 我们发现我们new完对象时,所有成员变量都是默认值,如果我们需要赋别的值,需要挨个为它们再赋值,太麻烦了。我们能不能在new对象时,直接为当前对象的某个或所有成员变量直接赋值呢。可以,Java给我们提…

详解c++---map和set的封装

目录标题 前言红黑树的基本代码map和set的封装红黑树迭代器红黑树迭代器- -begin和end函数代码测试const迭代器方括号的实现 前言 通过之前的学习我们知道set容器中存储的数据是k,map容器中存储的数据是k和v,但是这两个容器底层都是通过红黑树来进行实现…

blfs:为lfs虚拟机增加桌面01

vmware中克隆一份lfs,扩大硬盘分区再操作。 1、硬盘拓展容量,分区挂载到/home(已有的大小在后面编译桌面系统会捉襟见肘) 使用fdisk进行分区 fdisk /dev/sda 执行p w分区并保存 mkfs -v -t ext4 /dev/sda4 转ext4格式 让/…

uniapp中使用mixins(混入)

mixins 选项接收一个混入对象的数组。这些混入对象可以像正常的实例对象一样包含实例选项,这些选项将会被合并到最终的选项中,使用的是和 Vue.extend() 一样的选项合并逻辑。也就是说,如果你的混入包含一个 created 钩子,而创建组…