RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

news2024/11/19 12:29:29

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-stream</artifactId>
		</dependency>

webflux的依赖如下:

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.6.7</version>
		</dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
        lock.lock();
        try {
            SimpleMessageListenerContainer listener = listeners.get(queueName);
            if (listener == null && messageListener != null) {
                listener = new SimpleMessageListenerContainer();
                listener.setConnectionFactory(connectionFactory);
                listener.setQueueNames(queueName);
                listener.setMessageListener(messageListener);
                listeners.put(queueName, listener);
            }
            return listener;
        } finally {
            lock.unlock();
        }
    }

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {
       return Flux.create(emitter -> {
           SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {
               @Override
               public void onMessage(Message message) {
                   String msg = new String(message.getBody());
                   System.out.println("listen function Received message: " + msg);
                   emitter.next(msg);
               }
           });
           container.start();
       });
    }

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.consumer.service.ConsumerService;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/consumer")
public class ConsumerController {
    
    @Autowired
    private ConsumerService comsumerService;

   
    @GetMapping(value = "/listen", produces = "text/event-stream")
    public Flux<String> listen(@RequestParam String queueName) {
        return comsumerService.listen(queueName);
    }
}
// service
package com.rabbitmq.consumer.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;

@Service
public class ConsumerService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private ConnectionFactory connectionFactory;

    private final ReentrantLock lock = new ReentrantLock();
    private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();

    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
    }

    public Flux<String> listen(String queueName) {
       return Flux.create(emitter -> {
           SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {
               @Override
               public void onMessage(Message message) {
                   String msg = new String(message.getBody());
                   System.out.println("listen function Received message: " + msg);
                   emitter.next(msg);
               }
           });
           container.start();
       });
    }

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
        lock.lock();
        try {
            SimpleMessageListenerContainer listener = listeners.get(queueName);
            if (listener == null && messageListener != null) {
                listener = new SimpleMessageListenerContainer();
                listener.setConnectionFactory(connectionFactory);
                listener.setQueueNames(queueName);
                listener.setMessageListener(messageListener);
                listeners.put(queueName, listener);
            }
            return listener;
        } finally {
            lock.unlock();
        }
    }
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1855423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

外部网络如何访问内网?

在现代信息化时代&#xff0c;随着企业规模的扩大和业务范围的扩展&#xff0c;越来越多的企业需要实现外部网络访问内网的需求。外部网络访问内网指的是在外部网络环境下&#xff0c;通过互联网等公共网络途径&#xff0c;实现对企业内部网络的访问和操作。这种需求的出现&…

骑马与砍杀战团mod制作-基础-军队笔记(一)

骑马与砍杀战团mod制作-基础-军队装备笔记&#xff08;一&#xff09; 资料来源 学习的资料来源&#xff1a; b站【三啸解说】手把手教你做【骑砍】MOD&#xff0c;基础篇&#xff0c;链接为&#xff1a; https://www.bilibili.com/video/BV19x411Q7No?p4&vd_sourcea507…

代码随想录——摆动序列(Leetcode376)

