RabbitMQ实现消息发送接收——实战篇(路由模式)

news2024/12/17 7:57:21

本篇博文将带领大家一起学习rabbitMQ如何进行消息发送接收,我也是在写项目的时候边学边写,有不足的地方希望在评论区留下你的建议,我们一起讨论学习呀~

需求背景

先说一下我的项目需求背景,社区之间可以进行物资借用,当有社区提交物资借用申请时,需要通过RabbitMQ将这条消息发送到被借用物资的社区,同时在界面进行提示。

先把依赖引入一下

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

application.yml做好配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

工具类实现

先选择以何种方式进行消息发送,这里根据需求我选择使用RabbitMQ的路由模式进行消息发送,先来配置一下相应工具类:

先配置RabbitMQ的配置类

/**
 * @Title: RabbitMQConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 13:58
 * @description: RabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

//    声明一个交换机
    @Bean
    public DirectExchange borrowMaterialExchange(){
        return new DirectExchange("borrow_material_exchange");
    }

//    动态绑定队列时使用的方法(具体绑定逻辑在下面的监听器中实现)
//    @Bean
//    public Queue communityQueue(){
//        return new Queue("communityQueue");
//    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

为了确保消息发送和接收时都以 JSON 格式处理,可以在 Spring 配置中添加 Jackson2JsonMessageConverter。这样,发送端会将 MaterialBorrowing 对象序列化为 JSON,接收端会自动将 JSON 反序列化回 MaterialBorrowing 对象。 

绑定交换机和对应队列

/**
 * @Title: RabbitMQBindRoutingConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 14:21
 * @description:  动态绑定路由配置
 */
@Component
@Slf4j
public class RabbitMQBindRoutingConfig {

    @Autowired
    private DirectExchange borrowMaterialExchange;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * 以社区ID为路由键,为指定社区动态创建队列并绑定到交换机
     * @param communityId 社区ID
     */

    public void bindRouting(String communityId){
//        创建队列
        Queue queue = new Queue("queue_" + communityId);
//        动态绑定交换机和指定队列
        Binding binding = BindingBuilder.bind(queue)
                .to(borrowMaterialExchange)
                .with(communityId);
        rabbitAdmin.declareExchange(borrowMaterialExchange);
        rabbitAdmin.declareBinding(binding);
        log.info("队列绑定成功,社区ID----》" + communityId + ",队列名称----》" + queue.getName() + ",交换机名称----》" + borrowMaterialExchange.getName());

    }


}

动态声明队列

@Configuration
@Slf4j
public class QueueDeclareConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    public void dynamicDeclareQueue(String communityId){
        String queueName = String.format("queue_%s",communityId);

        Queue queue = new Queue(queueName,true);
        rabbitAdmin.declareQueue(queue);
        log.info("队列声明成功");
    }
}

在创建声明队列的时候,我们希望的是根据我们的规则在调用接口的时候去创建指定名称的队列,所以可以使用动态声明对列而不是直接在平台上进行配置。

消息发送

@Component
@Slf4j
public class MessageSendConfig {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message,String communityId){
        System.out.println("发送消息:" + message);
        amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);
        log.info("发送消息成功------->"+message);
    }

}

消息接收(动态声明与监听结合),这里你可以先思考一下为什么要用这种方式实现消息接收,而不是使用@RabbitListener去动态获取某个队列接收消息。

/**
 * @Title: MessageRecieveConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 12:53
 * @description: 动态监听接收消息
 */
@Service
@Slf4j
public class MessageRecieveConfig<T> {


    private final ConnectionFactory connectionFactory;


    public MessageRecieveConfig(ConnectionFactory connection) {
        this.connectionFactory = connection;
    }

    public void recieveMessage(String communityId,Class<T> objectType){
        String queueName = String.format("queue_%s",communityId);
//        创建监听容器
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
//        处理消息消费逻辑
        container.setMessageListener(message -> {
            try {
                // 将字节数组转换为字符串
                String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("接收到的消息:" + messageBody);

                // 如果需要将消息解析为对象(例如 MaterialBorrowing)
                ObjectMapper objectMapper = new ObjectMapper();
                T result = objectMapper.readValue(messageBody, objectType);
                System.out.println("反序列化后的消息:" + result);
            } catch (Exception e) {
                log.error("处理消息时发生错误:", e);
            }
        });
        // 确保自动确认
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.start();
        log.info("动态监听已启动,监听队列------->"+queueName);
    }

}

