Springboot整合RabbitMq,详细使用步骤

news2025/1/9 17:10:53

Springboot整合RabbitMq,详细使用步骤

    • 1 添加springboot-starter依赖
    • 2 添加连接配置
    • 3 在启动类上添加开启注解`@EnableRabbit`
    • 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
    • 5 生产者推送消息
    • 6 消费者接收消息
    • 7 生产者的消息回调机制
    • 8 消费者的确认机制

消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。

用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。

以下先详细介绍下springboot项目怎么使用RabbitMq

1 添加springboot-starter依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 添加连接配置

以下几项是最基础的配置,其他配置下面用到时额外添加

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest #默认用户名和密码
    password: guest
    virtual-host: /  # 虚拟主机

3 在启动类上添加开启注解@EnableRabbit

4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。

可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。

可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。

配置类的示例代码如下:

@Configuration
public class RabbitConfig {
    /**
     * 队列
     */
    @Bean
    Queue createDirectQueue(){
        /**
         * durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。
         * exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        //两种创建方式
        //QueueBuilder.durable("queue.test1").build();
        return new Queue("queue.test1",true,false,false);
    }

    /**
     *  交换机
     */
    @Bean
    DirectExchange createDirectExchange(){
        /**
         * durable、autoDelete参数性质和上面队列的一致
         */
        return new DirectExchange("direct.test1",true,false);
    }

    /**
     * 将队列和交换机绑定, 并设置用于匹配键
     */
    @Bean
    Binding binding(){
        return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");
    }

}

以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange接口 —>AbstractExchange实现类下看到。

在这里插入图片描述

可以通过客户端看到队列、交换机、路由关系已经创建成功

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5 生产者推送消息

@Autowired
RabbitTemplate rabbitTemplate;

@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {
    String id = UUID.randomUUID().toString();
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    params.put("messageId",id);
    params.put("createTime",createTime);
    /**
     * 发给交换机,在发给路由绑定的队列
     */
    rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
    return AjaxResult.success("成功");
}

可以看到,rabbitmq成功接收到消息。

在这里插入图片描述
在这里插入图片描述

6 消费者接收消息

@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {
    @RabbitHandler
    public void process(Map message){
        System.out.printf("消费者接收到消息:" + message.toString());
    }
}

可以看到消息成功被消费,监听处理方法也成功被执行

在这里插入图片描述

​ 如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。

7 生产者的消息回调机制

在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。

  • 增加配置项

    spring:
     rabbitmq:
      publisher-confirm-type: correlated #消息发送成功交互
      publisher-returns: true
    

    可能之前老的版本是publisher-confirm:true,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error

在这里插入图片描述

  • 目前回调包含发送成功回调ConfirmCallback和失败回调ReturnsCallback。一些老版本的可能有ReturnCallback。下面先自定义两个回调的回调方法

    ConfirmCallback的回调

    /**
     * 消息发送成功回调
     */
    public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    
        /**
         * 消息成功到达exchange,ack=true
         * @param correlationData
         * @param ack
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("相关数据:" + correlationData);
            System.out.println("确认状态:" + ack);
            System.out.println("造成原因:" + s);
        }
    }
    

    ReturnsCallback的回调

    /**
     * 发生异常时的消息返回提醒
     */
    public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("失败回调:" + returnedMessage);
        }
    }
    

    将自定义回调配置到模板中

    在Rabbit配置类中添加RabbitTemplate并配置两个回调

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
            rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback());
            return rabbitTemplate;
        }
    }
    

    那以上两种回调函数什么时候回执行呢?

    1. 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
    2. 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
    3. 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调

由此可见ConfirmCallback回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。

ReturnsCallback回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。

下面是对以上三种情况的测试

  • 消息完全成功发送到队列

    模拟:交换机和路由都存在

    @PostMapping("/sendMessage")
    public AjaxResult sendMessage(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
        //direct.test1和testRoute都存在
        rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
        return AjaxResult.success("成功");
    }
    

    消费者监听且ConfirmCallback回调为true
    在这里插入图片描述

  • 消息没有发送到exchange

    模拟:交换机不存在

    @PostMapping("/sendMessageFailByNoExchange")
        public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) {
            String id = UUID.randomUUID().toString();
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            params.put("messageId",id);
            params.put("createTime",createTime);
            //该交换机不存在
            rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params);
            return AjaxResult.success("成功");
        }
    

    ConfirmCallback回调为false
    在这里插入图片描述

  • 消息发送到exchange,但没发送到队列

    模拟:该交换机存在但该路由不存在

    @PostMapping("/sendMessageFailByNoRoute")
    public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
    	//交换机存在但该路由不存在
        rabbitTemplate.convertAndSend("direct.test1","failRoute",params);
        return AjaxResult.success("成功");
    }
    

    ConfirmCallback回调为true,ReturnsCallback失败回调执行
    在这里插入图片描述

可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。

8 消费者的确认机制

消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。

