Springboot整合RabbitMQ并使用

news2024/11/15 13:25:45

1、Springboot整合RabbitMQ

1、引入场景启动器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

引入AMQP场景启动器之后,RabbitAutoConfiguration就会自动生效。然后会给容器中自动配置了RabbitTemplateAmqpAdminCachingConnectionFactoryRabbitMessagingTemplate等来方便使用AMQP。

2、在yml中配置spring.rabbitmq相关信息

spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

2、简单使用

2.1 创建交换机(Exchange)、队列(Queue)和建立绑定关系(Binding)

@SpringBootTest
public class AmqpAdminTest {

    @Autowired
    private AmqpAdmin amqpAdmin;
    /**
     * 1、如何创建Exchange、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     */
    @Test
    public void creatExchange() {
        //创建 名为 itcxc.java.direct 的交换机
        DirectExchange directExchange = new DirectExchange("itcxc.java.direct");
        amqpAdmin.declareExchange(directExchange);
    }

    @Test
    public void creatQueue() {
        //创建名为 itcxc.java 的队列
        Queue queue = new Queue("itcxc.java");
        amqpAdmin.declareQueue(queue);
    }

    @Test
    public void creatBinding() {
        //创建绑定关系 将队列itcxc.java绑定到交换机itcxc.java.direct,routingKey为itcxc.java
        Binding binding = new Binding("itcxc.java", Binding.DestinationType.QUEUE,
                "itcxc.java.direct","itcxc.java",null);
        amqpAdmin.declareBinding(binding);
    }
}

2.1.1 交换机类型:

在这里插入图片描述
direct:会将消息发送给路由键必须完全匹配的队列中。
fanout:会将消息发送给所有绑定的队列中,不管路由键是否匹配。
topic:主体模式其实就是在路由模式的基础上,支持了对key的通配符匹配(星号以及井号),以满足更加复杂的消息分发场景。(#匹配零个或者多个单词,*匹配一个单词,每个单词用.分割)

2.2 发送消息

@SpringBootTest
public class RabbitTemplateTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  2、如何发消息
     *      1)、使用rabbitTemplate发送消息
     */
    @Test
    public void sendMessageTest(){
        OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setName("哈哈");
        orderReturnReasonEntity.setCreateTime(new Date());
        //1、发送消息
        // 默认情况下,如果发送的消息是一个对象,我们会使用序列化机制,将对象写出去,对象必须实现Serializable接口
        // 但是我们可以通过向容器中注入Jackson2JsonMessageConverter转换器将序列化机制改为转JSON
        rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
    }
}

2.2.1 替换消息系列化方式

通过观看RabbitTemplate的源码发现,我们在默认情况下消息系列化方式是JDK序列化方式。那么我们发送的消息如果是一个对象时,这个对象就必须实现Serializable接口。
在这里插入图片描述
在这里插入图片描述

如何使用转JSON的方式序列化消息呢?

通过观察RabbitAutoConfiguration源码发现,在创建RabbitTemplate的时候,会从容器中拿消息序列化器(MessageConverter)。

在这里插入图片描述

所以我们想要将转JSON的方式序列化消息,只需要给容器中放一个Jackson2JsonMessageConverter就可以了

@Component
public class GulimallRabbitMqConfig {

    /**
     * 消息转换器 指定消息转换的方式为转为JSON
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

}

2.3 获取消息

2.3.1 在启动类或者配置类上添加@EnableRabbit注解

使用@RabbitListener必须开启@EnableRabbit,如果没有使用@RabbitListener可以不添加@EnableRabbit注解。

2.3.2 添加@RabbitListener

@RabbitListener:可以标注在类和方法上 (监听哪些队列)
@RabbitHandler:只能标注在方法上 (重载区别不同的消息)

@Service
public class RabbitListeners {

    /**
     * queues:声明需要监听的所有队列
     *
     * 接收参数的类型:
     * 1、org.springframework.amqp.core.Message
     * 2、直接写原来发送的消息类型
     * 3、Channel 当前传送数据的通道
     *
     * @param message
     */
    @RabbitListener(queues = {"itcxc.java"})
    public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                               Channel channel){
        System.out.println("接收到的消息为:" + orderReturnReasonEntity);
    }
}

