Spring Cloud 集成 RabbitMQ

news2025/1/2 3:20:00

目录

  • 前言
  • 步骤
    • 引入相关maven依赖
    • 添加相关配置
  • 使用方法
    • 配置消息序列化
    • 创建第一个消息队列和交换机
    • 使用方法
  • 总结

前言

在当今的微服务架构盛行的时代,消息队列作为一种重要的通信机制,在分布式系统中扮演着不可或缺的角色。RabbitMQ,作为一款开源的消息代理和队列服务器,以其高可用性、易扩展性、灵活的路由机制以及多协议支持等特点,深受开发者们的青睐。而Spring Cloud,作为Spring生态中针对微服务架构的一套集成解决方案,也提供了与RabbitMQ的集成支持,使得在Spring Cloud环境下使用RabbitMQ变得更加简单高效。
Spring Cloud集成RabbitMQ,不仅继承了RabbitMQ本身的诸多优点,还充分利用了Spring Cloud的自动配置和声明式编程特性,极大地简化了消息队列的配置和使用过程。开发者可以通过简单的配置和注解,轻松实现消息的发布、订阅、路由和持久化等功能,从而构建出稳定可靠、高性能的分布式系统。
此外,Spring Cloud集成RabbitMQ还提供了丰富的消息处理机制,如消息确认、死信队列、延迟队列等,这些机制可以帮助开发者更好地处理消息丢失、重复消费、消息堆积等问题,提升系统的健壮性和可靠性。

步骤

引入相关maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加相关配置

spring:  
  rabbitmq:  
    # RabbitMQ服务器的地址  
    host: 127.0.0.1  
    # RabbitMQ服务器的端口号  
    port: 5672  
    # RabbitMQ服务器的用户名  
    username: guest  
    # RabbitMQ服务器的密码  
    password: guest  
    # 消息监听器的配置  
    listener:  
      simple:  
        # 确认模式,这里设置为手动,意味着需要手动确认消息处理成功  
        acknowledge-mode: manual  
        # 消息重试的配置  
        retry:  
          # 是否启用重试机制  
          enabled: true  
          # 最大重试次数  
          max-attempts: 5  
          # 最大重试间隔,单位是毫秒  
          max-interval: 20000ms  
          # 初始重试间隔,单位是毫秒  
          initial-interval: 2000ms  
          # 重试间隔的倍增系数  
          multiplier: 2

使用方法

配置消息序列化

// 定义一个Bean,返回一个MessageConverter实例,用于消息的序列化和反序列化  
@Bean  
public MessageConverter messageConverter() {  
    // 使用Jackson2JsonMessageConverter作为消息转换器,它基于Jackson库进行JSON格式转换  
    return new Jackson2JsonMessageConverter();  
}  
  
// 定义一个Bean,返回一个RabbitTemplate实例,用于发送和接收消息  
@Bean  
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {  
    // 创建一个RabbitTemplate实例,传入连接工厂,用于建立与RabbitMQ的连接  
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  
    // 设置RabbitTemplate的消息转换器,用于将Java对象转换为消息,以及将消息转换为Java对象  
    rabbitTemplate.setMessageConverter(messageConverter);  
    // 返回配置好的RabbitTemplate实例,供其他组件使用  
    return rabbitTemplate;  
}

创建第一个消息队列和交换机

// 定义actuator队列的名称常量  
public static final String ACTUATOR_QUEUE_NAME = "actuator_queue";  
// 定义actuator交换机的名称常量  
public static final String ACTUATOR_EXCHANGE_NAME = "actuator_exchange";  
// 定义actuator路由键的常量  
public static final String ACTUATOR_ROUTING_KEY = "actuator_routing_key";  
  
// 定义一个Bean,用于创建actuator队列  
@Bean  
public Queue actuatorQueue() {  
    // 创建一个持久化队列,队列名称为ACTUATOR_QUEUE_NAME常量定义的值  
    return new Queue(ACTUATOR_QUEUE_NAME, true);  
}  
  
// 定义一个Bean,用于创建actuator交换机  
@Bean  
public DirectExchange actuatorExchange() {  
    // 创建一个Direct类型的交换机,交换机名称为ACTUATOR_EXCHANGE_NAME常量定义的值  
    return new DirectExchange(ACTUATOR_EXCHANGE_NAME);  
}  
  
// 定义一个Bean,用于绑定actuator队列和交换机  
@Bean  
public Binding actuatorBinding(@Qualifier("actuatorQueue") Queue queue, @Qualifier("actuatorExchange") DirectExchange exchange) {  
    // 使用BindingBuilder来绑定actuator队列和actuator交换机  
    // 并指定ACTUATOR_ROUTING_KEY作为路由键  
    return BindingBuilder.bind(queue).to(exchange).with(ACTUATOR_ROUTING_KEY);  
}

使用方法

