RabbitMQ消息可靠性(一)-- 生产者消息确认

news2025/1/11 12:38:49

前言

在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。


一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认

消息丢失的情景有三种情况:

  1. 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
  2. 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
  3. 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)

二、生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。

1、publisher-confirm(发送者确认)

消息成功投递到交换机,返回ack

消息未投递到交换机,返回nack

2、publisher-return(发送者回执)

消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。


三、代码实现

1、配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

publish-confirm-type有三个值,

  1. none:禁用发布确认模式,是默认值
  2. simple:同步等待confirm结果,直到超时
  3. correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

publisher-returns:开启消息失败回调,回调函数ReturnCallback

2、配置ConfirmCallback函数和ReturnCallback函数

/**
 * 生产者消息回调配置类
 */
@Configuration
@Slf4j
public class ProviderCallBackConfig {

    @Resource
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        // 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
         //那么broker会调用basic.return方法将消息返还给生产者。
        // 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。
        rabbitTemplate.setMandatory(true);

        /**
         * TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题
         * 消息确认回调,确认消息是否到达broker
         * data:消息唯一标识
         * ack:确认结果
         * cause:失败原因
         */
        rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
            if (ack) {
                //消息发送成功后,更新数据库消息状态等逻辑
                log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);
            } else {
                //信息发送失败,打印日志后,可以根据业务选择是否重发消息
                log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);
            }
        });

        /**
         * TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题
         * message      消息
         * replyCode    回应码
         * replyText    回应信息
         * exchange     交换机
         * routingKey   路由键
         */
        rabbitTemplate.setReturnsCallback((res) -> {
            //若发送失败,打印错误信息,然后可以根据业务选择重发消息
            log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));
        });
        return rabbitTemplate;
    }

}

到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在3种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功

①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

@GetMapping("/testProviderMessageBack")
    @ApiOperation(value = "测试生产者消息回调")
    @ApiOperationSupport(order = 5)
    public String testProviderMessageBack() {
        CorrelationData data = new CorrelationData();
        data.setId("111");
        rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);
        return "ok";
    }

调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

结论: ①这种情况触发的是 ConfirmCallback 回调函数

消息发送至exchange失败------>

消息唯一标识: CorrelationData [id=111],

确认状态: false,

造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

②消息推送到server,找到交换机了,但是没找到队列  
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:

    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/testProviderMessageBack2")
    @ApiOperation(value = "测试生产者消息回调2")
    @ApiOperationSupport(order = 6)
    public String testProviderMessageBack2() {
        CorrelationData data = new CorrelationData();
        data.setId("222");
        rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);
        return "ok";
    }

消息发送至exchange成功------>

消息唯一标识: CorrelationData [id=222],

确认状态: true,

造成原因: null

消息发送至queue失败-------->

res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}

这种情况下,两个函数都被调用了,

消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:

结论:这种情况触发的是 ConfirmCallback 回调函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1014500.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

tomcat架构概览

https://blog.csdn.net/ldw201510803006/article/details/119880100 前言 Tomcat 要实现 2 个核心功能&#xff1a; 处理 Socket 连接&#xff0c;负责网络字节流与 Request 和 Response 对象的转化。加载和管理 Servlet&#xff0c;以及具体处理 Request 请求。 因此 Tomc…

weblogic __ 10.3.6 __ 反序列化漏洞 _ CVE-2017-10271

weblogic __ 10.3.6 __ 反序列化漏洞 _ CVE-2017-10271 说明内容漏洞编号CVE-2017-10271漏洞名称反序列化漏洞影响范围10.3.6.0.0&#xff0c;12.1.3.0.0&#xff0c;12.2.1.1.0&#xff0c;12.2.1.2.0漏洞描述Weblogic的WLS Security组件对外提供webservice服务&#xff0c;其…

MySQL里的查看操作

查看数据库或者表 列出所有数据库&#xff1a; show databases;查看正在使用的数据库&#xff08;必须大写&#xff09;&#xff1a; SELECT DATABASE();列出数据库中的表&#xff1a; use 数据库;//选中数据库 show 表;//列出当前数据库下所有表获取数据库表结构&#xff…

OpenCV(四十七):RANSAC优化特征点匹配

1.RANSAC算法介绍 RANSAC是一种常用的参数估计方法&#xff0c;全称为Random Sample Consensus&#xff08;随机抽样一致性&#xff09;。它的主要思想是通过随机选择一部分数据样本&#xff0c;构建模型并评估其拟合程度&#xff0c;迭代过程中逐步优化模型&#xff0c;最终得…

Linux知识点 -- 网络基础(二)

Linux知识点 – 网络基础&#xff08;二&#xff09;&#xff08;1&#xff09; 文章目录 Linux知识点 -- 网络基础&#xff08;二&#xff09;&#xff08;1&#xff09;一、使用协议来实现一个网络版的计算器1.自定义协议2.守护进程3.使用json来完成序列化 二、HTTP协议1.概念…

Redis 事务实现原理

1. 什么是Redis事务 提到事务,我们可能马上会想到传统的关系型数据库中的事务,客户端首先向服务器发送BEGIN开启事务,然后执行读写操作,最后用户发送 COMMIT 或者 ROLLBACK 来提交或者回滚之前的操作。 但是Redis中的事务与关系型数据库是不一样的,Redis 通过 MULTI 命令开始…

GB28181学习(四)——网络设备信息查询