在你的项目中分别调用就行了,需要注意的是你必须确保在消息发送的时候你的队列已经创建完成且和对应交换机进行了绑定,不然可能会导致消息发送失败。

ok,我们启动项目

你会发现你的项目根本启动不起来,原因是因为对于 Spring AMQP 的监听器来说,必须确保监听的队列已经存在于 RabbitMQ 中,否则会抛出类似 DeclarationException 的错误。

所以我们考虑可以通过动态声明队列,在程序运行时确保 RabbitMQ 上创建好所需的队列。

动态声明队列的含义

动态声明队列是指程序在运行时,通过代码检查或创建 RabbitMQ 中尚不存在的队列,而不是手动预先配置好所有队列。这种方式可以自动帮你在 RabbitMQ 中创建所需的队列,而无需手动操作。

这里说一下为什么需要动态绑定队列而不直接使用@RabbitListener?

为什么使用 SimpleMessageListenerContainer 动态绑定队列

  • SimpleMessageListenerContainer 不需要在项目启动时绑定队列。你可以在用户调用接口时动态创建队列,并动态监听它。
  • 特点
    • 队列在用户调用接口时才会被动态创建(通过 RabbitAdmin 或其他机制)。
    • 动态创建队列和监听时,项目启动时不会尝试绑定不存在的队列,因此不会报错。
  • 适用场景:非常适合动态队列需求,比如队列名依赖用户输入或业务逻辑,且不想在项目启动时绑定固定的队列。

使用 @RabbitListener 的情况

@RabbitListener 会在项目启动时绑定到指定的队列。

  • 要求:如果绑定的队列在 RabbitMQ 中不存在,项目启动时就会抛出异常,类似 DeclarationException,这也就是上面为什么会报错的原因。
  • 解决办法
    • 提前创建队列:在 RabbitMQ 中手动创建队列,或通过 RabbitAdmin 在项目启动时自动创建队列。
    • 动态队列名:如果队列名是动态的,可以结合 SpEL 表达式,但队列仍然需要在项目启动时确保存在。
SpEL 表达式

如果你的需求中已经确定队列已经创建好的,但是需要动态去获取队列,可以使用如下形式:

@RabbitListener(queues = "#{T(java.lang.String).format('queue_%s', 'borrowedCommunityId')}")

这个表达式 是 Spring AMQP 中用于动态指定队列名称的 SpEL 表达式(Spring Expression Language),它的作用就是会动态生成一个队列名称,基于你传入的参数构造队列名。

详解
1. 关键部分解析
  • T(java.lang.String)

    • T 是 SpEL 用于引用 Java 类 的方式。
    • java.lang.String 是目标 Java 类,表明你可以调用 String 类的静态方法。
  • .format()

    • String.format() 是 Java 中的静态方法,用于格式化字符串。
    • 格式化字符串的格式是 'queue_%s'%s 是占位符,用于拼接动态内容。
  • 'queue_%s'

    • 这是格式化字符串的模板。%s 表示字符串占位符。
  • 动态参数(例如 borrowedCommunityId

    • 它会替换 %s,生成队列名。例如,当 borrowedCommunityId 的值是 123 时,结果是:queue_123
2. 具体实例

假设 borrowedCommunityId = "123"

String result = String.format("queue_%s", "123");
System.out.println(result); // 输出:queue_123

在 SpEL 中,这等同于:

queues = "#{T(java.lang.String).format('queue_%s', '123')}"

这会动态生成队列名称为 queue_123


为什么用 SpEL?

Spring AMQP 的 @RabbitListener 注解中,queues 参数支持 SpEL 表达式。这使得我们可以动态决定要监听的队列,而不是写死某个固定的队列名称。


实际应用场景

就比如在我的代码中,可能有多个社区队列,例如:

  • queue_123(社区 ID 为 123 的队列)
  • queue_456(社区 ID 为 456 的队列)

使用 queues = "#{T(java.lang.String).format('queue_%s', borrowedCommunityId)}",可以动态生成不同社区的队列名称,从而实现按社区路由的功能。

启动项目之后,调用接口就可以发送消息了

但是你会发现消息消费的逻辑并没有在控制台中打印出来,这个时候你就要考虑是不是以下几个问题了:

交换机和队列是否已经绑定成功(可以在平台上进行查看)

是否绑定到了对应的交换机:amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);红色部分指定交换机名称,如果不指定,那么就会使用默认的交换机,所以肯定也是接收不到值的。