// 使用RabbitListener注解,指定监听ACTUATOR_QUEUE_NAME队列中的消息  
@RabbitListener(queues = ActuatorBind.ACTUATOR_QUEUE_NAME)  
public void onMessage(TaskInfoBo taskInfoBo, Message message, Channel channel) throws IOException {  
    // 获取消息的属性,并从中提取消息的deliveryTag  
    long deliveryTag = message.getMessageProperties().getDeliveryTag();  
    try {  
        // 执行一些业务逻辑代码......  
        // ...(业务代码省略)  
  
        // 如果业务代码执行成功,则手动确认消息处理成功  
        // deliveryTag用于标识需要确认的消息,false表示是否进行批量确认(这里不进行批量确认)  
        channel.basicAck(deliveryTag, false);  
    } catch (Exception e) {  
        // 如果在业务代码执行过程中出现异常,则手动拒绝该消息  
        // deliveryTag用于标识需要拒绝的消息,false表示是否将消息重新放回队列(这里不重新放回队列)  
        channel.basicReject(deliveryTag, false);  
    }  
}

在这个方法中,我们使用了@RabbitListener注解来声明这个方法是一个消息监听器,它监听ActuatorBind.ACTUATOR_QUEUE_NAME队列中的消息。当消息到达这个队列时,Spring会调用这个方法来处理消息。
方法接受三个参数:TaskInfoBo taskInfoBo(用于接收消息体中的信息,并自动转换为TaskInfoBo对象)、Message message(代表原始的RabbitMQ消息)和Channel channel(用于与RabbitMQ进行交互的通道)。
在方法体中,我们首先通过message对象获取消息的deliveryTag,这个deliveryTag用于后续的消息确认或拒绝操作。
然后,我们尝试执行一些业务逻辑代码。如果业务代码执行成功,我们使用channel.basicAck方法手动确认消息已被成功处理,从而从队列中移除该消息。如果业务代码执行过程中出现异常,我们使用channel.basicReject方法手动拒绝该消息,这里拒绝时不将消息重新放回队列(第二个参数为false)。
这种手动确认消息的方式确保了消息处理的可靠性和健壮性,只有当消息被成功处理后才从队列中移除,否则会被拒绝或重新尝试处理。

总结

完成上诉步骤我们完成了Spring Cloud 集成RabbitMQ。还有很多其他的模式,等到业务用到后再进行补充。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1595297.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(七)C++自制植物大战僵尸游戏关卡数据加载代码讲解

植物大战僵尸游戏开发教程专栏地址http://t.csdnimg.cn/xjvbb 打开LevelData.h和LevelData.cpp文件。文件位置如下图所示。 LevelData.h 此头文件中定义了两个类&#xff0c;分别是OpenLevelData、LevelData&#xff0c;其中OpenLevelData用于加载文件数据。LevelData解析数据…

ansible创建用户账户和更新ansible库的密钥

1.创建⽤户帐户 从 http://materials/user_list.yml 下载要创建的⽤户的列表&#xff0c;并将它保存到 /home/greg/ansible 在本次考试中使⽤在其他位置创建的密码库 /home/greg/ansible/locker.yml 。创建名为 /home/greg/ansible/users.yml 的 playbook &#xff0c;从⽽…

攻防世界13-simple_php

13-simple_php <?php show_source(*__FILE__*);//高亮文件 include("config.php");//文件包含在内 $a$_GET[a];//获得a $b$_GET[b];//获得b if($a0 and $a){ //判断a是否满足条件echo $flag1; //满足就输出flag1 } if(is_numeric($b)){ //判断b的条件&#x…

解决方案ImportError: cannot import name ‘BertTokenizerFast‘ from ‘transformers‘

文章目录 一、现象二、解决方案 一、现象 从transformers 库调用该包的时候 from transformers import BertTokenizer, AdamW, BertTokenizerFast报错显示 ImportError: cannot import name ‘BertTokenizerFast’ from ‘transformers’ 二、解决方案 追溯查看transforme…

【OpenGL开发】PyQt在关闭应用程序时没有运行析构函数的问题

PyQt在关闭应用程序时没有运行析构函数的问题 目录 一、说明二、python的析构函数三、QT5 存在一些问题四、PyQt5 存在一些问题五、OpenGL的析构问题 一、说明 应用QT做程序界面&#xff0c;在程序退出的时候&#xff0c;需要调用析构函数释放资源&#xff0c;这个操作在Pytho…

跟TED演讲学英文:Why AI is incredibly smart and shockingly stupid by Yejin Choi

Why AI is incredibly smart and shockingly stupid Link: https://www.ted.com/talks/yejin_choi_why_ai_is_incredibly_smart_and_shockingly_stupid Speaker: Yejin Choi Date: April 2023 文章目录 Why AI is incredibly smart and shockingly stupidIntroductionVocabul…

通过调用Vcenter-Api获取Vcenter中服务器信息

通过调用Vcenter-Api获取Vcenter中服务器信息 文章目录 通过调用Vcenter-Api获取Vcenter中服务器信息1. 获取Vmware API帮助文档2. 获取访问凭证3. 获取服务器清单4. 获取服务器更多信息5. 获取虚机更多信息6. 获取磁盘信息7. 获取操作系统相关 1. 获取Vmware API帮助文档 htt…

