RabbitMQ如何做到不丢不重

news2025/1/12 6:07:36

目录

MQTT协议

如何保证消息100%不丢失

生产端可靠性投递

​编辑

RabbitMQ的Broker端投

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

消费端

ACK机制改为手动

总结


MQTT协议

先来说下MQTT协议中的3种语义,这个非常重要。

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;  

所以要保证消息不丢,就得从三个方面入手,分别是生产端、RabbitMQ的Broker端、消费端。三个都保证不丢失,才能保证100%不丢。

生产端可靠性投递

事务机制:

// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);


@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory)
 {
  return new RabbitTransactionManager(connectionFactory);
 }
    
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
   rabbitTemplate.setMandatory(true);
   rabbitTemplate.convertAndSend("java",message);
    }

confirm消息确认机制:

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true	     
@Configuration
@Slf4j
public class RabbitMQConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void enableConfirmCallback() {
        //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
        //correlationData 可在发送时指定消息唯一 id
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if(!ack){
                //记录日志、落库定时任务扫描重发,同时对开发人员进行通知
            }
        });
        
        //当消息成功发送到交换机没有路由到队列触发此监听
        rabbitTemplate.setReturnsCallback(returned -> {
            //记录日志、落库、同时对开发人员进行通知
        });
    }
}

一般不推荐事务的模式,因为是同步的会影响性能,所以都会采用异步回调的confirm模式。

RabbitMQ的Broker端投

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降。

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

消费端
ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。
我们需要进行手动消费

#开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。一般就不会再次入队了,而是记录并通知开发人员,进行手动处理



 @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试:抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试:抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1332423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springCould中的consul-从小白开始【4】

目录 1.consul介绍 ❤️❤️❤️ 2.安装 ❤️❤️❤️ 3.创建8006模块 ❤️❤️❤️ 4.创建80模块❤️❤️❤️ 1.consul介绍 ❤️❤️❤️ Consul 是一种用于服务发现、配置和分布式一致性的开源软件。它由HashiCorp开发和维护,可用于帮助构建和管理现代化的分布…

新建项目EasyUiAutotest,安装Appium-Python-Client

一、前置说明 Appium-Python-Client 是 Appium 的 Python 客户端库,它提供了一系列的类和方法,用于与 Appium 服务器进行通信,并执行各种移动应用测试操作,包括启动应用、模拟用户输入、点击等操作。 二、操作步骤 1. 启动Pych…

MySQL中MVCC的流程

参考文章一 参考文章二 当谈到数据库的并发控制时,多版本并发控制(MVCC)是一个重要的概念。MVCC 是一种用于实现数据库事务隔离性的技术,常见于像 PostgreSQL 和 Oracle 这样的数据库系统中。 MVCC 的核心思想是为每个数据行维护…

嵌入式开发网络配置——windows连热点,开发板和电脑网线直连

目录 电脑 WiFi 上网,开发板和电脑直连 使用场景 设置VMware虚拟机的网络配置 Ubuntu设置——版本18.04 ​编辑 windows设置 开发板设置 原因:虚拟机Linux移植可执行程序到开发板失败 最后发现虚拟机的Linuxping不通开发板 下面是我的解决方法 …

网络7层架构

网络 7 层架构 什么是OSI七层模型? OSI模型用于定义并理解数据从一台计算机转移到另一台计算机,在最基本的形式中,两台计算机通过网线和连接器相互连接,在网卡的帮助下共享数据,形成一个网络,但是一台计算…

如何使用固定二级子域名公网访问多个本地Windows Web网站

