springboot整合rabbitmq发布确认高级

news2025/1/23 17:26:29

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。

发布确认  

发布确认方案

 架构

 

配置文件 

在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated

  NONE:禁用发布确认模式,是默认值
  CORREL:ATED发布消息成功到交换器后会触发回调方法
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
   public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
   public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
   //声明业务 Exchange
   @Bean("confirmExchange")
   public DirectExchange confirmExchange(){
   return new DirectExchange(CONFIRM_EXCHANGE_NAME);
   }
   // 声明确认队列
   @Bean("confirmQueue")
   public Queue confirmQueue(){
   return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
   }
   // 声明确认队列绑定关系
   @Bean
   public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with("key1");
   }
}

 生产者

import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
     public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
     @Resource
     private RabbitTemplate rabbitTemplate;
     @Resource
     private MyCallBack myCallBack;
     //依赖注入 rabbitTemplate 之后再设置它的回调对象
     @PostConstruct
     public void init(){
     rabbitTemplate.setConfirmCallback(myCallBack);
     }
     @GetMapping("sendMessage/{message}")
     public void sendMessage(@PathVariable String message){
        //指定消息 id 为 1
        CorrelationData correlationData1=new CorrelationData("1");
        String routingKey="key1";
        
       rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
        CorrelationData correlationData2=new CorrelationData("2");
        routingKey="key2";
        
       rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
        log.info("发送消息内容:{}",message);
     }
}

 回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
   /**
   * 交换机不管是否收到消息的一个回调方法
   * CorrelationData
   * 消息相关数据
   * ack
   * 交换机是否收到消息
   */
   @Override
   public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      String id=correlationData!=null?correlationData.getId():"";
      if(ack){
        log.info("交换机已经收到 id 为:{}的消息",id);
      }else{
       log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
      }
   }
}

 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
   public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
   @RabbitListener(queues =CONFIRM_QUEUE_NAME)
   public void receiveMsg(Message message){
     String msg=new String(message.getBody());
     log.info("接受到队列 confirm.queue 消息:{}",msg);
   }
}

 结果

可以看到,发送了两条消息,第一条消息的 RoutingKey "key1" ,第二条消息的 RoutingKey
"key2" ,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所以第二条消息被直接丢弃了。

 回退消息

Mandatory 参数 

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

 生产者