面试八股——Spring——AOP与事务

AOP的定义 事务的实现 事务的失效场景 异常捕获处理 下图中由于②导致异常&#xff1a; 原因&#xff1a; 解决办法&#xff1a;自己抛出一个非检查异常&#xff08;具体原因看“抛出检查异常”&#xff09;。 抛出检查异常 由于①出错&#xff0c;导致抛出了检查异常 原因&…

[linux api] of_irq_init

总结: 以如下级联的中断控制器为例: of_irq_init会确保先初始化父控制器再初始化子控制器,也即整体按照层序遍历的顺序进行初始化,以上图为例,其初始化顺序为: intc0intc1-2intc3-6具体实现则分为两个阶段: 第一阶段 遍历所有设备节点,并与参数matches进行匹配,找…

Servlet实现常用功能及其他方法

getParameter 获取body或url中指定的key/value值 String classIdreq.getParameter("classId"); getQueryString 获取请求的所有查询参数key,values1 String queryStringreq.getQueryString(); from表单提交 前端通过from表单提交用户名和密码 <!DOCTYPE htm…

# 达梦sql查询 Sql 优化

达梦sql查询 Sql 优化 文章目录 达梦sql查询 Sql 优化注意点测试数据单表查询 Sort 语句优化优化过程 多表关联SORT 优化函数索引的使用 注意点 关于优化过程中工具的选用&#xff0c;推荐使用自带的DM Manage&#xff0c;其它工具在查看执行计划等时候不明确在执行计划中命中…

计算机网络常问面试题

一.HTTPS是如何保证安全传输的 https通过使⽤对称加密、⾮对称加密、数字证书等⽅式来保证数据的安全传输。 客户端向服务端发送数据之前&#xff0c;需要先建⽴TCP连接&#xff0c;所以需要先建⽴TCP连接&#xff0c;建⽴完TCP连接后&#xff0c;服务端会先给客户端发送公钥…

网络网络层之(2)ARP协议

网络网络层之(2)ARP协议 Author&#xff1a;Once Day Date: 2024年4月1日 漫漫长路&#xff0c;有人对你笑过嘛… 全系列文档可参考专栏&#xff1a;通信网络技术_Once-Day的博客-CSDN博客。 参考文档: 《TCP/IP详解卷一》arp(8) - Linux manual page (man7.org)彻底搞懂系…

配置香橙派摄像头服务每次开机自动启动

目录 1.创建一个mjpg.sh脚本 2.在脚本中添加以下内容 3.增加可执行权限 4.在/etc/xdg/autostart/下创建mjpg.desktop 文件输入以下内容 1.创建一个mjpg.sh脚本 touch mjpg.sh 2.在脚本中添加以下内容 #!/bin/bash cd /home/orangepi/Myorangepi/zhinenglajitong/mjpg-stre…

分布式幂等性

1. 什么是幂等性&#xff1f; 幂等性是指在分布式系统中&#xff0c;一个操作多次执行的结果与其执行一次的结果相同。设计具有幂等性的分布式系统可以有效避免数据不一致和重复处理的问题。 幂等系统的应用场景 在微服务架构下&#xff0c;由于分布式天然特性的时序问题, 以…

解决动态规划问题

文章目录 动态规划的定义动态规划的核心思想青蛙跳阶问题解法一&#xff1a;暴力递归解法二&#xff1a;带备忘录的递归解法&#xff08;自顶向下&#xff09;解法三&#xff1a;动态规划&#xff08;自底向上&#xff09; 动态规划的解题套路什么样的问题考虑使用动态规划&…

英语新概念2-回译法-lesson6

我刚刚搬家去柏林大街的房子里。昨天一个流浪汉敲我的门,他想我寻求一顿饭和一杯啤酒。未拒绝了这个请求之后,这个流浪汉倒立着唱歌,我给他了一顿饭,他吃了食物并且喝了啤酒,然后他把一片奶酪放到他的口袋里然后走开了。过了一会儿,一个领居告诉我关于这个流浪汉的事情。…

GAN:对抗生成网络【通俗易懂】

一、概述 对抗生成网络&#xff08;GAN&#xff09;是一种深度学习模型&#xff0c;由两个神经网络组成&#xff1a;生成器G和判别器D。这两个网络被训练来协同工作&#xff0c;以生成接近真实数据的新样本。 生成器的任务是接收一个随机噪声向量&#xff0c;并将其转换为与真…

java数据结构与算法刷题-----LeetCode371. 两整数之和

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 位运算 位运算 解题思路&#xff1a;时间复杂度O( l o g 2 m a …

网络篇09 | 运输层 udp

网络篇09 | 运输层 udp 01 简介UDP 是面向报文的 02 报文协议 01 简介 UDP 只在 IP 的数据报服务之上增加了一些功能&#xff1a;复用和分用、差错检测 UDP 的主要特点&#xff1a;无连接。发送数据之前不需要建立连接。 使用尽最大努力交付。即不保证可靠交付。 面向报文。…