WebSocket的那些事(5-Spring STOMP支持之连接外部消息代理)

news2025/1/21 15:40:39

目录

  • 一、序言
  • 二、开启RabbitMQ外部消息代理
  • 三、代码示例
    • 1、Maven依赖项
    • 2、相关实体
    • 3、自定义用户认证拦截器
    • 4、Websocket外部消息代理配置
    • 5、ChatController
    • 6、前端页面chat.html
  • 四、测试示例
    • 1、群聊、私聊、后台定时推送测试
    • 2、登录RabbitMQ控制台查看队列信息
  • 五、结语

一、序言

上节我们在 WebSocket的那些事(4-Spring中的STOMP支持详解) 中详细说明了通过Spring内置消息代理结合STOMP子协议进行Websocket通信,以及相关注解的使用及原理。

但是Spring内置消息代理会有一些限制,比如只支持STOMP协议的一部分命令,像acksreceipts命令都是不支持的,还有由于内置消息代理把消息存储在内存,当应用不可用时,客户端也就订阅不到到后台推送的消息。

这节我们将会使用支持STOMP协议的外部消息代理(RabbitMQ)进行Websocket通信。


二、开启RabbitMQ外部消息代理

服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互,由于RabbitMQ默认没有启动STOMP插件,因此我们需要先启用该插件。

rabbitmq-plugins enable rabbitmq_stomp

启动该插件后,RabbitMQ中STOMP适配器默认会监听61613端口,如果是云服务器,需要把该端口在安全组中放开。

关于该插件说明请参考:RabbitMQ中STOMP插件说明。


三、代码示例

我们在 WebSocket的那些事(4-Spring中的STOMP支持详解)中写了一个简单的聊天Demo示例,下面我们对该聊天Demo示例进行改造,将Spring内置消息代理替换成RabbitMQ外部消息代理。

1、Maven依赖项

服务端和客户端与外部消息代理都是通过TCP进行通信,Spring底层默认使用的是NettyReactor,因此需要引入相关依赖项。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

2、相关实体

(1) 请求消息参数

@Data
public class WebSocketMsgDTO {

	private String name;

	private String content;
}


(2) 响应消息内容

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketMsgVO {

	private String content;
}

(3) 自定义认证用户信息

@Data
@AllArgsConstructor
@NoArgsConstructor
public class StompAuthenticatedUser implements Principal {

	/**
	 * 用户唯一ID
	 */
	private String userId;

	/**
	 * 用户昵称
	 */
	private String nickName;

	/**
	 * 用于指定用户消息推送的标识
	 * @return
	 */
	@Override
	public String getName() {
		return this.userId;
	}

}

3、自定义用户认证拦截器

@Slf4j
public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {

	private static final String USER_ID = "User-ID";
	private static final String USER_NAME = "User-Name";

	@Override
	public Message<?> preSend(Message<?> message, MessageChannel channel) {
		StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
		// 如果是连接请求,记录userId
		if (StompCommand.CONNECT.equals(accessor.getCommand())) {
			String userID = accessor.getFirstNativeHeader(USER_ID);
			String username = accessor.getFirstNativeHeader(USER_NAME);

			log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);
			accessor.setUser(new StompAuthenticatedUser(userID, username));
		}

		return message;
	}

}

4、Websocket外部消息代理配置

Spring中与外部消息代理通信的中间方被称之为Broker Relay,它会维护一个系统共享的单一TCP连接和外部消息代理进行通信,该TCP连接仅仅适用于服务端,用来发送消息,而不是接收消息,通过Broker RelaysystemLoginsystemPasscode属性可以设置该连接的认证信息。

Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接,该连接用来接收消息,通过clientLoginclientPasscode属性可以设置连接的认证信息。

