RabbitMq-发布确认高级(避坑指南版)

news2024/12/28 19:24:55

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

package com.example.rabbitmq_demo.fabuquerengaoji;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClaseName: ConfirmConfig$
 * @Description:配置类 发布确认(高级)
 * @Author: wuhs
 * @Date: 2023/8/16$ 14:32$
 * 快捷键ctrl+shift+u  字母大小写转化
 */
@Configuration
public class ConfirmConfig {
    // 交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    // 队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    // ROUTING-KEY
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
                                        @Qualifier("confirmQueue") Queue queue) {
        //一般使用在项目中使用@Qualifier来限定注入的Bean。
        return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);

    }
}

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClaseName: Consumer$
 * @Description:消费者
 * @Author: wuhs
 * @Date: 2023/8/16$ 15:18$】
 */
@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void reciverConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm.queue消息:{}", msg);
    }
}

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @ClaseName: MyCallBack$
 * @Description:
 * @Author: wuhs
 * @Date: 2023/8/16$ 16:17$
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 注入
        rabbitTemplate.setConfirmCallback(this);

    }
   //交换机确认回调方法  
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {//发消息 交换机接收到了消息 回调
            log.info("交换机已经收到了ID为:{}的消息", id);
        } else {//发消息 交换机没有接收到了消息 回调
            log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);

        }
    }
}

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

server:
  port: 8899

spring:
  rabbitmq:
    host: 124.221.94.214
    port: 5672
    username: xgsm
    password: xgsm123
    # 发送者开启 confirm 确认机制
    publisher-confirms: true
    publisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClaseName: ProducerController$
 * @Description:消息生产者
 * @Author: wuhs
 * @Date: 2023/8/16$ 14:58$
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // @PathVariable主要作用:映射URL绑定的占位符
    @RequestMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //正常发送
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);

        //发送失败-交换机不存在的情况
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);

        log.info("发送的消息为:{}", message);
    }
}

测试结果: 

 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  CorrelationData correlationData3 = new CorrelationData("3");
  rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

