如何保障消息一定能发送到RabbitMQ?

news2025/1/13 20:00:12

我们知道,RabbitMQ的消息最终是存储在Queue上的,而在Queue之前还要经过Exchange,那么这个过程中就有两个地方可能导致消息丢失。第一个是Producer到Exchange的过程,第二个是Exchange到Queue的过程。
在这里插入图片描述
为了解决这个问题,有两种方案,一种是通过confirm机制,另外一种是事务机制,因为事务机制并不推荐,这里先介绍Confirm机制。

Publisher Confirm是一种机制,用于确保消息已经被Exchange成功接收和处理。一旦消息成功到达Exchange并被处理,RabbitMQ会向消息生产者发送确认信号(ACK)。如果由于某种原因(例如,Exchange不存在或路由键不匹配)消息无法被处理,RabbitMQ会向消息生产者发送否定信号(NACK)。

//启用Publisher Confirms
            channel.confirmSelect();

            //设置Publisher Confirms回调
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message confirmed with deliveryTag:"+deliveryTag);
                    //在这里处理消息确认
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message not confirmed with deliveryTag:"+deliveryTag);
                    //在这里处理消息未确认
                }
            });

Publisher Returns机制与Publisher Confirms类似,但用于处理在消息无法路由到任何队列时的情况。当RabbitMQ在无法路由消息时将消息返回给消息生产者,但是如果能正常路由,则不会返回消息。

//启用Publisher Returns
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Message returned with replayCode: "+replyCode);
                    //在这里处理消息发送到Queue失败的返回
                }
            });

通过以上方式,我们注册了两个回调监听,用于在消息发送到Exchange或者Queue失败时进行异常处理。通常我们可以在失败时精心报警或者重试来保障一定能发送成功。

完整代码:

package com.example.demo.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class PublisherCallbacksExample {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection=factory.newConnection();
            Channel channel=connection.createChannel()){

            //启用Publisher Confirms
            channel.confirmSelect();

            //设置Publisher Confirms回调
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message confirmed with deliveryTag:"+deliveryTag);
                    //在这里处理消息确认
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message not confirmed with deliveryTag:"+deliveryTag);
                    //在这里处理消息未确认
                }
            });

            //启用Publisher Returns
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Message returned with replayCode: "+replyCode);
                    //在这里处理消息发送到Queue失败的返回
                }
            });

            String exchangeName = "my_exchange";
            String routingKey = "my_routing_key";
            String message = "Hello,RabbitMQ!";

            //发布消息到Exchange
            channel.basicPublish(exchangeName,routingKey,true,null,message.getBytes());

            //等待Publisher Confirms
            if (!channel.waitForConfirms()) {
                System.out.println("Message was not confirmed!");
            }

            //关闭通道和连接
            channel.close();

        }

    }
}

另外,这里如果发送到Queue之后,是否一定能持久化下来,是否一定不丢,这就是另外一个话题了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1539755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Transformer】transformer注解

every blog every motto: You can do more than you think. 0. 前言 transformer注解 在过去的一年里,《Attention is all you need》中的transformer一直萦绕在很多人的脑海里。除了在翻译质量上产生重大改进之外,它还为许多其他NLP任务提供了一种新的…

2024年【危险化学品经营单位安全管理人员】新版试题及危险化学品经营单位安全管理人员模拟考试题

题库来源:安全生产模拟考试一点通公众号小程序 危险化学品经营单位安全管理人员新版试题考前必练!安全生产模拟考试一点通每个月更新危险化学品经营单位安全管理人员模拟考试题题目及答案!多做几遍,其实通过危险化学品经营单位安…

matlab 电机仿真平台GUI

1、内容简介 略 74-可以交流、咨询、答疑 2、内容说明 略 电机仿真平台GUI 包含直流机要加调电压启动、回馈制动、串电阻调速 异步电动机要加串电阻启动、星三角启动、回馈制动模块 3、仿真分析 略 4、参考论文 略

【网安】DDoS / Web漏洞 / CC攻击 / 恶意爬虫

【网安】DDoS攻击:方法、影响与防御策略 写在最前面1.DDoS(分布式拒绝服务)攻击2. Web 漏洞利用3. CC(凭证破解)攻击4.恶意爬虫 🌈你好呀!我是 是Yu欸 🌌 2024每日百字篆刻时光&…

Vue介绍使用

文章目录 Vue概念一、Vue快速入门二、Vue常用指令三、Vue生命周期四、案例1、查询所有2、新增品牌 Vue概念 一、Vue快速入门 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> <…

chap验证实验

一、添加接口 在每个路由器里添加2SA接口 二、配IP 进入serial接口配置IP R1&#xff1a; R2&#xff1a; ppp mp Mp-group 0/0/0 R3: 查看&#xff1a; 三、aaa认证&#xff0c;chap验证 创建一个新用户&#xff1a; R2进入3/0/0接口&#xff1a; R1进入3/0/0接口&a…

蓝桥杯2021年第十三届省赛真题-解密