/**
 * Websocket连接外部消息代理配置
 * @author Nick Liu
 * @date 2023/9/6
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketExternalMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		// 拦截器配置
		registration.interceptors(new UserAuthenticationChannelInterceptor());
	}

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/websocket") // WebSocket握手端口
			.addInterceptors(new HttpSessionHandshakeInterceptor())
			.setAllowedOriginPatterns("*") // 设置跨域
			.withSockJS(); // 开启SockJS回退机制
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.setApplicationDestinationPrefixes("/app") // 发送到服务端目的地前缀
			.enableStompBrokerRelay("/topic") // 开启外部消息代理,指定消息订阅前缀
			.setRelayHost("localhost") // 外部消息代理Host
			.setRelayPort(61613) // 外部消息代理STOMP端口
			.setSystemLogin("admin")  // 共享系统连接用户名,该连接主要用来发送消息
			.setSystemPasscode("admin") // 共享系统连接密码,该连接主要用来发送消息
			.setClientLogin("admin") // 客户端连接用户名,该连接主要用来接收消息
			.setClientPasscode("admin") // 客户端连接密码,该连接主要用来接收消息
			.setVirtualHost("/stomp"); // RabbitMQ虚拟主机
	}
}

备注:我们可以为服务端与客户端的连接设置不同的用户,针对客户端连接用户进行权限管控,保证系统的安全性,在这里为了方便测试我们统一用一个用户。

5、ChatController

STOMP协议并没有规定消息代理必须支持哪种类型的Destinations(目的地),但是RabbitMQ STOMP适配器只支持一些指定的目的地类型,如下图:
在这里插入图片描述

  • /exchange:指定交换机和路由key,发送和订阅来自队列的消息。
  • /queue:发送和订阅受STOMP网关管理的队列的消息,最多只有一个订阅者能到消息。
  • /amq/queue:发送和订阅不受STOMP网关管理的队列的消息。
  • /topic:发送和订阅来自临时或者持久Topic的消息,多个订阅者都能接收到消息。
  • /temp-queue/:发送和订阅来自临时队列的消息。

参考文档见:RabbitMQ中STOMP插件说明。

在下面的示例中,我们选用了/topic的开头的消息发送和订阅前缀,目的地格式只能为/topic/{routing-key}routing-key不能有斜杠,否则会报错。

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

	private final SimpUserRegistry simpUserRegistry;
	private final SimpMessagingTemplate simpMessagingTemplate;

	/**
	 * 模板引擎为Thymeleaf,需要加上spring-boot-starter-thymeleaf依赖,
	 * @return
	 */
	@GetMapping("/page/chat")
	public ModelAndView turnToChatPage() {
		return new ModelAndView("chat");
	}

	/**
	 * 群聊消息处理
	 * 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"
	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
	 * @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端
	 */
	@MessageMapping("/chat/group")
	@SendTo("/topic/chat-group")
	public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {
		log.info("Group chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
		String content = String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());
		return WebSocketMsgVO.builder().content(content).build();
	}

	/**
	 * 私聊消息处理
	 * 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀
	 * 实际发送目的地为"/user/topic/chat/private"
	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
	 * @return 消息内容,方法返回值将会基于SessionID单播给指定用户
	 */
	@MessageMapping("/chat/private")
	@SendToUser("/topic/chat-private")
	public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {
		log.info("Private chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
		String content = "私聊消息回复:" + webSocketMsgDTO.getContent();
		return WebSocketMsgVO.builder().content(content).build();
	}

	/**
	 * 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。
	 * 通过SimpMessagingTemplate实例可以在任何地方推送消息。
	 */
	@Scheduled(fixedRate = 10 * 1000)
	public void pushMessageAtFixedRate() {
		log.info("当前在线人数: {}", simpUserRegistry.getUserCount());
		if (simpUserRegistry.getUserCount() <= 0) {
			return;
		}

		// 这里的Principal为StompAuthenticatedUser实例
		Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream()
			.map(simpUser -> StompAuthenticatedUser.class.cast(simpUser.getPrincipal()))
			.collect(Collectors.toSet());

		users.forEach(authenticatedUser -> {
			String userId = authenticatedUser.getUserId();
			String nickName = authenticatedUser.getNickName();
			WebSocketMsgVO webSocketMsgVO = new WebSocketMsgVO();
			webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName, LocalDateTime.now()));

			log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId, FastJsonUtils.toJsonString(webSocketMsgVO));
			simpMessagingTemplate.convertAndSendToUser(userId, "/topic/chat-push", webSocketMsgVO);
		});
	}

}

