消息发布确认

news2025/1/13 13:35:20

描述:在消息投递的过程中可能会存在消息丢失的行为产生,生产者到交换机,交换机到队列的过程都有可能出现这个现象。所以我们要有个发布确认的操作来防止消息丢西。
确认机制方案:
在这里插入图片描述

配置文件配置交换机发布确认模式:
publisher-confirm-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
在这里插入图片描述
交换机发布确认和路由回退分别需要实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback接口

@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //将类注入接口
        rabbitTemplate.setConfirmCallback(this); //交换机发布确认

        //路由回退配置
        rabbitTemplate.setMandatory(true);
        //设置回退消息交给谁处理
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {  //id ,ack ,结果描述
        if (b){
            log.info("收到回调id:{}",correlationData.getId());
        }else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",correlationData.getId(),s);
        }

    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("路由:{}---回退的信息:{}",returnedMessage.getRoutingKey(),returnedMessage.getMessage());
    }
}

测试:
路由和交换机声明与绑定

public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

@Bean
    public DirectExchange confirmExchange(){
        Map<String, Object> map = new HashMap<>();
        map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)//持久化
                .withArguments(map);//设置备份交换机
        return (DirectExchange) exchangeBuilder.build();
    }

    @Bean
    public Binding bindingConfirmExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("key");
    }

    //声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // 声明警告队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
    // 声明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange
                                          backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }
    // 声明备份队列
    @Bean("backQueue")
    public Queue backQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }

生产者:

@GetMapping("sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message) {
        CorrelationData correlationData1 = new CorrelationData("1");
        rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key", message,correlationData1);
        log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给队列 confirm.queue:{}", new Date(), message);
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key2", message,correlationData2);
        log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给路由不存在的队列 confirm.queue:{}", new Date(), message);

    }

队列监听消费者:

@RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveCONFIRMQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到发布确认队列的消息:{}", new Date().toString(), msg);
    }

    @RabbitListener(queues = "warning.queue")
    public void receiveWarningQueue(Message message){
        String msg = new String(message.getBody());
        //log.info("当前时间:{},收到备份交换机的警告队列的消息:{}", new Date().toString(), msg);
        log.error("报警发现不可路由消息:{}", msg);
    }

总结:监听者收到的ack为false则消息没有投递成功,交换机配置了备份交换机,优先级比路由回退的高,如果交换机到队列的消息没有投递成功,则可以通过备份交换机的监听队列再次去投递消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/72642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

年终颁奖 | 建模助手年度产品经理正在评比当中!

大家好&#xff0c;我是建模助手。 12月来了&#xff0c;又到了激情总结&#xff0c;却发现flag倒被打脸的时刻&#xff01;我就想问问在座各位&#xff1a;年初的定立的flag&#xff0c;完成得咋样了&#xff1f; 我们今年就有一张很优秀的成绩单&#xff1a;↓↓↓ 建模助手…

优优聚:学会删减菜单,帮你提升销量和转化

如今随着外卖市场的不断发展&#xff0c;越来越多的堂食店铺加入外卖&#xff0c;但是对于做外卖很多老板认为&#xff0c;自家堂食做得不错&#xff0c;那么直接把堂食的菜单上传到外卖&#xff0c;结果这样做的后果就是不仅累还不挣钱。下面优优聚小编就来讲一下。 1、菜单太…

IDEA 导入别人的javaweb项目进行部署

前言 我主要是进行java的springboot项目和vue项目的开发&#xff0c;但是架不住在这些框架兴起之前&#xff0c;公司内部已经是有其他的老的框架&#xff0c;我需要在这些老的框架进行修改和调整代码。原本我是使用的eclipse软件进行部署&#xff0c;也比较简单&#xff1b; …

了解常见的模拟器及交换机的基本配置

了解常见的模拟器及交换机的基本配置 1. 首先我们先了解常见的模拟器软件 1.Cisco Packet Tracer&#xff08;简单&#xff0c;纯软件实现&#xff09; Cisco Packet Tracer 是由Cisco公司发布的一个辅助学习工具&#xff0c;为学 习思科网络课程的初学者去设计、配置、排除…

chapter9——电磁兼容性能设计指南

目录1.定义2.电磁干扰理论3.电磁干扰的流程、标准和认证4.影响集成电路抗干扰性能的几个因素5.减少EMC/EMI的技术电子线路易于接收来自其他发射器的辐射信号&#xff0c;无论是有意或无意发射。这些电磁干扰&#xff08;EMI&#xff09;使得设备内毗邻的元件不能同时工作。这时…

58同城首页腰部动态化技术选型(布局动态化)

1. 行业情况 1.1 基本概念介绍 1.1.1 Web混合 Web 前端和客户端的混合开发。使用 WebView 进行页面渲染、逻辑执行&#xff1b;依赖客户端的能力需要通过 JSBridge(通信桥) 的方式进行调用&#xff0c;比如调用客户端的相册、定位、登陆、埋点能力。 1.1.2 小程序 小程序体…

【Redisson源码】可重入锁看门狗机制