现在有一个情况,就是我们给同一个消息对象发送的消息是有可能不是同一个类型的。例如:

@Test
public void sendMq(){
    for (int i = 0; i < 10; i++) {
        if (i % 2 == 0){
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("哈哈");
            orderReturnReasonEntity.setCreateTime(new Date());
            rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
        } else {
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderEntity);
        }
    }
}

在这个情况下如果我们还是用原来的方式监听消息的话,就会使发送的消息类型为OrderEntity的消息丢失。

在这里插入图片描述
这个时候我们就可以,将@RabbitListener标注在类上,然后@RabbitHandler标注在方法上

@Service
@RabbitListener(queues = {"itcxc.java"})
public class RabbitListeners {

    /**
     * queues:声明需要监听的所有队列
     *
     * 接收参数的类型:
     * 1、org.springframework.amqp.core.Message
     * 2、直接写原来发送的消息类型
     * 3、Channel 当前传送数据的通道
     *
     * @param message
     */
    @RabbitHandler
    public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                               Channel channel){
        System.out.println("接收到的消息为:" + orderReturnReasonEntity);
    }

    @RabbitHandler
    public void receiveMessage(OrderEntity orderEntity){
        System.out.println("接收到的消息为:" + orderEntity);
    }
}

在这里插入图片描述

3、消息的可靠传递

3.1 发送端确认

为什么会丢失消息?

  • 在发送消息到服务端的时候,有可能因为网络等等问题,没有将消息发送到服务端。
  • 交换机(Exchange)通过路由键将消息发送给队列(Queue)的时候有可能没有找到相应的队列(Queue),而默认情况下是将消息直接丢弃的。

3.1.1 开启confirm和return机制

spring:
  rabbitmq:
    # 消息发送到broker后的回调 
    publisher-confirm-type: correlated
    # 没有设置mandatory时生效
    publisher-returns: true
    # mandatory的优先级高于publisher-returns,只要设置了mandatory,publisher-returns就失效了
    template:
      mandatory: true

我翻看源码可以发现mandatory的优先级高于publisher-returns,只要设置了mandatorypublisher-returns就失效了。

在这里插入图片描述
在这里插入图片描述
但是经过测试,我发现mandatory,只有在publisher-confirm-typepublisher-returns至少有一个设置才会生效。如果mandatorypublisher-returns同时存在的话,则mandatory优先级高于publisher-returns

3.1.2 添加回调方法

@Component
@RequiredArgsConstructor
public class RabbitMqCallback {

    private final RabbitTemplate rabbitTemplate;