当然,还有其他可能,如果你的项目中遇到了,可以在评论区留言,我们一起学习~

最后,重新修改代码调用接口,就可以接收到消息了

对于在界面进行消息提示的功能,这里先不写出来了,我会在后面的博客中进行更新~

【都看到这了,点赞加关注,收藏不迷路呀~】😚😚

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2260942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ubuntu上更改ext4格式的硬盘为 windows的 NTFS 格式参考

1. ubuntu上安装 sudo apt-get install gparted 2. 参考如下&#xff0c;下面是转换后的样例。 3.windows上添加识别新硬盘参考 先在设备管理器中 找到下面 磁盘管理 如下&#xff1a;找到类似下面的磁盘2 查看相关信息 右键可以新建卷和格式化&#xff0c;下面是已经新建…

Java 垃圾回收机制详解

1 垃圾回收的概念 垃圾回收&#xff08;Garbage Collection&#xff0c;GC&#xff09;是自动管理内存的一种机制&#xff0c;用于释放不再使用的对象所占用的内存空间&#xff0c;防止内存溢出。垃圾回收器通过识别和回收那些已经死亡或长时间未使用的对象&#xff0c;来优化…

拿到小米 Offer,却迷茫了。。

大家好&#xff0c;我是程序员鱼皮&#xff0c;12 月了&#xff0c;很多小伙伴也拿到了秋招的 Offer&#xff08;没拿到也不要灰心&#xff09;&#xff0c;但即使拿到 Offer&#xff0c;可能还会有一些其他的顾虑。今天分享我们编程导航一位鱼友的提问&#xff0c;给大家作为学…

医疗领域的网络安全预防:保障患者隐私与医疗数据安全

医疗领域的网络安全预防&#xff1a;保障患者隐私与医疗数据安全 随着信息技术的不断发展和医疗行业的数字化转型&#xff0c;网络安全在医疗领域变得愈加重要。医疗行业处理着大量的敏感数据&#xff0c;包括患者的个人信息、医疗记录、诊疗方案等&#xff0c;这些数据一旦被…

实现线性回归笔记 # 自用

线性模型可以看作是一个单层的神经网络。 对于n个输入[x1, x2, ...., xn]&#xff0c;由n个权重[w1, w2, ......, wn]以及一个偏置常数b得到的输出y&#xff0c;则称y x1w1x2w2......xnwnb称为线性模型。 即 线性模型是对n维输入的加权外加偏差。 要利用线性模型进行预测&a…

实景视频与模型叠加融合?

[视频GIS系列]无人机视频与与实景模型进行实时融合_无人机视频融合-CSDN博客文章浏览阅读1.5k次&#xff0c;点赞28次&#xff0c;收藏14次。将无人机视频与实景模型进行实时融合是一个涉及多个技术领域的复杂过程&#xff0c;主要包括无人机视频采集、实景模型构建、视频与模型…

c语言——数据结构【链表:单向链表】

上篇→快速掌握C语言——数据结构【创建顺序表】多文件编译-CSDN博客 一、链表 二、单向链表 2.1 概念 2.2 单向链表的组成 2.3 单向链表节点的结构体原型 //类型重定义,表示存放的数据类型 typedef int DataType;//定义节点的结构体类型 typedef struct node {union{int l…

【LC】876. 链表的中间结点

题目描述&#xff1a; 给你单链表的头结点 head &#xff0c;请你找出并返回链表的中间结点。 如果有两个中间结点&#xff0c;则返回第二个中间结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[3,4,5] 解释&#xff1a;链表只有一个中间结点…

Bugku---misc---隐写2

题目出处&#xff1a;首页 - Bugku CTF平台 ✨打开发现是一张图片&#xff0c;于是查看属性&#xff0c;放在010查看&#xff0c;这都是基本步骤了&#xff0c;发现里面有一个flag.rar&#xff01;&#xff01;&#xff01;拿binwalk分析也确实存在 ✨于是按照压缩包的起始位置…

无需公网IP,本地可访问TightVNC 服务端

