RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理

news2024/9/24 9:22:03

前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。

1、在config包里新建一个名为ConfirmConfig的类用于编写配置交换机、队列、routingkey的代码

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String EXCHANGE_NAME = "confirm_exchange";

    //队列
    public static final String QUEUE_NAME = "confirm_queue";

    //routingkey
    public static final String ROUTING_KEY = "confirm";

    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //绑定交换机和队列
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }

}

2、在controller包里新建一个名为ProducerController的类用于编写充当生产者发送消息的代码

代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY,message);
        log.info("发送消息内容:{}",message);
    }

}

3、在consumer包里新建一个名为Consumer的类用于编写充当消费者消费消息的代码

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到队列的消息为:{}",msg);
    }

}

4、启动项目,在浏览器地址栏调用发送消息的接口,查看生产者是否运行成功并能发送消息http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

5、前言里我们说过,怎么在RabbitMQ宕机的情况下,保证生产者发送的消息不丢失呢,这时候就需要用到回调函数了,交换机本身收到消息后会确认消息,如果交换机没有确认或者确认消息失败,都视为发送消息失败,然后触发回调接口,告诉生产者消息发送失败,这样,消息接收成功与否我们都能通过回调方法返回的消息知道了

(1)在config包里新建一个名为MyCallBack的类用于编写交换机的确认回调方法

代码如下:

package com.ken.springbootrqbbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行
     */
    @PostConstruct
    public void init() {
        //把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1、第一个参数:correlationData保存回调消息的ID以及相关信息
     * 2、第二个参数:交换机收到消息就返回true,否则返回false
     * 3、第三参数:原因(返回失败的原因,如果成功返回的是null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id =  correlationData != null ? correlationData.getId() : "";
        if(ack) {
            log.info("交换机已经收到id为{}的消息",id);
        }else {
            log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);
        }

    }
}

6、在上述步骤可得知confirm方法有一个类型为CorrelationData的参数correlationData,这个参数实际上是空的,并没有值,需要生产者发送,correlationData参数才会有值(connfirm方法的其余两个参数ack和cause默认有值)所以我们需要修改生产者的代码

 代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY,message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

7、在配置文件加上以下配置开启交换机确认发布模式

spring.rabbitmq.publisher-confirm-type=correlated

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated

效果图:

8、启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口,消费者成功消费消息

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

9、把生产者要发送到的交换机改成不存在的,用以模拟交换机出问题的情景

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME + "1",
                ConfirmConfig.ROUTING_KEY,message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

效果图:

10、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出了交换机接收消息失败的原因

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

11、把RoutingKey改成不存在的,用以模拟队列出问题的情景

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY + "2",message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

效果图:

12、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出交换机接收消息成功,但消费者没有消费成功的日志输出,因为RoutingKey错了,交换机没有把消息发送到队列里,队列里没消息,自然消费者也就没有消费到消息了,但这个结果不符合我们的预期,因为这次丢失了消息,丢失消息却没有回馈消息丢失,实际上应该调用回调接口反馈消息丢失,所以我们需要继续往下改进代码。

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

13、给配置文件加上以下配置,用以回退消息

spring.rabbitmq.publisher-returns=true

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated
#一旦投递消息失败或者路由失败,是否回退消息给生产者
spring.rabbitmq.publisher-returns=true

14、使用RabbitTemplate的内置接口回退消息

代码如下:

package com.ken.springbootrqbbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行
     */
    @PostConstruct
    public void init() {
        //把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面
        rabbitTemplate.setConfirmCallback(this);
        //把当前实现类MyCallBack注入到RabbitTemplate类的ReturnCallback接口里面
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1、第一个参数:correlationData保存回调消息的ID以及相关信息
     * 2、第二个参数:交换机收到消息就返回true,否则返回false
     * 3、第三参数:原因(返回失败的原因,如果成功返回的是null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id =  correlationData != null ? correlationData.getId() : "";
        if(ack) {
            log.info("交换机已经收到id为{}的消息",id);
        }else {
            log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);
        }

    }

    /**
     * 可以在当消息传递过程中不可达目的地时将消息返回给生产者
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息{},被交换机{}退回,退回原因:{},路由routingkey:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }

}

15、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机收到消息发不过去队列后把消息回退了,保证了消息不丢失。

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

创造一款安卓自定义控件_裁剪原理介绍

1、新增功能,旋转: 效果如图,点击旋转,可以将控件画面本身进行90度倍数的旋转,并进行宽高比例适配,旋转之后裁剪依然正常。 功能实现原理: 1、通过调用view的setRotation功能进行以View为中心…

Stable Diffusion - 超分辨率插件 StableSR v2 (768x768) 配置与使用

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/131582734 论文:Exploiting Diffusion Prior for Real-World Image Super-Resolution StableSR 算法提出了一种新颖的方法&#xff0…

【JAVA】JAVA与C++的区别与联系

个人主页:【😊个人主页】 系列专栏:【❤️初识JAVA】 文章目录 前言两方介绍CJAVA 不同|Java不支持指针、模板、指针重载、联合等||支持析构函数||条件编译和包含||螺纹支架||默认参数||转到语句||多重继承||异常处理||方法重载和操作符重载|…

对表中的数据操作

目录标题 创建一个工作者的表 ,对表中数据进行相关操作worker表要求表中的数据内容 对数据的操作1.显示所有职工的基本信息2.查询所有职工所属部门的部门号,不显示重复的部门号3.求出所有职工的人数4.列出最高工和最低工资5.列出职工的平均工资和总工资6…

卷积神经网络CNN进阶与搭建

目录 Pooling(池化)ReluResNetGradient VanishingFeature scalingImage NormalizationBatch Normalization Pooling(池化) 在降采样(Subsampling)中起作用,在不改变feature map的基础上,在卷积出来的基础上…

【LeetCode】217. 存在重复元素

217. 存在重复元素(简单) 方法一:哈希表长度比较 思路 针对重复元素,很容易就想到 set。我们可以先将 nums 中的所有元素存入set,然后比较两个数据结构的长度,如果相等则说明不存在重复元素,反…

Android View滑动处理大法

原文链接 Android View滑动处理大法 对于触控式操作来说,滑动是一个特别重要的手势操作,如何做到让应用程序的页面滑动起来如丝般顺滑,让用户感觉到手起刀落的流畅感,是开发人猿需要重点解决的问题,这对提升用户体验是…

Python:使用 np.lib.stride_tricks.sliding_window_view 将立方体切割成N个三维小块

函数说明:np.lib.stride_tricks.sliding_window_view(arr, window_shape) 参数说明: arr:要创建滑动窗口视图的数组。 window_shape:滑动窗口的形状,表示切割出的小块的大小。 作用:用于创建滑动窗口视图。…

软件设计模式与体系结构-软件体系-基于事件的软件体系结构

目录 三、基于事件的软件体系结构代码显式调用隐式调用事件系统软件体系结构的概念事件系统的连接机制 例子:图形用户界面事件系统调度策略1.带有分离的派遣模块的事件管理器 观察者模式类图观察者模式应用实例 课程作业 三、基于事件的软件体系结构 计算机中&…

Prototype Completion for Few-Shot Learning

小样本学习的目的是用很少的例子来识别新类。基于预训练的方法有效地解决了通过预训练一个特征提取器,然后通过最近的基于质心的元学习对其进行微调 (pretraining fine-tuning)。然而,结果表明微调步骤使边际改进。在本文中,1)我们找出原因,…

Kafka入门,漏消费和重复消费, 消费者事务,数据积压(二十四)

漏消费和重复消费 重复消费:已经消费了数据,但是offset没提交。 漏消费:先提交offset后消费,有可能会造成数据得漏消费 消费者事务 如果向完成consumer端得进准一次性消费,那么需要Kafka消费端将消费过程和提交offs…

Gradio库中的State模块:保存用户会话状态的神秘组件

❤️觉得内容不错的话,欢迎点赞收藏加关注😊😊😊,后续会继续输入更多优质内容❤️ 👉有问题欢迎大家加关注私戳或者评论(包括但不限于NLP算法相关,linux学习相关,读研读博…

Python Flask构建微信小程序订餐系统 (六)

🔥 账号管理 🔥 展示账户列表 默认情况下的账户列表布局 查询用户信息 查询 所有用户信息 按照 倒序 的方式查询出来 User.query.order_by( User.uid.desc() ).all() ......@route_account.route("/index") def index():#模版文件夹取名叫 "account/login…

【netty】Netty模型

工作原理 1)Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写; 2)BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup 3)NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多…

RabbitMQ系列(17)--延迟队列的简介与实现

1、延迟队列的概念 延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。 2、延迟队列的应用场景 (1)订单指定时…

Python实现本地电脑启动HTTP服务

在Python中,可以使用Python内置的http.server模块来启动一个简单的HTTP服务器。以下是一个简单的Python代码示例,实现本地电脑启动HTTP服务: import http.server import socketserverport 8081# 在当前目录下启动http服务器 Handler http.…

JavaWeb 速通JavaScript

目录 一、JavaScript快速入门 1.基本介绍 : 2.JavaScript特点 : 3.JavaScript使用方式 : 1 方式一 : 写在 2 方式二 : 以外部文件形式引入 PS : 注意事项 4.JavaScript查错方式 : 二、JavaScript数据类型 1.变量 : 2.数据类型 : 3.特殊值 : 三、JavaScript运算符 1.算…

解决forest低版本请求不安全的网站出现SSL认证不通过问题

文章目录 前言解决问题的步骤1、当然是百度2、官网3、看源码4、GPT5、直接去gitee上看源代码的问题 解决一解决二 前言 先说结论:无法解决 那既然无法解决,为啥还要写这样一篇文章呢,是因为这个问题我弄了一天多,我觉得有必要记…

SpringBoot开启子线程执行任务

目录 一、EnableAsync 二、Async 三、测试 一、EnableAsync 二、Async Service public class IotLocationServiceImpl {Asyncpublic void testA() {try {// 模拟阻塞Thread.sleep(5000);System.out.println("子线程执行完毕");} catch (InterruptedException e) {…

WPS Office AI实战:智能表格化身智能助理

前面我们已经拿 WPS AI 对Word文字、PPT幻灯片、PDF 做了开箱体验,还没有看过的小伙伴,请翻看以前的文章,本文开始对【智能表格】进行AI开箱测验。 表格在日常的数据处理中占绝对地位,但表格处理并不是每一个人都擅长,…