文章目录 1. 下载windows版Nginx2. 配置Nginx3. 测试局域网访问4. cpolar内网穿透5. 测试公网访问6. 配置固定二级子域名7. 测试访问公网固定二级子域名 1. 下载windows版Nginx 进入官方网站(http://nginx.org/en/download.html)下载windows版的nginx 下载好后解压进入nginx目…

共建共享,创新同行!飞桨星河社区助力大模型时代开发者砥砺前行

大模型引领AI新浪潮,助力人工智能实现从感知理解到生成创造的飞跃。飞桨星河社区,覆盖深度学习初学者、在职开发者、企业开发者、高校教师、创业者等,是国内最大的AI开发者社区,以飞桨和文心大模型为核心,集开放数据、…

NPOI 导出Excel

NPOI是一个用于处理Office文档的开源项目。它是用C#编写的,允许开发人员在.NET平台上读取、写入和操作Word、Excel和PowerPoint文件。NPOI提供了一组API,使开发人员能够以编程方式创建、修改和处理Office文档,这对于自动化生成报表、导出数据…

go写的海盗王数据库重置工具

很久没有用go去写代码了,很多语法都快忘记了。 为了测试一下界面库govcl的用法,拉了一个界面窗口之后,想想还是把代码也补上去吧。 于是,就写了这个海盗王数据库重置工具。 这个工具适合开服的人使用,可以将海盗王的账…

1.关于浏览器

一、认识主流浏览器 Chrome谷歌浏览器Safari苹果浏览器Firefox火狐浏览器Opera欧朋浏览器 二、浏览器内核是什么? 三、五大浏览器,四大内核 四、前端做网页开发用什么浏览器? Chrome谷歌浏览器。

VM固定虚拟机IP

命令 vim /etc/sysconfig/network-scripts/ifcfg-enosystemctl restart network子网掩码、网关、DNS获取如下:

什么是MVC?MVC框架的优势和特点

目录 一、什么是MVC 二、MVC模式的组成部分和工作原理 1、模型(Model) 2、视图(View) 3、控制器(Controller) 三、MVC模式的工作过程如下: 用户发送请求,请求由控制器处理。 …

windows下切换JDK8、JDK11、JDK17

问题背景: 需要同时使用不同版本的JDK进行开发和运行,可通过下述操作改变JDK版本。 一、java分类以及JDK、JRE、JVM的联系 1、JAVA分类 JAVA EE——Java Enterprise Edition, JAVA企业版,主要用于WEB开发。 JAVA SE——Java Stand…

LeetCode刷题--- 组合总和

个人主页:元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 http://t.csdnimg.cn/6AbpV 数据结构与算法 http://t.csdnimg.cn/hKh2l 前言:这个专栏主要讲述递归递归、搜…

计算机是如何工作的(下)

4. 编程语言(Program Language) 本块内容主要是还原下我们已经熟悉的编程语言,即编程语言是如何和 CPU 指令对应起来的。 4.1 程序(Program) 所谓程序,就是一组指令以及这组指令要处理的数据。狭义上来说&…

python dash学习2

代码 内有说明: from dash import Dash, html, dcc, callback, Output, Input import plotly.express as px import pandas as pd# 从 Plotly 数据集中读取数据 df pd.read_csv(https://raw.githubusercontent.com/plotly/datasets/master/gapminder_unfiltered.c…

Html / CSS刷题笔记

WebKit是一个开源的浏览器引擎,它最初是由苹果公司开发的,并且被广泛用于Safari浏览器和其他基于WebKit的浏览器,比如Google Chrome的早期版本。它也是构建许多移动设备浏览器的基础。WebKit的主要功能是解析HTML和CSS,并将其渲染…

vue3(五)-基础入门之计算属性

一、计算属性 1.计算属性与普通方法的的区别: 计算属性在需要渲染数据时调用一次,而后将结果缓存起来。只有计算属性所依赖的数据发生改变时才会重新调用函数,否则每次渲染相同的数据都只会从缓存中读取。 普通方法在每次数据需要渲染时都会…

prometheus二进制安装

1、在需要安装prometheus的目录下执行wget命令下载软件到本地,如我的路径是/opt/module/prometheus wget https://github.com/prometheus/prometheus/releases/download/v2.34.0/prometheus-2.34.0.linux-amd64.tar.gz正在解析主机 objects.githubusercontent.com …

Head First Design Patterns - 策略模式

策略模式 策略模式:策略模式是一种行为型模式,它将对象和行为分开,将行为定义为 一个行为接口 和 具体行为的实现。策略模式最大的特点是行为的变化,行为之间可以相互替换。每个if判断都可以理解为就是一个策略。本模式使得算法可…