Springboot整合RabbitMq,详细步骤

news2025/1/12 6:00:06

Springboot整合RabbitMq,详细步骤

    • 1 添加springboot-starter依赖
    • 2 添加连接配置
    • 3 在启动类上添加开启注解`@EnableRabbit`
    • 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
    • 5 生产者推送消息
    • 6 消费者接收消息
    • 7 生产者的消息回调机制
    • 8 消费者的确认机制

消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。

用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。

以下先详细介绍下springboot项目怎么使用RabbitMq

1 添加springboot-starter依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 添加连接配置

以下几项是最基础的配置,其他配置下面用到时额外添加

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest #默认用户名和密码
    password: guest
    virtual-host: /  # 虚拟主机

3 在启动类上添加开启注解@EnableRabbit

4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。

可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。

可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。

配置类的示例代码如下:

@Configuration
public class RabbitConfig {
    /**
     * 队列
     */
    @Bean
    Queue createDirectQueue(){
        /**
         * durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。
         * exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        //两种创建方式
        //QueueBuilder.durable("queue.test1").build();
        return new Queue("queue.test1",true,false,false);
    }

    /**
     *  交换机
     */
    @Bean
    DirectExchange createDirectExchange(){
        /**
         * durable、autoDelete参数性质和上面队列的一致
         */
        return new DirectExchange("direct.test1",true,false);
    }

    /**
     * 将队列和交换机绑定, 并设置用于匹配键
     */
    @Bean
    Binding binding(){
        return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");
    }

}

以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange接口 —>AbstractExchange实现类下看到。

在这里插入图片描述

可以通过客户端看到队列、交换机、路由关系已经创建成功

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5 生产者推送消息

@Autowired
RabbitTemplate rabbitTemplate;

@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {
    String id = UUID.randomUUID().toString();
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    params.put("messageId",id);
    params.put("createTime",createTime);
    /**
     * 发给交换机,在发给路由绑定的队列
     */
    rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
    return AjaxResult.success("成功");
}

可以看到,rabbitmq成功接收到消息。

在这里插入图片描述
在这里插入图片描述

6 消费者接收消息

@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {
    @RabbitHandler
    public void process(Map message){
        System.out.printf("消费者接收到消息:" + message.toString());
    }
}

可以看到消息成功被消费,监听处理方法也成功被执行

在这里插入图片描述

​ 如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。

7 生产者的消息回调机制

在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。

  • 增加配置项

    spring:
     rabbitmq:
      publisher-confirm-type: correlated #消息发送成功交互
      publisher-returns: true
    

    可能之前老的版本是publisher-confirm:true,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error

在这里插入图片描述

  • 目前回调包含发送成功回调ConfirmCallback和失败回调ReturnsCallback。一些老版本的可能有ReturnCallback。下面先自定义两个回调的回调方法

    ConfirmCallback的回调

    /**
     * 消息发送成功回调
     */
    public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    
        /**
         * 消息成功到达exchange,ack=true
         * @param correlationData
         * @param ack
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("相关数据:" + correlationData);
            System.out.println("确认状态:" + ack);
            System.out.println("造成原因:" + s);
        }
    }
    

    ReturnsCallback的回调

    /**
     * 发生异常时的消息返回提醒
     */
    public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("失败回调:" + returnedMessage);
        }
    }
    

    将自定义回调配置到模板中

    在Rabbit配置类中添加RabbitTemplate并配置两个回调

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
            rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback());
            return rabbitTemplate;
        }
    }
    

    那以上两种回调函数什么时候回执行呢?

    1. 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
    2. 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
    3. 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调

由此可见ConfirmCallback回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。

ReturnsCallback回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。