一、题目 解密 【问题描述】 小明设计了一种文章加密的方法&#xff1a;对于每个字母 c&#xff0c;将它变成某个另外的字符 Tc。下表给出了字符变换的规则&#xff1a; 例如&#xff0c;将字符串 YeRi 加密可得字符串 EaFn。 小明有一个随机的字符串&#xff0c;加密后为EaFnj…

ERNIE SDK 本地使用与markdown自动生成

ERNIE SDK 仓库包含两个项目&#xff1a;ERNIE Bot Agent 和 ERNIE Bot。ERNIE Bot Agent 是百度飞桨推出的基于文心大模型编排能力的大模型智能体开发框架&#xff0c;结合了飞桨星河社区的丰富预置平台功能。ERNIE Bot 则为开发者提供便捷接口&#xff0c;轻松调用文心大模型…

从零开始学习在VUE3中使用canvas(六):lineCap(线条端点样式)

一、简介 lineCap能够让我们设置线条的端点样式&#xff0c;例如 1. butt const ctx canvas.getContext("2d");ctx.lineCap "butt"; // 默认样式&#xff0c;也可以显式指定 2.round const ctx canvas.getContext("2d");//圆头ctx.lineCap …

内存条@电脑支持的最大内存@升级内存硬件

文章目录 电脑支持的最大内存规格cpu官网查看支持的规格命令行查看脚本化 DDR内存LPDDR内存内存升级扩展&#x1f47a;插槽检查板载内存SPD内存厂商其他 内存参数&#x1f47a;性能指标使用软件查看更多内存相关的软件工具 电脑支持的最大内存规格 确认电脑最大支持内存大小和频…

MT2191 整数大小比较(高精度)

给出两个正整数&#xff0c;判断他们的大小。 输入格式&#xff1a; 两个正整数。 输出格式&#xff1a; 若前者大&#xff0c;输出>&#xff1b; 若后者大&#xff0c;输出<&#xff1b; 若一样大&#xff0c;输出。 输入&#xff1a; 1412894619244619891 23762842…

js 质数的因子

功能:输入一个正整数&#xff0c;按照从小到大的顺序输出它的所有质因子&#xff08;重复的也要列举&#xff09;&#xff08;如180的质因子为2 2 3 3 5 &#xff09; 按照从小到大的顺序输出它的所有质数的因子&#xff0c;以空格隔开 输入 180 输出 2 2 3 3 5 let line 180;…

宁波零碳工厂,“零碳工厂”指引未来

&#x1f600;随着环保意识的提高⬆️和全球气候变化问题&#x1f30f;的日益严重&#xff0c;⬇️减少碳排放、实现可持续发展已成为&#x1f30e;全球的共识。而在这个&#x1f504;过程中&#xff0c;宁波的“零碳工厂”建设&#x1f3e9;正成为引领未来的重要趋势。 “零碳…

jeect-boot queryFieldBySql接口RCE漏洞(CVE-2023-4450)复现

jeect-boot积木报表由于未授权的 API /jmreport/queryFieldBySql 使用了 freemarker 解析 SQL 语句从而导致了 RCE 漏洞的产生。 1.漏洞级别 高危 2.漏洞搜索 fofa app"Jeecg-Boot 企业级快速开发平台"3.影响范围 JimuReport < 1.6.14.漏洞复现 这个漏洞的…

基于springboot+vue的农产品直卖平台

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【Android开发】【创建Activity,Activity之间的切换/消息传递】【java】

一、第一个Activity 1.1 创建一个空Activity 1.2 创建一个布局 知识点 在XML中引用一个id&#xff1a;id/id_name 在XML中定义一个id&#xff1a;id/id_name 右键错误&#xff0c;点击Show Quick-Fixes&#xff0c;再点击弹出的Suppress:Add........&#xff0c;错误会被自动修…

烟火AI识别检测算法在新能源汽车充电桩站点的应用方案

新能源汽车作为现代科技与环保理念的完美结合&#xff0c;其普及和应用本应带给人们更加便捷和绿色的出行体验。然而&#xff0c;近年来新能源汽车充电火灾事故的频发&#xff0c;无疑给这一领域投下了巨大的阴影。这不禁让人深思&#xff0c;为何这一先进的交通工具在充电过程…

思科网络中DHCP中继的配置

一、什么是DHCP中继&#xff1f;DHCP中继有什么用? &#xff08;1&#xff09;DHCP中继是指一种网络设备或服务&#xff0c;用于在不同的子网之间传递DHCP&#xff08;动态主机配置协议&#xff09;消息。DHCP中继的作用是帮助客户端设备获取IP地址和其他网络配置信息&#x…

运算放大器-放大倍数的表示方法:增益(Gain) 和 分贝(dB)

运算放大器的增益&#xff08;Gain&#xff09;可以用分贝&#xff08;dB&#xff09;表示&#xff0c;也可以用放大倍数表示。这两种表示方法之间的转换关系是基于对数的定义。 目录 1. 电压&#xff08;电流&#xff09;放大倍数分贝数定义&#xff1a; 2. 运放的增益&…

C语言文件操作相关题目

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…