    /**
     * RabbitMqCallback 创建完成之后,执行这个方法
     * @return
     */
    @PostConstruct
    public RabbitTemplate initCallback() {

        /**
         * 需要设置spring.rabbitmq.publisher-confirm-type=correlated
         * 消息发broker成功回调:发送到broker的exchange是否正确找到
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一ID)
         * ack:消息是否发送成功
         * cause:失败的原因,成功则返回null
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("setConfirmCallback 消息数据:" + correlationData);
            if (Objects.nonNull(correlationData)) {
                System.out.println("setConfirmCallback 消息数据:" + correlationData.getReturnedMessage());
            }
            System.out.println("setConfirmCallback 消息确认:" + ack);
            System.out.println("setConfirmCallback 原因:" + cause);
            System.out.println("-----------------------------------");
        });

        /**
         * 需要设置spring.rabbitmq.template.mandatory=true或spring.rabbitmq.publisher-returns=true 才会有回调
         * 消息路由回调:从交换器路由到队列是否正确发送
         * message:投递失败消息的详细消息
         * replyCode:回应码
         * replyText:回应信息
         * exchange:当时这个消息发送给的交换器
         * routingKey:当时这个消息发送用的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("setReturnCallback 消息:" + message);
            System.out.println("setReturnCallback 回应码:" + replyCode);
            System.out.println("setReturnCallback 回应信息:" + replyText);
            System.out.println("setReturnCallback 交换器:" + exchange);
            System.out.println("setReturnCallback 路由键:" + routingKey);
            System.out.println("-----------------------------------");
        });

        return rabbitTemplate;
    }
}

这样我们就可以知道哪些消息发送成功,哪些消息发送失败了,然后就可以做出相应的处理。

3.2 消费端确认

为什么会丢失消息?

默认情况下只要收到消息,客户端会自动确认,然后服务端就会移除这个消息。由于客户端会一次性接收很多的消息。
在这个情况下,就有可能我们接收了10个消息,只处理了前面2个消息,然后服务宕机了,这样就会使得我们有8个消息丢失。

3.2.1 设置ACK应答机制为手动

spring:
  rabbitmq:
    # 设置ACK应答机制为手动
    listener:
      simple:
        acknowledge-mode: manual

手动模式,只要我们没有明确的告诉MQ,消息被签收(没有ACK),消息就是一直处于unacked状态,即使客户端宕机了,消息也不会丢失,会重新变为ready,下次有新的客户端连接进来就发给新的客户端。

3.2.2 处理完消息之后手动应答

 @RabbitListener(queues = {"itcxc.java"})
 public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                            Channel channel) throws IOException {
     //deliveryTag通道(channel)内自增的
     long deliveryTag = message.getMessageProperties().getDeliveryTag();
     System.out.println("方法一deliveryTag为"+deliveryTag+"接收到的消息为:" + orderReturnReasonEntity);
     //确认签收参数说明(deliveryTag,是否批量签收)
     channel.basicAck(deliveryTag,false);
     //拒绝签收参数说明(deliveryTag,是否批量签收,是否放回队列中)
     //channel.basicNack(deliveryTag,false,true);
 }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/391280.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

变电站应用监控系统6大优势,你知道几个?

变电站是改变电压、控制和分配电能的场所&#xff0c;发展方向是自动化、数字化、网络化、智能化。 泛地缘科技针对电力行业用户的使用特点&#xff0c;利用电网现有的网络资源&#xff0c;推出集动力监控、环境监理、门禁系统、消防系统、视频监控于一身的物联网云盒监控系统。…

05-思维导图Xmind快速入门

文章目录5.1 认识思维导图5.2 Xmind的主要结构及主题元素5.2.1 Xmind的多种结构5.2.2 主题分类5.2.3 Xmind的主题元素章节总结5.1 认识思维导图 什么是思维导图&#xff1f; 思维导图是一种将思维进行可视化的实用工具。 具体实现方法是用一个关键词去引发相关想法&#xff0…

Hive实战 --- 电子商务消费行为分析

目录 数据结构 Customer表 Transaction表 Store表 Review表 上传数据 创建目录用于存放数据 把本地文件上传到HDFS上 创建外部表 创建数据库 创建表 数据清洗 对transaction_details中的重复数据生成新ID 过滤掉store_review中没有评分的数据 找出PII (personal …

位图/布隆过滤器/海量数据处理方式

位图 位图的概念 所谓位图&#xff0c;就是用每一位来存放某种状态&#xff0c;适用于海量数据&#xff0c;数据无重复的场景。通常是用来判断某个数据存不存在的。 直接来看问题&#xff1a; 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0…

Gradle+SpringBoot多模块开发

关于使用Gradle结合SpringBoot进行多模块开发。 本来是打算使用buildSrc之类的&#xff0c;但是感觉好像好麻烦&#xff0c;使用这种方法就可以实现&#xff0c;没必要采用其他的。 我不怎么会表述&#xff0c;可能写的跟粑粑一样&#xff0c;哈哈哈哈 这是我的项目地址。 存在…

Java速成篇-Day01笔记

提示&#xff1a;这里只记录我个人不熟悉的知识&#xff0c;并非所有内容 笔记目录课程&#xff1a;04-第一行代码① jshell② 对象.方法课程&#xff1a;05-第一份源码① Java开发程序的流程② 入口方法课程&#xff1a;06-常见问题-中文乱码① 乱码原因② 解决方法课程&#…

【基础算法】单链表的OJ练习(4) # 分割链表 # 回文链表 #

文章目录前言分割链表回文链表写在最后前言 本章的OJ练习相对前面的难度加大了&#xff0c;但是换汤不换药&#xff0c;还是围绕单链表的性质来出题的。我相信&#xff0c;能够过了前面的OJ练习&#xff0c;本章的OJ也是轻轻松松。 对于OJ练习(3)&#xff1a;-> 传送门 <…

华为OD机试题,用 Java 解【单词反转】问题

华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…

在 Docker 安装 Oracle12

说明 单独在Linux上安装Oracle很繁琐&#xff0c;首先需要安装各种依赖&#xff0c;其次安装如果失败了&#xff0c;重新配置安装也挺麻烦&#xff0c;所以如果是开发或者测试的时候使用Docker来进行安装会非常的方便。 搜索了很多的oracle相关镜像&#xff0c;选择一个适合自…

【Linux】-- 基本指令

目录 用户管理 adduser passwd userdel pwd ls指令 -l -a -d -F -r -t -R -1 which alias ll ls -n cd cd - cd ~ touch -d stat mkdir -p rmdir rm -r -f man cp ​编辑 -r -f mv cat -n tac more less -N head tail | 管道 dat…

Cookies与Session

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学会计学专业大二本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后…

JavaScript基础六、函数

零、文章目录 文章地址 个人博客-CSDN地址&#xff1a;https://blog.csdn.net/liyou123456789个人博客-GiteePages&#xff1a;https://bluecusliyou.gitee.io/techlearn 代码仓库地址 Gitee&#xff1a;https://gitee.com/bluecusliyou/TechLearnGithub&#xff1a;https:…

Deploy Workshop|DIY部署环境,让OceanBase跑起来

2023 年 3 月 25 日&#xff0c;我们将在北京开启首次 OceanBase 开发者大会&#xff0c;与开发者共同探讨单机分布式、云原生、HTAP 等数据库前沿趋势&#xff0c;分享全新的产品 Roadmap&#xff0c;交流场景探索和最佳实践&#xff0c;此外&#xff0c;OceanBase 开源技术全…

Qt多线程文件查找器

⭐️我叫恒心&#xff0c;一名喜欢书写博客的研究生在读生。 原创不易~转载麻烦注明出处&#xff0c;并告知作者&#xff0c;谢谢&#xff01;&#xff01;&#xff01; 这是一篇近期会不断更新的博客欧~~~ 有什么问题的小伙伴 欢迎留言提问欧。 Qt多线程文件查找器 前言 最近…

JUC并发编程——wait-notify

目录一、wait / notify1.1 wait / notify 原理1.2 wait / notify API介绍二、wait VS sleep三、wait / notify —代码改进一、wait / notify 1.1 wait / notify 原理 ● Owner线程发现条件不满足&#xff0c;调用wait( )方法即可进入WaitSet变为 WAITING状态 ● BLOCKED 和 W…

AI的简单介绍

什么是AI&#xff1f; AI 是 Artificial Intelligent 的缩写&#xff0c;是我们通常意义上说的人工智能。 简单来说就是让机器能够模拟人类的思维能力&#xff0c;让它能够像人一样感知、思考甚至决策。 为什么要开发AI&#xff1f; 因为在过去&#xff0c;都是我们学习机器…

408 计算机基础复试笔记 —— 更新中

计算机组成原理 计算机系统概述 问题一、冯诺依曼机基本思想 存储程序&#xff1a;程序和数据都存储在同一个内存中&#xff0c;计算机可以根据指令集执行存储在内存中的程序。这使得程序具有高度灵活性和可重用性。指令流水线&#xff1a;将指令分成若干阶段&#xff0c;每…

opencv学习(一)图像的基本操作

数据的读取cv2.IMREAD_COLOR:彩色图像cv2.IMREAD_GRAYSCALE:灰度图像import cv2 img cv2.imread(E:/opencv/open-cv/2-7/cat.jpg,1)cv2.imshow("img", img) cv2.waitKey(0) cv2.destroyAllWindows() cv2.imread()读取图片&#xff0c;当括号里面是1时&#xff…

华为OD机试题,用 Java 解【水仙花数】问题

华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…

抓包技术(浏览器APP小程序PC应用)

P1 抓包工具 01. Fidder 首先第一个Fiddler它的优势&#xff0c;独立运行&#xff0c;第二个支持移动设备&#xff08;是否能抓移动APP的包&#xff0c;&#xff09;在这一块的话wireshark、httpwatch就不支持&#xff0c;因此在这一块就可以排除掉前连个&#xff0c;因为我们…