要求 源设备向目标设备发送信息查询命令&#xff0c;目标设备将结果通过查询应答命令返回给源设备&#xff1b;设备信息查询命令包括&#xff1a; 设备目录设备信息设备状态信息设备配置预置位、看守位巡航轨迹列表巡航轨迹PTZ精准状态存储卡状态等 信息查询的范围&#xff1a…

内网穿透对开发人员有什么作用?要怎么实现?快解析

在当今快节奏的互联网时代&#xff0c;软件开发人员需要时刻与内外部服务器进行通信和调试&#xff0c;只有这样才能带来良好的工作速度&#xff0c;顺利推动项目的进展。然而&#xff0c;由于受到网络环境的限制&#xff0c;有时候我们可能无法直接访问公司内网的服务器&#…

差分方程模型:蛛网模型

在完全竞争的市场经济中&#xff0c;一个时期某种消费品如猪肉的上市量远远大于需求量&#xff0c;由于销售不畅导致价格下降&#xff0c;生产者发现养猪赔钱&#xff0c;于是转而经营其它农副产品。过一段时间猪肉上市量就会下降&#xff0c;此时供不应求导致价格上涨&#xf…

【MySQL】MySQL索引的定义、分类、Explain、索引失效和优化

索引的介绍 索引是帮助MySQL高效获取数据的数据结构 MySQL在存储数据之外&#xff0c;数据库系统中还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种引用(指向)表中的数据&#xff0c;这样我们就可以通过数据结构上实现的高级查找算法来快速找到我们想要的数…

设计原则SOLID看这一篇就够了

文章目录 1.引言1.1. 背景1.2. 简要介绍 SOLID 原则1.1. 面向对象编程和设计的重要性 2. 单一职责原则&#xff08;SRP&#xff09;2.1. 定义和原理2.2. SRP 的好处与目标2.3. 例子和代码展示2.4. 如何识别和解决 SRP 原则的违反2.5. 注意事项和局限性 3. 开闭原则&#xff08;…

Centos7.9 一键脚本部署 LibreNMS 网络监控系统

前言&#xff1a; LibreNMS 是个以 PHP/MySQL 为基底的自动探索网络监控系统 LibreNMS 官网 版本23.8.2-52-g7bbe0a2 - Thu Sep 14 2023 22:33:23 GMT0700数据库纲要2023_09_01_084057_application_new_defaults (259)Web 服务器nginx/1.20.1PHP8.1.23Python3.6.8DatabaseMa…

Killer!永久禁用WindowsDefender

工具介绍 WinDefenderKiller&#xff0c;使用C写的通过注册表项永久禁用Windows Defende的一个工具。 关注【Hack分享吧】公众号&#xff0c;回复关键字【230726】获取下载链接 编译使用 执行以下命令编译&#xff1a; # x86_64-w64-mingw32-g -O2 disableWinDef.cpp -o win…

JS 原型和原型链

原型和原型链 1. 了解原型和原型链1.1 原型1.2 原型链 2. 原型2.1 prototype2.2 __proto__ 隐式原型 3. 原型链 1. 了解原型和原型链 1.1 原型 原型&#xff1a; prototype 又称显示原型 1、原型是一个普通对象 2、只有构造函数才具备该属性 3、公有属性可操作 1.2 原型链 原…

数据治理-元数据管理-元数据类型

定义 元数据&#xff0c;定义和描述其它数据的数据。 类型 业务元数据、技术元数据和操作元数据。在图书馆或信息科学中&#xff0c;可分为描述元数据、结构元数据、管理元数据。 业务元数据 主要关注数据的内容和条件&#xff0c;另包括与数据治理相关的详细信息。业务元数据…

logback异步appender日志源码详解

背景&#xff1a; 日常打印日志时&#xff0c;使用logback的异步写日志几乎是标准的配置方式&#xff0c;本文从源码上看看异步写日志的整个流程 异步Appender日志 一般日志的配置如下所示 appender(“ASYNC-LOG”, AsyncAppender) { neverBlock true queueSize 10000 } 这…

前端需要知道的计算机网络知识----网络安全,自学网络安全,学习路线图必不可少,【282G】初级网络安全学习资源分享!

网络安全&#xff08;英语&#xff1a;network security&#xff09;包含网络设备安全、网络信息安全、网络软件安全。 黑客通过基于网络的入侵来达到窃取敏感信息的目的&#xff0c;也有人以基于网络的攻击见长&#xff0c;被人收买通过网络来攻击商业竞争对手企业&#xff0c…

CH07_封装

封装记录&#xff08;Encapsulate Record | 162&#xff09; 曾用名&#xff1a;以数据类代替记录&#xff08;Replace Record with Data Class&#xff09; organization {name: "Acme Gooseberries", country: "GB"};class Organization {constructor(…

HTML整站规划与规范

文章目录 命名规则命名命名书写 包含样式规范样式重置样式引入页面结构页面宽度页面高度与背景页面设计 网址图标 命名规则 命名 根据每块元素的主题、功能、页面上的位置命名&#xff0c;便于后期更改与维护。 另外&#xff1a;如果所有样式放在同一文件下&#xff0c;可以给…

计算如何剥出艺术品

背景&#xff1a; 今天给大家介绍一篇中国科学技术大学发表的论文《Computational Peeling Art Design》。论文要解决是&#xff1a;如何把球状三维物体的表面连续展开成一些艺术画面&#xff0c;要求是展开表面要占三维物体表面整个面积&#xff0c;展开表面要和艺术体形状尽可…