# 发送者开启 return 确认机制
    publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @ClaseName: MyCallBack$
 * @Description:
 * @Author: wuhs
 * @Date: 2023/8/16$ 16:17$
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);

    }

    /**
     * 交换机确认回调方法
     * 1、发消息 交换机接收到了消息 回调
     * 1.1 correlationData 保存回调消息的ID及相关信息
     * 1.2 交换机收到消息 ack=true
     * 2、发消息 交换机接收失败了 回调
     * 2.1 correlationData 保存回调消息的ID及相关信息
     * 2.2 交换机接收到消息 ack=false
     * 2.3 reason 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到了ID为:{}的消息", id);
        } else {
            log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);

        }
    }

    //可以在当消息传递的过长中不可达目的地时将消息返回给生产者
    // 只有不可待目的地的时候 才进行回退
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);
    }
}

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/890509.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端原生写自定义旋转变换轮播图

html部分&#xff1a; <div class"banner_box"><div class"swiperWrapper" v-show"bannerList.length>0"><div class"swiper-item" :id"swiperSlide${index}" :class"{active:index0,next:index1,pr…

耗资170亿美元?三星电子在得克萨斯州建设新的半导体工厂

据报道&#xff0c;三星电子在得克萨斯州泰勒市建设的新的半导体工厂预计将于2024年下半年投入运营。这座工厂将成为三星电子在美国的第二座芯片代工厂&#xff0c;与位于得克萨斯州奥斯汀市的第一座工厂相距不远。 此次投资将耗资约170亿美元&#xff0c;显示了三星电子在半导…

Linux驱动之利用ioctl函数和字符设备驱动对象分布注册点亮小灯

实验结果 头文件代码 #ifndef __HEAD_H__ #define __HEAD_H__ typedef struct{unsigned int MODER;unsigned int OTYPER;unsigned int OSPEEDR;unsigned int PUPDR;unsigned int IDR;unsigned int ODR; }gpio_t; #define PHY_LED1_ADDR 0X50006000 #define PHY_LED2_ADDR 0X5…

激活函数总结(十一):激活函数补充(Absolute、Bipolar、Bipolar Sigmoid)

激活函数总结&#xff08;十一&#xff09;&#xff1a;激活函数补充 1 引言2 激活函数2.1 Absolute激活函数2.2 Bipolar激活函数2.3 Bipolar Sigmoid激活函数 3. 总结 1 引言 在前面的文章中已经介绍了介绍了一系列激活函数 (Sigmoid、Tanh、ReLU、Leaky ReLU、PReLU、Swish、…

多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测

多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测 目录 多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测基本介绍模型特点程序设计参考资料 基本介绍 本次运行测试环境MATLAB2021b&#xff0c;MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测。代码说明&#xff1a…

掌握渗透测试,从Web漏洞靶场搭建开始

渗透测试切忌纸上谈兵&#xff0c;学习渗透测试知识的过程中&#xff0c;我们通常需要一个包含漏洞的测试环境来进行训练。而在非授权情况下&#xff0c;对于网站进行渗透测试攻击&#xff0c;是触及法律法规的&#xff0c;所以我们常常需要自己搭建一个漏洞靶场&#xff0c;避…

【CUDA】学习记录(4)-线程束的执行

线程模型 block&#xff1a;同一个block内共享内存&#xff0c;同一block中的thread可以彼此进行通信。 block&#xff1a;block-local synchronization。同一个块内的线程可以同步。 线程&#xff0c;可以根据blockIdx和threadIdx唯一的指定。 作者&#xff1a;不会code的程序…

02-前端基础第二天-HTML5

01-HTML标签&#xff08;下&#xff09;导读 目标&#xff1a; 能够书写表格能够写出无序列表能够写出3~4个常用input表单类型能够写出下拉列表表单能够使用表单元素实现注册页面能够独立查阅W3C文档 目录&#xff1a; 表格标签列表标签表单标签综合案例查阅文档 02-表格标…

pygame第6课——贪吃蛇小游戏

今天我们开始Pygame的第六课&#xff0c;前几节课的内容在这里【点我】&#xff0c;欢迎大家前去考古&#xff1a; 今天我们一起来学习制作一个小游戏【贪吃蛇】&#xff0c;这是一个非常经典的小游戏&#xff0c;那么我们一起开始吧 1、游戏准备工作 import pygame, random,o…

常用激活函数及其优缺点的总结与比较

文章目录 1、sigmoid2、Tanh3、ReLU4、Leaky ReLU5、Softmax 1、sigmoid 优点&#xff1a;将很大范围内的输入特征值压缩到0~1之间&#xff0c;适用于将预测概率作为输出的模型&#xff1b; 缺点&#xff1a; 1&#xff09;当输入非常大或非常小的时候&#xff0c;容易导致梯度…

Hadoop的DataNode无法启动的解决方案

Hadoop重启一次&#xff0c;里面的数据需要重新导入&#xff0c;发现无法导入数据&#xff0c;查看jps发现是DataNode没有启动&#xff0c;重新启动发现也无法启动&#xff0c;原因是前面重新启动NameNode&#xff0c;里面的文件格式化一次&#xff0c;DataNode的文件不一致&am…

jmeter监听器大家都会用,但我这个妙招能让你提早一小时下班!

使用过 jmeter 的同学&#xff0c;应该都会使用监听器&#xff0c;在每个监听器中&#xff0c;都会有一个“所有数据写入一个文件”的功能&#xff0c;那这个功能应该怎么用呢&#xff1f;今天&#xff0c;我们就来讲讲这个功能的使用。 几乎所有的监听器都有这样一个功能。 那…

LTD239次升级 | 可答题获取优惠券 • 官微中心菜单可折叠• 分享页表单客户可显示分享来源

1、新增答题获取优惠券应用 2、官微中心支持左侧菜单折叠、批量上传可设置排序值 3、分享页表单客户可显示分享来源 4、iOS App支持设置名片卡片图片 5、新增一款产品列表展示模块 01 网站应用 新增答题获取优惠券应用 本次升级中&#xff0c;新增了一款答题获取优惠券的小程序…

MAC钓鱼并Root权限上线CS并权限维持,以及所有的坑如何解决

本文转载于&#xff1a;https://www.freebuf.com/articles/web/350592.html 作者&#xff1a;文鸯涂鸦智能安全实验室 制作MAC 一、下载工具 首先从github上下载CrossC2。链接&#xff1a;https://github.com/gloxec/CrossC2/releases/tag/v3.1.0。 根据你CS客户端的操作系统选…

基于 BEM 规范实现简单的 全局 scss

前言 BEM 是 css 常用的命名规范BEM &#xff1a;block(块)、 element(元素)、 modify(修饰符)以 namespace-block__element、namespace-block---modify 格式为例&#xff08;namespace 一般是 ui 库的前缀&#xff0c;如 element-ui 的 el 前缀&#xff09;scss 的使用请参考…

计算机竞赛 Yolov安全帽佩戴检测 危险区域进入检测 - 深度学习 opencv

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; Yolov安全帽佩戴检测 危险区域进入检测 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&am…

驱动 day4

通过字符设备驱动分步注册方式编写LED灯的驱动&#xff0c;应用程序使用ioctl函数编写硬件控制 mycdev.c #include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include <linux/cdev.h> #include <linux/device.h> #inc…

【中文竞技场】大模型深度体验与测评

简介&#xff1a;本次&#xff0c;我深入体验了中文竞技场中的大语言模型&#xff0c;尝试了写作创作、代码编写和中文游戏三个领域&#xff0c;以下是我详细的评测报告。 一、开篇 在科技日新月异的今天&#xff0c;中文竞技场提供了一系列大模型供我们体验。涉及的领域包括写…

HLS实现FIR低通滤波器+System Generator仿真

硬件&#xff1a;ZYNQ7010 软件&#xff1a;MATLAB 2019b、Vivado 2017.4、HLS 2017.4、System Generator 2017.4 1、MATLAB设计低通滤波器 FPGA系统时钟 50MHz&#xff0c;也是采样频率。用 MATLAB 生成 1MHz 和 10MHz 的正弦波叠加的信号&#xff0c;并量化为 14bit 整数。把…

也许你正处于《孤注一掷》中的“团队”,要留心了

看完这部电影&#xff0c;心情久久不能平静&#xff0c;想了很多&#xff0c;倒不是担心自己哪天也成为“消失的yaozi”&#xff0c;而是在想&#xff0c;我们每天所赖以生存的工作&#xff0c;跟电影里他们的工作比&#xff0c;差别在哪里呢&#xff1f; 目录 1. 产品的本质…