下面是对以上三种情况的测试

  • 消息完全成功发送到队列

    模拟:交换机和路由都存在

    @PostMapping("/sendMessage")
    public AjaxResult sendMessage(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
        //direct.test1和testRoute都存在
        rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
        return AjaxResult.success("成功");
    }
    

    消费者监听且ConfirmCallback回调为true
    在这里插入图片描述

  • 消息没有发送到exchange

    模拟:交换机不存在

    @PostMapping("/sendMessageFailByNoExchange")
        public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) {
            String id = UUID.randomUUID().toString();
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            params.put("messageId",id);
            params.put("createTime",createTime);
            //该交换机不存在
            rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params);
            return AjaxResult.success("成功");
        }
    

    ConfirmCallback回调为false
    在这里插入图片描述

  • 消息发送到exchange,但没发送到队列

    模拟:该交换机存在但该路由不存在

    @PostMapping("/sendMessageFailByNoRoute")
    public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
    	//交换机存在但该路由不存在
        rabbitTemplate.convertAndSend("direct.test1","failRoute",params);
        return AjaxResult.success("成功");
    }
    

    ConfirmCallback回调为true,ReturnsCallback失败回调执行
    在这里插入图片描述

可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。

8 消费者的确认机制

消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。

所有消费者的确认机制有三种模式:

  1. 自动确认

    是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。

    所以如果处理逻辑抛出异常,就相当于丢失了消息。

    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  2. 手动确认
    后面还会详细介绍两者确认机制的用法及代码实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/870430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

闭环控制方法及其应用:优缺点、场景和未来发展

闭环控制是一种基本的控制方法&#xff0c;它通过对系统输出与期望值之间的误差进行反馈&#xff0c;从而调整系统输入&#xff0c;使系统输出更加接近期望值。闭环控制的主要目标是提高系统的稳定性、精确性和鲁棒性。在实际应用中&#xff0c;闭环控制有多种方法&#xff0c;…

开源代码分享(13)—整合本地电力市场与级联批发市场的投标策略(附matlab代码)

1.引言 1.1摘要 本地电力市场是在分配层面促进可再生能源的效率和使用的一种有前景的理念。然而&#xff0c;作为一个新概念&#xff0c;如何设计和将这些本地市场整合到现有市场结构中&#xff0c;并从中获得最大利润仍然不清楚。在本文中&#xff0c;我们提出了一个本地市场…

linux添加磁盘

一、linux虚拟机添加一块新的硬盘 四步&#xff1a; &#xff08;1&#xff09; &#xff08;2&#xff09;为硬盘进行分区 &#xff08;3&#xff09;初始化硬盘分区 &#xff08;4&#xff09;挂载 在虚拟机上添加一块硬盘 (1)、 虚拟机添加一块新的硬盘作为数据盘 (2) ls…

Idea Live Template 功能总结

文章目录 Java自带的template属性模板psf——public static finalpsfi——public static final intpsfi——public static final StringSt——String 方法模板psvm——main方法sout——打印语句iter——for迭代循环fori——for循环 代码块模板if-e —— if elseelse-if 自定义自…

中国首款量子计算机操作系统本源司南 PilotOS正式上线

中国安徽省量子计算工程研究中心近日宣布&#xff0c;中国国产量子计算机操作系统本源司南 PilotOS 客户端正式上线。 如果把量子芯片比喻成人的“心脏”&#xff0c;那么量子计算机操作系统就相当于人的“大脑”&#xff0c;量子计算应用软件则是人的“四肢”。 据安徽省量子…

Linux 终端命令之文件浏览(1) cat

Linux 文件浏览命令 cat, more, less, head, tail&#xff0c;此五个文件浏览类的命令皆为外部命令。 hannHannYang:~$ which cat /usr/bin/cat hannHannYang:~$ which more /usr/bin/more hannHannYang:~$ which less /usr/bin/less hannHannYang:~$ which head /usr/bin/he…

论文总结《Towards Evaluating the Robustness of Neural Networks(CW)》

原文链接 C&W 这篇论文更像是在讲一个优化问题&#xff0c;后面讲述如何针对生成对抗样本的不可解问题近似为一个可解的问题&#xff0c;很有启发。本文后面将总结论文各个部分的内容。 Motivation 文章提出了一个通用的设计生成对抗样本的方法&#xff0c;根据该论文提…

YAPi在线接口文档简单案例(结合Vue前端Demo)

在前后端分离开发中&#xff0c;我们都是基于文档进行开发&#xff0c;那前端人员有时候无法马上拿到后端的数据&#xff0c;该怎么办&#xff1f;我们一般采用mock模拟伪造数据直接进行测试&#xff0c;本篇文章主要介绍YApi在线接口文档的简单使用&#xff0c;并结合Vue的小d…

【C++学习】STL容器——stack和queue