【本篇文章基于redisson-3.17.6版本源码进行分析】 为什么需要自动续期&#xff1f; 设想一下&#xff0c;如果我们指定的锁的超时时间是30秒&#xff0c;但是业务执行在30秒内还没有执行完成&#xff0c;此时分布式锁超时过期自动释放&#xff0c;其它线程就能获取到这把锁&…

OpenGL基础图形编程(八)变换

八、OpenGL变换 OpenGL变换是本篇的重点内容&#xff0c;它包括计算机图形学中最基本的三维变换&#xff0c;即几何变换、投影变换、裁剪变换、视口变换&#xff0c;以及针对OpenGL的特殊变换概念理解和用法&#xff0c;如相机模拟、矩阵堆栈等。学好了这章&#xff0c;才开始真…

基于多时间尺度滚动优化的多能源微网双层调度研究附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

网络套接字编程(TCP协议)

文章目录简单的TCP网络程序服务器绑定服务端监听服务端获取连接客户端连接服务器多线程版本的大小写字母转换服务简单的TCP网络程序 int socket(int domain, int type, int protocol);参数说明&#xff1a; domain&#xff1a;创建套接字的域或者叫做协议家族&#xff0c;也就…

百万基建狂魔们的赛博世界

钉钉完成的&#xff0c;是基于PaaS底座和底层基础产品&#xff0c;与生态伙伴一起提供低代码的普惠化定制开发模式&#xff0c;让大型企业自己可以具备诊断自己的能力和梳理流程的能力&#xff0c;并且将过往的经验和积累进行数字化应用层面的表达&#xff0c;进而寻找出一条最…

ffplay调试环境搭建

前言 ffplay是基于FFmpeg的最简单的官方播放器。麻雀虽小&#xff0c;五脏俱全&#xff0c;虽说ffplay简单&#xff0c;但是各种播放器应有的功能一一俱全&#xff0c;说它简单或许仅仅是因为它只有一个点c文件而已吧。 想要开发一个优秀的播放器&#xff0c;参考是必不可少的&…

Netron可视化Pytorch保存的网络模型

目录 一.理清网络的输入与输出 二. 将模型转换为onnx格式 三.Netron可视化工具 一.理清网络的输入与输出 我自定义的网络模型&#xff08;主要看看前向传播函数即可&#xff09;&#xff1a; import torch import torch.nn as nn#导入数据预处理之后的相关数据 from dataP…

Acrel-EMS企业微电网能效管理平台在某食品加工厂35kV变电站应用-Susie 周

1、概述 该食品加工厂变电站工程规模&#xff1a;电压等级&#xff1a;35/10.5kV&#xff0c;规划主变容量16.3MVA1台8MVA。有一个总配电室&#xff0c;包括35kV开关柜、10kV开关柜和0.4kV配电柜&#xff0c;两个独立变压器室&#xff0c;变压器为干式变压器。35kV供电系统采用…

(2)ITK中迭代器的时间效率

背景 ITK对图像处理中&#xff0c;为了提高代码运行效率&#xff0c;通过迭代器Iterator可以实现对时间的优化。 在ITK的官方文档中也有明确的说明&#xff1a; 针对此说明&#xff0c;本次使用对图像获取最大值最小值的方式&#xff0c;来实验和测试其效率。 代码实现 &am…

JDBC 数据库连接池之Driud

1 数据库连接池简介 数据库连接池是个容器&#xff0c;负责分配、管理数据库连接(Connection) 它允许应用程序重复使用一个现有的数据库连接&#xff0c;而不是再重新建立一个&#xff1b; 释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据…

数据安全新战场,EasyMR为企业筑起“安全防线”

2020年1月&#xff0c;时间跨度长达14年的&#xff0c;微软2.5亿条客户服务和支持记录在网上泄露&#xff1b; 同年4月&#xff0c;微盟发生史上最贵“删库跑路”事件&#xff0c;造成微盟市值一夜之间缩水约24亿港币&#xff1b; 今年7月&#xff0c;网信办依据《数据安全法…

PCIEBPMCx4板卡

PCIEBPMCx4本板卡可以使标准的PMC板卡安装于带有PCIE插槽的PC机上使用&#xff0c;安装后占一个槽位&#xff0c;槽位可以为PCIE x4 PCIE x8、PCIE x16&#xff0c;安装后工作在PCIE x4模式。PCIE X1 后开口也可以使用&#xff0c;但只运行在PCIE X1模式。PCIE支持X4 V2.0,板载…

Python对json的操作总结

Json简介&#xff1a;Json&#xff0c;全名 JavaScript Object Notation&#xff0c;是一种轻量级的数据交换格式。Json最广泛的应用是作为AJAX中web服务器和客户端的通讯的数据格式。现在也常用于http请求中&#xff0c;所以对json的各种学习&#xff0c;是自然而然的事情。 J…

C++学习笔记(十四)——vector的模拟实现

vector各函数接口总览 vector当中的成员变量介绍 默认成员函数 构造函数1 构造函数2 构造函数3 拷贝构造函数 赋值运算符重载函数 析构函数 迭代器相关函数 begin和end 容量和大小相关函数 size和capacity reserve resize empty 修改容器内容相关函数 push_ba…