题目链接 贪心 class Solution {public int wiggleMaxLength(int[] nums) {if(nums.length < 1){return nums.length;}// 当前一对差值int cur 0;// 前一对差值int pre 0;// 峰值个数int res 1;for(int i 0; i < nums.length - 1; i){cur nums[i 1] - nums[i];i…

STM32单片机系统

1.STM32最小系统 微型计算机&#xff08;面&#xff09; 单片机最小系统是指能够将单片机芯片运行所必需的最少的硬件电路集成在一起的系统。 它是一种基本的单片机应用系统&#xff0c;通常由主芯片&#xff0c;时钟电路&#xff0c;复位电路&#xff0c;电源电路&#xff0c…

免费内网穿透工具 ,快解析内网穿透解决方案

在IPv4公网IP严重不足的环境下&#xff0c;内网穿透技术越来越多的被人们所使用&#xff0c;使用内网穿透技术的好处有很多。 1&#xff1a;无需公网ip 物以稀为贵&#xff0c;由于可用的公网IP地址越来越少&#xff0c;价格也是水涨船高&#xff0c;一个固定公网IP一年的成本…

大数据集群数据传输

简单的服务器间的通信示例 netcat&#xff0c;简写为 nc&#xff0c;是 unix 系统下一个强大的命令行网络通信工具&#xff0c;用于在两台主机之间建立 TCP 或者 UDP 连接&#xff0c;并提供丰富的命令进行数据通信。nc 在网络参考模型属于应用层。使用 nc 可以做很多事情&…

《Windows API每日一练》5.5 插入符号

当你向程序中输入文本时&#xff0c;通常会有下划线、竖线或方框指示你输入的下一个字符将出现在屏幕上的位置。你也许认为这是“光标”&#xff0c;但在编写Windows程序时&#xff0c;你必须避免这种习惯。在Windows中&#xff0c;它被称为“插入符号”&#xff08;caret&…

NUS、清华提出STAR:一句话生成高质量4D Avatar,代码已开源

©PaperWeekly 原创 作者 | Chai Zenghao 单位 | 新加坡国立大学博士生 研究方向 | 3D生成 背景 在计算机图形学和数字虚拟人领域&#xff0c;从简单的文本提示生成更真实、可交互的虚拟人物是是目前广受关注的研究课题。然而&#xff0c;先前的 3D Avatar 生成方法存在一…

陀螺仪LSM6DSV16X与AI集成(7)----FIFO数据读取与配置

陀螺仪LSM6DSV16X与AI集成.7--检测自由落体 概述视频教学样品申请源码下载主要内容生成STM32CUBEMX串口配置IIC配置CS和SA0设置串口重定向参考程序初始换管脚获取ID复位操作BDU设置设置量程设置FIFO水印设置速率使用流模式设置FIFO时间戳批处理速率使能时间戳FIFO状态寄存器演示…

【ajax核心05】宏任务与微任务

ES6之后引入Promise对象(用来管理异步任务)&#xff0c;让JS引擎也可以发起异步任务 一&#xff1a;异步任务分类 异步任务分为&#xff1a;宏任务与微任务 宏任务 由浏览器环境执行的异步代码 具体宏任务分类 微任务 由JS引擎执行的代码 创建Promise对象时&#xff0c;…

【鸿蒙】ERROR_GET_BUNDLE_INSTALLER_FAILED

错误信息 [ERROR_GET_BUNDLE_INSTALLER_FAILED] Troubleshooting guide $ hdc file send D:\Huawei\devEcoProjects\entry\build\default\outputs\default\entry-default-unsigned.hap /sdcard/e8a215ea7be1444197e6a58ebda7721f/entry-default-unsigned.hap Error while Depl…

Vue74-路由传参2

一、$route中的params参数 二、在配置路由的index.js文件中&#xff0c;声明传参 占位符用的什么名字&#xff0c;params里面的key就是什么。 三、<router-link>标签中传参 3-1、to字符串写法 3-2、to的对象写法 注意&#xff1a;若是用params携带参数&#xff0c;不…

联盟学习:技术原理、特点及适用场景

一、引言 随着大数据和人工智能技术的快速发展&#xff0c;数据成为了推动科技进步的重要资源。然而&#xff0c;在实际应用中&#xff0c;数据往往呈现出碎片化、分散化的特点&#xff0c;如何有效地利用这些数据成为了业界关注的焦点。联盟学习&#xff08;Federated Learni…

异地组网如何OEM?

在现代信息社会中&#xff0c;企业越来越需要跨地域进行数据传输与共享。面临的挑战却是如何在不暴露在公网的情况下&#xff0c;实现异地组网并保障数据的安全性。本文将介绍一种名为“异地组网OEM”的解决方案&#xff0c;该方案能够通过私有通道传输数据并对数据进行安全加密…

Docker Compose--安装Nginx--方法/实例

原文网址&#xff1a;Docker Compose--安装Nginx--方法/实例_IT利刃出鞘的博客-CSDN博客 简介 说明 本文介绍Docker Compose如何安装Nginx。 目录结构 ├── config │ ├── cert │ │ ├── xxx_bundle.pem │ │ └── xxx.key │ ├── conf.d │ …

解决Windows下移动硬盘无法弹出的问题:\$Extend\$RmMetadata\$TxfLog\$TxfLog.blf

想弹出移动硬盘时&#xff0c;Windows告诉我设备正在使用 然后我使用LockHunter查看到底是哪个应用在使用我的移动硬盘&#xff0c;发现是 System(PID 4) E x t e n d Extend ExtendRmMetadata T x f L o g TxfLog TxfLogTxfLog.blf这个文件正在使用 这是一个索引文件 解决 …

黑马HarmonyOS-NEXT星河版实战

"黑马HarmonyOS-NEXT星河版实战"课程旨在帮助学员深入了解HarmonyOS-NEXT星河版操作系统的开发和实际应用。学员将学习操作系统原理、应用开发技巧和界面设计&#xff0c;通过实战项目提升技能。课程注重实践与理论相结合&#xff0c;为学员提供全面的HarmonyOS开发经…

[分布式网络通讯框架]----ZooKeeper下载以及Linux环境下安装与单机模式部署(附带每一步截图)

首先进入apache官网 点击中间的see all Projects->Project List菜单项进入页面 找到zookeeper&#xff0c;进入 在Zookeeper主页的顶部点击菜单Project->Releases&#xff0c;进入Zookeeper发布版本信息页面&#xff0c;如下图&#xff1a; 找到需要下载的版本 …

段,页,段页,三种内存(RAM)管理机制分析

段&#xff0c;页&#xff0c;段页 是为实现虚拟内存而产生的技术。直接使用物理内存弊端&#xff1a;地址空间不隔离&#xff0c;内存使用效率低。 段 段&#xff1a;就是按照二进制文件的格式&#xff0c;在内存给进程分段&#xff08;包括堆栈、数据段、代码段&#xff09;。…

仿迪恩城市门户分类信息网discuz模板

Discuz x3.3模板 仿迪恩城市门户分类信息网 (GBK) Discuz模板 仿迪恩城市门户分类信息网(GBK)