所有消费者的确认机制有三种模式:

  1. 自动确认

    是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。

    所以如果处理逻辑抛出异常,就相当于丢失了消息。

    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  2. 手动确认

  • 自动确认

    自动确认没什么好说的,消费者确认机制的默认模式就是auto,自动反馈确认,所以可以看到只要消息被消费了队列中就不存在了。

  • 手动确认

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

    • basic.ack:确认正确
    • basic.nack:拒绝确认,可以选择是否重新发回队列、是否批处理
    • basic.reject:拒绝确认,可以选择是否重新发回队列

    后两者对应的方法为channel.basicNackchannel.basicReject两者都表示消息没有被正常处理。其中有个参数requeue,选择是否重新入队,开启此项可以避免消息丢失。

    但开启要慎重,如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,导致消息积压。

    两者有略微的区别channel.basicNack可以拒绝多个消息,channel.basicReject只能拒绝一个

    下面看下代码怎么实现

    如果使用的是RabbitListener注解,需要将ackMode设置为手动模式ackMode="MANUAL"

    三个种情况分别对应下面 【1、2、3】三个方法

    @RabbitHandler
        @RabbitListener(queues = "queue.test1",ackMode = "MANUAL")
        public void processQueueTest1(Map param, Message message, Channel channel) throws IOException {
            /**
             * 【1 确认】
             * deliveryTag:消息的标识符
             * multiple:
             *     false:仅确认当前消息
             *     true:确认所有消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
             /**
             *  【2 拒绝】
             *  第一个参数是消息的唯一ID
             *  第二个参数表示是否批量处理
             *  第三个参数表示是否将消息重发回队列
             */
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            /**
             * 【3 拒绝】
             * 第一个参数deliveryTag表示消息ID
             * 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里
             */
    		//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            System.out.println("queue.test1消费者接收到消息:" + param.toString());
            System.out.println("message:" + message);
            System.out.println("channel:" + channel);
        }
    

    channel.basicAck确认

    以上代码中channel.basicAck是消费者向rabbitmq发送确认消息。向queue.test1队列发送消息,此时开启了手动确认,如果不写此行,队列中会一直存在一条Unacked(未确认)的消息。
    在这里插入图片描述
    执行了channel.basicAck消息才会被消费,如下图已经无滞留消息。
    在这里插入图片描述
    channel.basicNack、channel.basicReject否认
    可以看到拒绝消息之后,因为requeue参数为true,消息会被重新入队,入队后再次等待被消费者消费。如果requeue设为false的话则队列中该消息就是已经被消费。一般情况可以单独记录下,在轮询发送到队列。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/874469.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

优化堆排序(Java 实例代码)

目录 优化堆排序 Java 实例代码 src/runoob/heap/HeapSort.java 文件代码&#xff1a; 优化堆排序 上一节的堆排序&#xff0c;我们开辟了额外的空间进行构造堆和对堆进行排序。这一小节&#xff0c;我们进行优化&#xff0c;使用原地堆排序。 对于一个最大堆&#xff0c;首…

Azure概念介绍

云计算定义 云计算是一种使用网络进行存储和处理数据的计算方式。它通过将数据和应用程序存储在云端服务器上&#xff0c;使用户能够通过互联网访问和使用这些资源&#xff0c;而无需依赖于本地硬件和软件。 发展历史 云计算的概念最早可以追溯到20世纪60年代的时候&#x…

阿里云Alibaba Cloud Linux镜像系统介绍_常见问题解答FAQ

阿里云服务器操作系统Alibaba Cloud Linux镜像怎么样&#xff1f;可以代替CentOS吗&#xff1f;Alibaba Cloud Linux兼容性如何&#xff1f;有人维护吗&#xff1f;漏洞可以修复吗&#xff1f;Alibaba Cloud Linux完全兼容CentOS&#xff0c;并由阿里云官方免费提供长期维护。 …

数据统计与可视化的Dash应用程序

在数据分析和可视化领域&#xff0c;Dash是一个强大的工具&#xff0c;它结合了Python中的数据处理库&#xff08;如pandas&#xff09;和交互式可视化库&#xff08;如Plotly&#xff09;以及Web应用程序开发框架。本文将介绍如何使用Dash创建一个简单的数据统计和可视化应用程…

SpringBoot复习:(44)MyBatisAutoConfiguration

可以看到MyBatisAutoConfiguration引入了MyBatisProperties这个属性&#xff1a; MyBatisAutoConfiguration中配置了一个SqlSessionFactoryBean,代码如下&#xff1a; 可以配置mybatis-config.xml,需要配置文件里指定&#xff1a; mybatis.config-locationclasspath:/mybat…

ImportError: cannot import name ‘MutableMapping‘ from ‘collections‘解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

【FreeRtos基础入门】任务状态

文章目录 前言一、任务状态1.阻塞状态(Blocked)2.暂停状态(Suspended)3.就绪状态(Ready) 总结 前言 本freertos使用stm32系列单片机&#xff0c;使用其他的也可以&#xff0c;如esp系列等… 任务管理是实时操作系统&#xff08;RTOS&#xff09;的核心功能之一&#xff0c;它…

VMware Workstation中安装了Windows7系统但是VMware Tools选项为灰色及无法安装的解决方法