目录 一、stack的介绍和使用 1.1 stack的介绍 1.2 stack的使用 1.3 stack的模拟实现 二、queue的介绍和使用 2.1 queue的介绍 2.2 queue的使用 2.3 queue的模拟实现 三、priority_queue的介绍和使用 3.1 priority_queue的介绍和使用 3.2 priority_queue的使用 3.4 p…

【Powershell 】(Windows下)常用命令 | 命令别名 | 运行Windows命令行工具 | 运行用户程序(vim、gcc、gdb)

微软官方Powershell文档&#xff1a;https://learn.microsoft.com/zh-cn/powershell/ 命令详细说明&#xff0c;在PDF的最后面&#xff1a; 一、Powershell及命令简介1.1 命令格式1.2 命令的别名 二、cmdlet别名三、cmdlet分类介绍3.1 基础命令1. Get-Command2. Get-Help3. S…

[HDLBIts] Exams/m2014 q4j

Implement the following circuit: ("FA" is a full adder) module top_module (input [3:0] x,input [3:0] y, output [4:0] sum);assign sumxy; endmodule

C数据结构与算法——无向图(邻接矩阵) 应用

实验任务 (1) 掌握图的邻接矩阵存储及基本算法&#xff1b; (2) 掌握该存储方式下的DFS和BFS算法。 实验内容 实现图的邻接矩阵存储结构实现基于邻接矩阵的相关算法及遍历算法 实验源码 #include <malloc.h> #include <stdio.h>#define MAXSIZE 1000 #define …

SpringBoot07——VueX

共享组件之间的数据&#xff0c;集中管理 这一部分某人要打ow我就跳过没看了&#xff0c;哼&#xff0c;都怪某人

【机器学习4】构建良好的训练数据集——数据预处理(一)处理缺失值及异常值

数据预处理 &#x1f4ab;数据预处理的重要性&#x1f4ab;处理缺失值⭐️识别表格中的数据⭐️计算每列缺失值的数量⭐️删除含有缺失值的样本或特征⭐️填充缺失值 &#x1f4ab;处理异常值⭐️异常值的鉴别⭐️异常值的处理 &#x1f4ab;将数据集划分为训练数据集和测试数据…

华为网络篇 RIP的Slient-Interface-26

难度1复杂度 1 目录 一、实验原理 二、实验拓扑 三、实验步骤 四、实验过程 总结 一、实验原理 在默认情况下&#xff0c;RIP会在所有的接口泛洪路由更新信息&#xff08;整个路由表&#xff09;&#xff0c;这里有一个问题&#xff0c;当RIP路由器连接的是一个末端网络时…

基层社会治理平台建设方案[113页PPT]

导读&#xff1a;原文《基层社会治理平台建设方案[113页PPT]》&#xff08;获取来源见文尾&#xff09;&#xff0c;本文精选其中精华及架构部分&#xff0c;逻辑清晰、内容完整&#xff0c;为快速形成售前方案提供参考。 完整版领取方式 完整版领取方式&#xff1a; 如需获取完…

Python(八十二)字符串的常用操作——替换与合并

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

FreeRTOS(二值信号量)

资料来源于硬件家园&#xff1a;资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、信号量的概念 1、信号量的基本概念 2、信号量的分类 二、二值信号量的定义与应用 1、二值信号量的定义 2、二值信号量的应用 三、二值信号量的运作机制 1、FreeRTOS任务间二值…

应用冷启bindservice耗时

背景&#xff1a;sdk初始化的时候耗时过长&#xff0c;而sdk,init方法中只有一个bindservice及一些变量的初始化&#xff0c;却好事100ms 查看trace发现binderservice耗时只占init耗时的一小部分&#xff0c;但是init逻辑并没有其他代码。 这里servicebind返回快的另一原因是se…

【Java】线程数据共享和安全 -ThreadLocal

&#x1f384;欢迎来到边境矢梦的csdn博文&#x1f384; &#x1f384;本文主要梳理线程数据共享和安全 -ThreadLocal&#x1f384; &#x1f308;我是边境矢梦&#xff0c;一个正在为秋招和算法竞赛做准备的学生&#x1f308; &#x1f386;喜欢的朋友可以关注一下&#x1faf…