import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
     public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
     @Resource
     private RabbitTemplate rabbitTemplate;
     @Resource
     private MyCallBack myCallBack;
     //依赖注入 rabbitTemplate 之后再设置它的回调对象
     @PostConstruct
     public void init(){
         rabbitTemplate.setConfirmCallback(myCallBack);
         /**
          * true:
          * 交换机无法将消息进行路由时,会将该消息返回给生产者
          * false:
          * 如果发现消息无法进行路由,则直接丢弃
          */
         rabbitTemplate.setMandatory(true);
         //设置回退消息交给谁处理
         rabbitTemplate.setReturnCallback(myCallBack);

     }
    

    @GetMapping("sendMessage")
    public void sendMessage(String message){
        //让消息绑定一个 id 值
        CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1)
        ;
        log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");
        CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2)
        ;
        log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");
    }
}

 回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    /**
     * 交换机不管是否收到消息的一个回调方法
     * CorrelationData
     * 消息相关数据
     * ack
     * 交换机是否收到消息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id=correlationData!=null?correlationData.getId():"";
        if(ack){
            log.info("交换机已经收到 id 为:{}的消息",id);
        }else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
        }
    }
    //当消息无法路由的时候的回调方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String
    exchange, String routingKey) {
        log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",new
        String(message.getBody()),exchange,replyText,routingKey);
    }
}

 结果

 接收到被退回的消息

备份交换机 

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。

架构

 配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
     public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
     public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
     public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
     public static final String BACKUP_QUEUE_NAME = "backup.queue";
     public static final String WARNING_QUEUE_NAME = "warning.queue";
     // 声明确认队列
     @Bean("confirmQueue")
     public Queue confirmQueue(){
       return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
     }
     //声明确认队列绑定关系
     @Bean
     public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){
       return BindingBuilder.bind(queue).to(exchange).with("key1");
     }
     //声明备份 Exchange
     @Bean("backupExchange")
     public FanoutExchange backupExchange(){
       return new FanoutExchange(BACKUP_EXCHANGE_NAME);
     }
     //声明确认 Exchange 交换机的备份交换机
     @Bean("confirmExchange")
     public DirectExchange confirmExchange(){
       ExchangeBuilder exchangeBuilder =
       ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
       .durable(true)
       //设置该交换机的备份交换机
       .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
       return (DirectExchange)exchangeBuilder.build();
     }
     // 声明警告队列
     @Bean("warningQueue")
     public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
     }
     // 声明报警队列绑定关系
     @Bean
     public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
     @Qualifier("backupExchange") FanoutExchange backupExchange){
       return BindingBuilder.bind(queue).to(backupExchange);
     }
     // 声明备份队列
     @Bean("backQueue")
     public Queue backQueue(){
       return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
     }
     // 声明备份队列绑定关系
     @Bean
     public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){
       return BindingBuilder.bind(queue).to(backupExchange);
     }
}

 报警消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
   public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
   @RabbitListener(queues =CONFIRM_QUEUE_NAME)
   public void receiveMsg(Message message){
     String msg=new String(message.getBody());
     log.info("接受到队列 confirm.queue 消息:{}",msg);
   }
}

 

结果 

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/924641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

poi带表头多sheet导出

导出工具类 package com.hieasy.comm.core.excel;import com.hieasy.comm.core.excel.fragment.ExcelFragment; import com.hieasy.comm.core.utils.mine.MineDateUtil; import org.apache.poi.hssf.usermodel.*; import org.apache.poi.ss.usermodel.*; import org.apache.po…

android studio安装教程

1、android studio 下载 下载网址:Download Android Studio & App Tools - Android Developers 2、开始安装 因为不需要每次连接手机进行调试,android studio给我们提供了模拟器调试环境。 一般选择自定义安装,这样可选sdk以及下载路径…

IT运维:使用数据分析平台监控 Kafka 服务

Apache Kafka 是由 LinkedIn 开发,并于2011年开源的分布式消息队列服务。但是通过快速持续的演进,目前它发展成为成熟的事件流处理平台,可用于大规模流处理、实时数据管道和数据集成等场景。 Kafka 的服务端组件包括一个或者多个 broker。Bro…

视频云存储/安防监控AI视频智能分析平台——智慧煤矿解决方案

一、方案背景 煤矿业是一个高风险行业,存在着许多潜在的安全隐患和风险。互联网、物联网、人工智能等新兴技术高速发展,为传统行业带来颠覆性变革,将高新技术与传统技术装备、管理相融合,实现产业转型升级已经成为煤矿行业发展趋…

【0824作业】C++ 拷贝赋值函数、匿名对象、友元、常成员函数和常对象、运算符重载

一、思维导图 二、作业&#xff1a;实现关系运算符的重载 关系运算符重载 概念&#xff1a; 种类&#xff1a;>、>、< 、< 、 、!表达式&#xff1a;L#R (L表示左操作数&#xff0c;R表示有操作数&#xff0c;#表示运算符)左操作数&#xff1a;既可以是左值也可以…

BSN与中国食品药品企业质量安全促进会达成战略合作协议

2023年8月18日至20日&#xff0c;“首届中国食品药品医疗器械化妆品高质量发展大会”在北京召开&#xff0c;本届大会以“树立新发展理念&#xff0c;服务构建新发展格局&#xff0c;助力食药行业高质量发展”为主题&#xff0c;聚焦食药监管和行业发展的热点、难点问题&#x…

R语言主成分分析

R语言主成分分析 之前介绍过怎么用SPSS进行主成分分析(PCA)&#xff0c;已经忘了的朋友们可以到主页看看 今天主要介绍下R语言主成分分析的几种方法。都是入门级别&#xff0c;跟着我一步步走&#xff0c;一点都不难哈~ 首先调用R语言自带的数据集&#xff0c;USArrests。这…

嵌入式linux之QT交叉编译环境搭建(最简单实测通用版)

这里总结下用于嵌入式linux下的QT交叉编译环境搭建&#xff0c;留作备忘&#xff0c;分享给有需要的小伙伴。不管你的是什么嵌入式linux环境&#xff0c;实测过的通用方法总结。 环境准备 需要准备的环境要求如下&#xff1a; 1.虚拟机(vmvare15.5) 2.ubuntu18.04-x64的linu…

4.网络设计与redis、memcached、nginx组件(一)

网络组件系列文章目录 第四章 网络设计与redis、memcached、nginx组件 文章目录 网络组件系列文章目录文章的思维导图前言一、网络相关的问题&#xff0c;网络开发中要处理那些问题&#xff1f;网络操作IO连接建立连接断开消息到达消息发送网络操作IO特性 二、网络中IO检测IO函…

【Java】基础练习(十一)

1.Poker 定义两个数组&#xff0c;一个数组存储扑克牌花色&#xff0c;另一个数组存储扑克牌&#xff08;A~K&#xff09;&#xff0c;输出52张扑克牌&#xff08;除大小王&#xff09; ♥A、♥2...&#xff08;1&#xff09;Poker类&#xff1a; package swp.kaifamiao.cod…

03-Numpy基础-通用函数:快速的元素级数组函数

通用函数&#xff08;即ufunc&#xff09;是一种对ndarray中的数据执行元素级运算的函数。你可以将 其看做简单函数&#xff08;接受一个或多个标量值&#xff0c;并产生一个或多个标量值&#xff09;的矢量化包 装器。 通用函数&#xff08;ufunc&#xff09;有三种类型&…

【BUG】解决安装oracle11g或12C中无法访问临时位置的问题

项目场景&#xff1a; 安装oracle时&#xff0c;到第二步出现oracle11g或12C中无法访问临时位置的问题。 解决方案&#xff1a; 针对客户端安装&#xff0c;在cmd中执行命令&#xff1a;前面加实际路径setup.exe -ignorePrereq -J"-Doracle.install.client.validate.cli…

countDown+react+hook

道阻且长&#xff0c;行而不辍&#xff0c;未来可期 知识点一&#xff1a; new Date().getTime()可以得到得到1970年01月1日0点零分以来的毫秒数。单位是毫秒 new Date().getTime()/1000获取秒数1分钟60秒&#xff0c;1小时60分钟1hour:60*60>单位是秒 60*60*1000>单位…

远程办公中安全远程访问解决方案

什么是安全远程访问 安全的远程访问是一个至关重要的过程&#xff0c;可让您使用互联网从远处完全控制某人的设备。为了确保安全&#xff0c;为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…

GWO-LSTM交通流量预测(python代码)

使用 GWO 优化 LSTM 模型的参数&#xff0c;从而实现交通流量的预测方法 代码运行版本要求 1.项目文件夹 data是数据文件夹&#xff0c;data.py是数据归一化等数据预处理脚本 images文件夹装的是不同模型结构打印图 model文件夹 GWO-LSTM测试集效果 效果视频&#xff1a;GWO…

NLNet论文总结和代码实现

Non-local Neural Networks&#xff08;非局部神经网络&#xff09;&#xff1a;使用自注意力机制捕获远程依赖。 论文&#xff1a; https://arxiv.org/pdf/1711.07971.pdf 源码&#xff1a; 长距离依赖关系&#xff0c;顾名思义&#xff0c;是要和远程建立关系&#xff0c;在l…

牛客:数对

题目&#xff1a; 解题思路&#xff1a;看到题目的时候&#xff0c;一般第1反应是用两个循环暴力解题&#xff0c;时间复杂度是O(n^2)&#xff0c;不能通过&#xff0c;所以要优化&#xff0c;通过找规律。 一、当 y < k 时&#xff0c; 不可能符合题意&#xff0c;所以 y 从…

数据集收集列表(opencv,机器学习,深度学习)持续更新

opencv 车牌识别数据集 opencv 手写数字识别数据集 机器学习 Pima Indians数据集&#xff0c;下载地址

Docker的革命:容器技术如何重塑软件部署之路

引言 在过去的几年中&#xff0c;容器技术已经从一个小众的概念发展成为软件开发和部署的主流方法。Docker&#xff0c;作为这一变革的先驱&#xff0c;已经深深地影响了我们如何构建、部署和运行应用程序。本文将探讨容器技术的起源&#xff0c;Docker如何崛起并改变了软件部…

VScode运行C语言出现的调试问题 lauch:program does not exist 解决方法

"lauch:program does not exist"错误通常表示编译器或调试器无法找到指定的可执行文件。这可能是由于几个原因引起的。首先&#xff0c;确保你的源代码文件夹路径不包含中文字符&#xff0c;因为这可能导致编译器无法识别文件。其次&#xff0c;检查你的launch.json文…