一、问题描述 当我们在使用VMware Workstation安装好了Windows7系统后;该安装好的Windows7系统并不能自动适配WMware的界面,只能在中间显示很小的一部分内容;此时我们就需要给Windows7系统安装VMware Tools工具; 问题一:WMware中的【安装VMware Tools】选项则是灰色的无法…

最强自动化测试框架Playwright(21)-测试生成器inspector

测试生成器 运行该命令时&#xff0c;将打开两个窗口&#xff0c;一个浏览器窗口&#xff0c;可以在其中与要测试的网站进行交互&#xff0c;另一个是Playwright Inspector窗口&#xff0c;可以在其中记录测试&#xff0c;然后将其复制到编辑器中。 使用该命令运行测试生成器…

7.9 SpringBoot实战 拷贝工具类,扩展BeanUtils.copyProperties

文章目录 前言一、拷贝普通对象Bean1.1 基础的Bean拷贝1.2 支持忽略某些属性1.3 支持忽略字段值为null的属性1.4 通用的Bean拷贝1.4.1 拷贝时可指定忽略属性1.4.2 拷贝时外加忽略null属性 二、拷贝集合对象List2.1 拷贝时可指定忽略属性2.2 拷贝时外加忽略null属性 三、拷贝分页…

HOT92-最小路径和

leetcode原题链接&#xff1a;最小路径和 题目描述 给定一个包含非负整数的 m x n 网格 grid &#xff0c;请找出一条从左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。 说明&#xff1a;每次只能向下或者向右移动一步。 示例 1&#xff1a; 输入&#xff1a;…

基础堆排序(Java 实例代码)

目录 基础堆排序 一、概念及其介绍 二、适用说明 三、过程图示 四、Java 实例代码 src/runoob/heap/Heapify.java 文件代码&#xff1a; 基础堆排序 一、概念及其介绍 堆排序&#xff08;Heapsort&#xff09;是指利用堆这种数据结构所设计的一种排序算法。 堆是一个近…

① vue复习。从安装到使用

vue官网&#xff1a;cn.vuejs.org vue安装 cnpm install -g vue/cli 查看是否安装成功 vue --version 创建一个项目 vue create vue-demo(项目名称) 这个取消掉。空格可选中或者取消。 运行项目&#xff1a; cd 进入到项目下 npm run serve 运行成功后&#xff0c;访问这…

面对算力瓶颈,如何利用CPU解决全链路智能编码?

编者按&#xff1a;英特尔是半导体行业和计算创新领域的全球领先厂商。与合作伙伴一起&#xff0c;英特尔推动了人工智能、5G、智能边缘等转折性技术的创新和应用突破&#xff0c;驱动智能互联世界。不久前&#xff0c;英特尔正式发布了第四代英特尔至强可扩展处理器&#xff0…

计算机网络-物理层(二)- 传输方式

计算机网络-物理层&#xff08;二&#xff09;- 传输方式 串型传输与并行传输 串行传输:是指数据是一个比特一个比特依次发送的&#xff0c;因此在发送端和接收端之间&#xff0c;只需要一条数据传输线路即可 并行传输:是指一次发送n个比特而不是一个比特&#xff0c;因此发送…

前端架构师的能力要求:打造可靠、灵活和可扩展的Web应用

随着互联网技术迅猛发展&#xff0c;现代Web应用程序变得越来越复杂且功能强大。作为一名前端架构师&#xff0c;在这个快节奏且竞争激烈的环境中&#xff0c;你需要具备广泛而深入地技术知识&#xff0c;并且有能力设计、开发和维护高度可靠、灵活和可扩展性强的Web应用。 深入…

popen/pclose 函数

函数作用 如果说system在一定程度上是execl的优化版&#xff0c;那么popen就一定程度上是system的优化版&#xff0c;使用popen不仅可以运行代码&#xff0c;还可以获取运行的输出结果&#xff08;但是system和exec族函数还是非常重要的&#xff0c;也有自己的特定应用场景&am…

从0开始搭建ns3环境以及NetAnim简单使用

一、环境准备 ns3是基于GNU/Linux平台使用C开发的工具软件&#xff0c;在windows系统中安装使用ns3环境&#xff0c;可以使用虚拟机VMware并安装ubuntu系统来实现&#xff0c;现将本教程所用到的虚拟机和系统镜像放到网盘提供下载 名称链接提取码VMware Workstation 17 Proht…

Docker镜像查看下载删除镜像文件的相关命令

1.镜像相关命令 本地查看有哪些镜像文件&#xff1a; docker images镜像的名称就是我们常见的一些软件&#xff0c;镜像相当于把软件和软件所需要的运行环境打包到一个镜像文件里面&#xff0c;将来在通过这个镜像文件创建出对应的容器&#xff0c;容器有了以后这些软件自动的…

system函数

函数作用 执行一个shell的命令 其实通过查看system函数的源码就会发现&#xff0c;system函数调用后会进行一次fork&#xff0c;然后就会在子进程中运行“execl("/bin/sh", "sh", "-c", command, (char *) 0);” 而“sh -c XXX” 这个命令&…