TightVNC 是一款免费而且开源的远程桌面软件&#xff0c;它允许用户在不同的操作系统之间实现无缝连接&#xff0c;TightVNC支持 Windows、macOS 和 Linux 等多个操作系统&#xff0c;为用户提供高效便捷的远程控制体验。在 Windows 系统电脑端安装使用 TightVNC 服务端和客户端…

【Unity基础】Unity中如何实现图形倒计时

为了在Unity中实现一个图形倒计时&#xff0c;除了代码部分&#xff0c;还需要一些UI元素的创建和设置。本文以环形倒计时为例&#xff0c;以下是完整的步骤&#xff0c;涵盖了如何创建UI元素、设置它们&#xff0c;以及如何编写控制环形倒计时进度的脚本。 1. 创建UI元素 创建…

Excel/VBA 正则表达式归纳汇总

1.with结构。以下语句用来提取A列中的“成品”两个字前面的部分的中文&#xff0c;不含成品两个字&#xff0c;结果存放在第2列。使用了On Error Resume Next&#xff0c;表示错误时继续下一条。 Sub 提取口味() Set regx CreateObject("vbscript.regexp") On Err…

xshell连接虚拟机,更换网络模式:NAT->桥接模式

NAT模式&#xff1a;虚拟机通过宿主机的网络访问外网。优点在于不需要手动配置IP地址和子网掩码&#xff0c;只要宿主机能够访问网络&#xff0c;虚拟机也能够访问。对外部网络而言&#xff0c;它看到的是宿主机的IP地址&#xff0c;而不是虚拟机的IP。但是&#xff0c;宿主机可…

优选算法《双指针》

在学习了C/C的基础知识之后接下来我们就可以来系统的学习相关的算法了&#xff0c;这在之后的笔试、面试或竞赛都是必须要掌握的&#xff1b;在这些算法中我们先来了解的是一些非常经典且较为常用的算法&#xff0c;在此也就是优选出来的算法&#xff0c;接下来在每一篇章中我们…

SQL server学习06-查询数据表中的数据(中)

目录 一&#xff0c;聚合函数 1&#xff0c;常用聚合函数 2&#xff0c;具体使用 二&#xff0c;GROP BY子句分组 1&#xff0c;基础语法 2&#xff0c;具体使用 3&#xff0c;加上HAVING对组进行筛选 4&#xff0c;使WHERE记录查询条件 汇总查询&#xff1a;在对数…

上传文件时获取音视频文件时长和文本文件字数

获取音视频文件时长和文本文件字数 一、获取音视频文件时长二、计算文本文件字数 最近有个需求&#xff0c;要求上传文件时获取音视频文件时长和文本文件字数&#x1f436;。 发现这样的冷门资料不多&#xff0c;特做个记录。本文忽略文件上传功能&#xff0c;只封装核心的工具…

C语言学习day22:进程ID获取工具/GetWindowThreadProcessId函数

简言&#xff1a; 每个人都有身份证号&#xff0c;这个身份证号就是个人的唯一标识符 进程也是如此&#xff0c;每个进程也有唯一的标识符&#xff0c;来标记自身是独一无二的 如下图:其中PID &#xff1a;Process ID&#xff0c;即进程ID 但是我们怎么去在编程中去获取某个…

使用Localstorage(Mapty)

使用Localstorage(Mapty) 首先&#xff0c;我们创建一个函数名&#xff0c;先在app中去调用它 // 为所有的锻炼创建本地存储this._setLocalStorage();之后我们就开始编写这个函数的功能 _setLocalStorage() {localStorage.setItem(workouts, JSON.stringify(this.#workouts));…

如何用细节提升用户体验?

前端给用户反馈是提升用户体验的重要部分&#xff0c;根据场景选择不同的方式可以有效地提升产品的易用性和用户满意度。以下是常见的方法&#xff1a; 1. 视觉反馈 用户执行了某些操作后&#xff0c;需要即时确认操作结果。例如&#xff1a;按钮点击、数据提交、页面加载等。…

OpenHarmony-3.HDF input子系统(5)

HDF input 子系统OpenHarmony-4.0-Release 1.Input 概述 输入设备是用户与计算机系统进行人机交互的主要装置之一&#xff0c;是用户与计算机或者其他设备通信的桥梁。常见的输入设备有键盘、鼠标、游戏杆、触摸屏等。本文档将介绍基于 HDF_Input 模型的触摸屏器件 IC 为 GT91…