6、前端页面chat.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>chat</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js"></script>
    <style>
        #mainWrapper {
            width: 600px;
            margin: auto;
        }
    </style>
</head>
<body>
<div id="mainWrapper">
    <div>
        <label for="username" style="margin-right: 5px">姓名:</label><input id="username" type="text"/>
    </div>
    <div id="msgWrapper">
        <p style="vertical-align: top">发送的消息:</p>
        <textarea id="msgSent" style="width: 600px;height: 100px"></textarea>
        <p style="vertical-align: top">收到的群聊消息:</p>
        <textarea id="groupMsgReceived" style="width: 600px;height: 100px"></textarea>
        <p style="vertical-align: top">收到的私聊消息:</p>
        <textarea id="privateMsgReceived" style="width: 600px;height: 200px"></textarea>
    </div>
    <div style="margin-top: 5px;">
        <button onclick="connect()">连接</button>
        <button onclick="sendGroupMessage()">发送群聊消息</button>
        <button onclick="sendPrivateMessage()">发送私聊消息</button>
        <button onclick="disconnect()">断开连接</button>
    </div>
</div>
<script type="text/javascript">
    $(() => {
        $('#msgSent').val('');
        $("#groupMsgReceived").val('');
        $("#privateMsgReceived").val('');
    });

    let stompClient = null;


    // 连接服务器
    const connect = () => {
        const header = {"User-ID": new Date().getTime().toString(), "User-Name": $('#username').val()};
        const ws = new SockJS('http://localhost:8080/websocket');
        stompClient = Stomp.over(ws);
        stompClient.connect(header, () => subscribeTopic());
    }

    // 订阅主题
    const subscribeTopic = () => {
        alert("连接成功!");

        // 订阅广播消息
        stompClient.subscribe('/topic/chat-group', function (message) {
                console.log(`Group message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#groupMsgReceived").val();
                $("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
        // 订阅单播消息
        stompClient.subscribe('/user/topic/chat-private', message => {
                console.log(`Private message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#privateMsgReceived").val();
                $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
        // 订阅定时推送的单播消息
        stompClient.subscribe(`/user/topic/chat-push`, message => {
                console.log(`Private message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#privateMsgReceived").val();
                $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
    };

    // 断连
    const disconnect = () => {
        stompClient.disconnect(() => {
            $("#msgReceived").val('Disconnected from WebSocket server');
        });
    }

    // 发送群聊消息
    const sendGroupMessage = () => {
        const msg = {name: $('#username').val(), content: $('#msgSent').val()};
        stompClient.send('/app/chat/group', {}, JSON.stringify(msg));
    }

    // 发送私聊消息
    const sendPrivateMessage = () => {
        const msg = {name: $('#username').val(), content: $('#msgSent').val()};
        stompClient.send('/app/chat/private', {}, JSON.stringify(msg));
    }
</script>
</body>
</html>

四、测试示例

1、群聊、私聊、后台定时推送测试

启动应用程序,日志打印显示系统连接建立成功,如下:

在这里插入图片描述

打开浏览器访问http://localhost:8080/page/chat可进入聊天页,同时打开两个窗口访问。
在这里插入图片描述


在这里插入图片描述

2、登录RabbitMQ控制台查看队列信息

在这里插入图片描述
可以看到所有消息都发送到了amq.topic交换机上(Topic类型), RabbitMQ会为每个连接的客户端创建3个队列。

因为我们在ChatController中定义了三个目的地,Routing Key分别是/topic/chat-group/topic/chat-private/topic/chat-push。群聊消息目的地/topic/chat-group绑定了两个队列,用于实现广播订阅,其它两个Routing Key分别绑定到了不同的队列上,实现唯一订阅。


五、结语

下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/996752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

合宙Air724UG LuatOS-Air LVGL API控件-截屏(Screenshots)

截屏&#xff08;Screenshots&#xff09; 分 享导出pdf 截屏功能&#xff0c;core版本号要>3211 示例代码 -- 创建图片控件img lvgl.img_create(lvgl.scr_act(), nil)-- 设置图片显示的图像lvgl.img_set_src(img, "/lua/test.png")-- 图片居中lvgl.obj_align(…

Pygame中Sprite类的使用1

在Pygame中有一个名为sprite的模块&#xff0c;又叫做“精灵”模块。通过该模块中的Sprite类可以实现图形的绘制、移动以及碰撞检测等功能。 1 Sprite类的编写 例如&#xff0c;要绘制“植物大战僵尸”中的僵尸&#xff0c;就可以创建一个新类&#xff0c;让该类继承自sprite…

零基础Linux_2(基本指令_上)目录/文件的显示跳转创建删除

目录 1. 目录内容的显示 ls(显示非隐藏文件) pwd(显示用户当前所在的目录) ls -l(列出文件的详细信息) ls -a(显示隐藏文件) 2. 目录的跳转操作 cd ..(回到上级路径) cd 绝对路径名(进入这个路径) cd 绝对相对名(进入这个路径) cd ~(跳转到当前用户的家目录) cd -(跳…

SQL语法知识回顾

一、SQL语言的分类 由于数据库管理系统&#xff08;数据库软件&#xff09;功能非常多&#xff0c;不仅仅是存储数据&#xff0c;还要包含&#xff1a;数据的管理、表的管理、库的管理、账户管理、权限管理等等。所以&#xff0c;操作数据库的SQL语言&#xff0c;也基于功能&am…

Microsoft Edge网页视频播放绿屏解决方法(B站)

一&#xff1a;问题&#xff0c;在B站观看视频时有绿色条纹 二&#xff1a;查找原因&#xff0c;未知 三&#xff1a;解决方法 三.1网页设置关闭硬件加速 三.2 点击视频播放下的 “小齿轮”&#xff0c;然后点击“更多播放设置” 把播放策略 “默认” 改为“AVC” 四&…

[SUCTF2019]SignIn 题解

是一个64位的文件 使用了RSA加密算法 N是103461035900816914121390101299049044413950405173712170434161686539878160984549 使用在线网站分离得到p&#xff0c;q 然后编写脚本进行解密 import gmpy2 import binasciip 282164587459512124844245113950593348271 q 366669…

回归预测 | MATLAB实现MPA-BiGRU海洋捕食者算法优化双向门控循环单元多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现MPA-BiGRU海洋捕食者算法优化双向门控循环单元多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现MPA-BiGRU海洋捕食者算法优化双向门控循环单元多输入单输出回归预测&#xff08;多指标&#xff0c;多图&a…

03-系统篇-内存碎片

一.常见的malloc内存分配原理 1内存分配原理 linux中应用层动态分配内存一般是用的malloc函数&#xff0c;而malloc在glibc中实现时&#xff0c;是用sbrk()来分内存. 在前面的章节中&#xff0c;我们了解到了堆的概念&#xff0c;堆在内存中&#xff0c;是一断连续的内存&…

适应度函数

23个基准测试函数 优化算法常用的适应度函数23个基准测试函数的图像python版code将以上代码打包为exe 优化算法常用的适应度函数 23个基准测试函数的图像 python版code 将23个基准测试函数封装成python库&#xff0c;并进行实例化、调用绘图函数。下面代码只需根据提示打印&am…

算法通关村18关 | 透析回溯的

回溯有清晰的解题模板&#xff0c; void backtracking(参数){if (终止条件){存放结果;return;}for (选择本层中的集合元素&#xff08;画成树&#xff0c;就是树节点孩子的大小) {处理节点;backtracking();回溯&#xff0c;撤销处理结果;}} 1. 从N叉树说起 在回溯之前&#x…

时序分解 | MATLAB实现ICEEMDAN+SE改进的自适应经验模态分解+样本熵重构分量

时序分解 | MATLAB实现ICEEMDANSE改进的自适应经验模态分解样本熵重构分量 目录 时序分解 | MATLAB实现ICEEMDANSE改进的自适应经验模态分解样本熵重构分量效果一览基本介绍程序设计参考资料 效果一览 基本介绍 ICEEMDANSE改进的自适应经验模态分解样本熵重构分量 包括频谱图 避…

配置Xftp绕过跳板机直连内网环境

一、需求说明 因现场环境限制&#xff0c;现在只有一台管理主机可连入内网环境&#xff0c;但因现在需要传入大量数据到内网环境&#xff0c;二管理主机的存储又无法满足需求&#xff0c;rz和sz命令又有传输大小限制&#xff0c;因此&#xff0c;我们来看下如何配置【隧道】实现…

Win10如何找回图片查看器

近期有小伙伴反映在将Win10升级之后发现电脑自带的图片查看器没有了&#xff0c;这是怎么回事&#xff0c;该怎么找回呢&#xff0c;下面小编就给大家详细介绍一下Win10找回图片查看器的方法&#xff0c;有需要的小伙伴快来和小编一起阅读看看吧。 win10找回windows照片查看器…

日志平台搭建第三章:Linux安装logstash

相关链接 项⽬主⻚&#xff1a; https://www.elastic.co/cn/downloads/logstash 下载地址&#xff1a; wget https://artifacts.elastic.co/downloads/logstash/logstash-7.5.1.tar.gz 官网下载可能比较慢&#xff0c;下面提供下载地址 百度云链接&#xff1a;https://pan.…

【Linux】对进程概念的理解

一丶进程概念 进程定义 进程是一个具有一定独立功能的程序在一个数据集合上依次动态执行的过程。进程是一个正在执行的程序的实例&#xff0c;包括程序计数器、寄存器和程序变量的当前值。 进程特征 1.进程依赖于程序运行而存在&#xff0c;进程是动态的&#xff0c;程序是…

生成树协议 STP(spanning-tree protocol)

一、STP作用 1、消除环路&#xff1a;通过阻断冗余链路来消除网络中可能存在的环路。 2、链路备份&#xff1a;当活动路径发生故障时&#xff0c;激活备份链路&#xff0c;及时恢复网络连通性。 二、STP选举机制 1、目的&#xff1a;找到阻塞的端口 2、STP交换机的角色&am…

UG\NX CAM二次开发 设置工序毛坯 UF_CAMGEOM_append_items

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 设置工序毛坯 UF_CAMGEOM_append_items 效果: 代码: static int init_proc(UF_UI_selection_p_t select, void* user_data) { int errorCode = 0; int num_triples = 1; UF_UI_mask_t mas…

JavaScript设计模式(五)——发布订阅模式、桥接模式、组合模式

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

Rokid Jungle--Max pro

介绍和功能开发 YodaOS-Master操作系统&#xff1a;以交换计算为核心&#xff0c;实现单目SLAM空间交互&#xff0c;具有高精度、实时性和稳定性。发布UXR2.0SDK&#xff0c;为构建空间内容提供丰富的开发套件 多模态交互 算法原子化 多种开发工具协同 多生态支持 骁龙XR2…

无swing,高级javaSE毕业之贪吃蛇游戏(含模块构建,多线程监听服务),已录制视频

JavaSE&#xff0c;无框架实现贪吃蛇 B站已发视频&#xff1a;无swing&#xff0c;纯JavaSE贪吃蛇游戏设计构建 文章目录 JavaSE&#xff0c;无框架实现贪吃蛇1.整体思考2.可能的难点思考2.1 如何表示游戏界面2.2 如何渲染游戏界面2.3 如何让游戏动起来2.4 蛇如何移动 3.流程图…