【stomp 实战】spring websocket用户消息发送源码分析

news2024/12/26 23:41:49

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        /**
         * stompClient.subscribe("/user/topic/subNewMsg",...)
         * 这个时候,后端推送消息应该这么写
         * msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
         * 即去掉了/user前缀
         */
        registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);
    }
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1
 stompClient.subscribe("/user/topic/answer", function (response) {
 //do something
 });
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;
    public void echo(Principal principal, Msg msg) {
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
    }

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue")
                //配置stomp协议里, server返回的心跳
                .setHeartbeatValue(new long[]{10000L, 10000L})
                //配置发送心跳的scheduler
                .setTaskScheduler(new DefaultManagedTaskScheduler());
    }
  • 前端代码可以这么写
//订阅广播消息topic
  stompClient.subscribe("/topic/boardCast/hello", function (response) {
      // do something
  });
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;
  public void echo2(Msg msg) {
        log.info("收到的消息为:{}", msg.getContent());
        msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");
    }

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {
  //do something
   });

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;
    public void echo(Principal principal, Msg msg) {
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
    }

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate
	@Override
	public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
			@Nullable MessagePostProcessor postProcessor) throws MessagingException {
		//对消息进行转换,对象转字符串,或者字节数组之类的
		Message<?> message = doConvert(payload, headers, postProcessor);
		//调用Send发送
		send(destination, message);
	}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Override
	public void send(D destination, Message<?> message) {
		doSend(destination, message);
	}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate
	@Override
	protected void doSend(String destination, Message<?> message) {
		Assert.notNull(destination, "Destination must not be null");

		SimpMessageHeaderAccessor simpAccessor =
				MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);

		if (simpAccessor != null) {
			if (simpAccessor.isMutable()) {
				simpAccessor.setDestination(destination);
				simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
				simpAccessor.setImmutable();
				sendInternal(message);
				return;
			}
			else {
				// Try and keep the original accessor type
				simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
				initHeaders(simpAccessor);
			}
		}
		else {
			simpAccessor = SimpMessageHeaderAccessor.wrap(message);
			initHeaders(simpAccessor);
		}

		simpAccessor.setDestination(destination);
		simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
		message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());
		sendInternal(message);
	}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {
		String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
		Assert.notNull(destination, "Destination header required");

		long timeout = this.sendTimeout;
		boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));

		if (!sent) {
			throw new MessageDeliveryException(message,
					"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
		}
	}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel
	@Override
	public final boolean send(Message<?> message, long timeout) {
		Assert.notNull(message, "Message must not be null");
		Message<?> messageToUse = message;
		ChannelInterceptorChain chain = new ChannelInterceptorChain();
		boolean sent = false;
		try {
			messageToUse = chain.applyPreSend(messageToUse, this);
			if (messageToUse == null) {
				return false;
			}
			sent = sendInternal(messageToUse, timeout);
			chain.applyPostSend(messageToUse, this, sent);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
			return sent;
		}
		catch (Exception ex) {
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
			if (ex instanceof MessagingException) {
				throw (MessagingException) ex;
			}
			throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
		}
		catch (Throwable err) {
			MessageDeliveryException ex2 =
					new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
			throw ex2;
		}
	}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel
	@Override
	public boolean sendInternal(Message<?> message, long timeout) {
		for (MessageHandler handler : getSubscribers()) {
			SendTask sendTask = new SendTask(message, handler);
			if (this.executor == null) {
				sendTask.run();
			}
			else {
				this.executor.execute(sendTask);
			}
		}
		return true;
	}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {
	Message<?> message = this.inputMessage;
	try {
		message = applyBeforeHandle(message);
		if (message == null) {
			return;
		}
		this.messageHandler.handleMessage(message);
		triggerAfterMessageHandled(message, null);
	}
	catch (Exception ex) {
		triggerAfterMessageHandled(message, ex);
		if (ex instanceof MessagingException) {
			throw (MessagingException) ex;
		}
		String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
		throw new MessageDeliveryException(message, description, ex);
	}
	catch (Throwable err) {
		String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
		MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
		triggerAfterMessageHandled(message, ex2);
		throw ex2;
	}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由messageChannel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1636533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

maven聚合,继承等方式

需要install安装到本地仓库&#xff0c;或者私服&#xff0c;方可使用自己封装项目 编译&#xff0c;测试&#xff0c;打包&#xff0c;安装&#xff0c;发布 parent: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://mav…

Python 机器学习 基础 之 学习 基础环境搭建

Python 机器学习 基础 之 学习 基础环境搭建 目录 Python 机器学习 基础 之 学习 基础环境搭建 一、简单介绍 二、什么是机器学习 三、python 环境的搭建 1、Python 安装包下载 2、这里以 下载 Python 3.10.9 为例 3、安装 Python 3.10.9 4、检验 python 是否安装成功&…

二分图--判定以及最大匹配

水了个圈钱杯省一&#xff0c;不过估计国赛也拿不了奖&#xff0c;但还是小小挣扎一下。 什么是二分图&#xff1a;G(V,E)是一个无向图&#xff0c;若顶点V可以分为两个互不相交的子集A,B&#xff0c;并图中的每一条边&#xff08;i,j)所关联的ij属于不同的顶点集&#xff0c;…

2024年Docker常用操作快速查询手册

目录 一、Linux系统上 Docker安装流程&#xff08;以ubuntu为例&#xff09; 一、卸载所有冲突的软件包 二、设置Docker的apt存储库&#xff08;这里使用的是阿里云软件源&#xff09; 三、直接安装最新版本的Docker 三、安装指定版本的Docker 四、验证Docker是否安装成功…

Nginx实现端口转发与负载均衡配置

前言&#xff1a;当我们的软件体系结构较为庞大的时候&#xff0c;访问量往往是巨大的&#xff0c;所以我们这里可以使用nginx的均衡负载 一、配置nginx实现端口转发 本地tomcat服务端口为8082 本地nginx端口为8080 目的&#xff1a;将nginx的8080转发到tomcat的8082端口上…

每日一题:插入区间

给你一个 无重叠的 &#xff0c;按照区间起始端点排序的区间列表 intervals&#xff0c;其中 intervals[i] [starti, endi] 表示第 i 个区间的开始和结束&#xff0c;并且 intervals 按照 starti 升序排列。同样给定一个区间 newInterval [start, end] 表示另一个区间的开始和…

【嵌入式笔试题】网络编程笔试题

非常经典的笔试题。 2.网络编程(29道) 2.1列举一下OSI协议的各种分层。说说你最熟悉的一层协议的功能。 ( 1 )七层划分为:应用层、表示层、会话层、传输层、网络层、数据链路层、物理 层。 ( 2 )五层划分为:应用层、传输层、网络层、数据链路层、物理层。 ( 3 )…

vue 脚手架 创建vue3项目

创建项目 命令&#xff1a;vue create vue-element-plus 选择配置模式&#xff1a;手动选择模式 (上下键回车) 选择配置&#xff08;上下键空格回车&#xff09; 选择代码规范、规则检查和格式化方式: 选择语法检查方式 lint on save (保存就检查) 代码文件中有代码不符合 l…

国家开放大学2024年春《Matlab语言及其应用》实验一熟悉Matlab 操作环境参考答案

实验报告 姓名&#xff1a; 学号&#xff1a; 实验一名称&#xff1a;熟悉 Matlab 操作环境 实验目标&#xff1a;通过简单变量和矩阵的录入、计算和查看相关信息&#xff0c;了解 Matlab 操作界面 及各子窗口使用方法。熟悉一系列便于使用的 Matlab 函数和文件的工具。 实…

java-springmvc 01 补充 javaweb 三大组件Servlet,Filter、Listener(源码都是tomcat8.5项目中的)

01.JavaWeb三大组件指的是&#xff1a;Servlet、Filter、Listener,三者提供不同的功能 这三个在springmvc 运用很多 Servlet 01.Servlet接口&#xff1a; public interface Servlet {/*** 初始化方法* 实例化servlet之后&#xff0c;该方法仅调用一次 * init方法必须执行完…

【GitHub】github学生认证,在vscode中使用copilot的教程

github学生认证并使用copilot教程 写在最前面一.注册github账号1.1、注册1.2、完善你的profile 二、Github 学生认证注意事项&#xff1a;不完善的说明 三、Copilot四、在 Visual Studio Code 中安装 GitHub Copilot 扩展4.1 安装 Copilot 插件4.2 配置 Copilot 插件&#xff0…

【机器学习-19】集成学习---投票法(Voting)

一、引言 集成学习&#xff08;Ensemble Learning&#xff09;是机器学习领域中的一种重要策略&#xff0c;它通过结合多个模型的预测结果来提高整体性能。在单个模型容易过拟合或欠拟合的情况下&#xff0c;集成学习能够通过综合多个模型的优点来减少这种风险&#xff0c;从而…

ESP32 和 Arduino 之间建立蓝牙连接

ESP32 和 Arduino Uno 可以通过蓝牙进行通信。为此&#xff0c;您必须使用与 Arduino 兼容的蓝牙模块。我们将使用 HC-05&#xff08;06&#xff09; 蓝牙模块。 连接Arduino Uno和HC-05蓝牙模块 将 HC-05 蓝牙模块连接到 Arduino 板。将模块的VCC、GND、RX、TX引脚连接到Ard…

Python 与 TensorFlow2 生成式 AI(二)

原文&#xff1a;zh.annas-archive.org/md5/d06d282ea0d9c23c57f0ce31225acf76 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第四章&#xff1a;教授网络生成数字 在前一章中&#xff0c;我们涵盖了神经网络模型的构建基块。在这一章中&#xff0c;我们的第一个项目…

04 Docker练习赛从0开始到 docker 镜像提交

1.1 本地安装 docker 工具 这里以ubutun下安装docker为例,其他操作系统安装命令略有不同,可自行百度。(建议使用阿里源安装速度快) sudo apt install docker.io如果你本地有gpu,请继续执行如下命令以支持gpu调用: 注意: 英伟达对 docker 支持的 linux 发行版:https:/…

【计算智能】基本遗传算法在优化问题中的应用与实验【理论到程序】

文章目录 1. 引言&#xff1a;遗传算法简介2. 基本遗传算法&#xff08;SGA&#xff09;2.1 基本遗传算法的构成要素1. 染色体编码2. 适应度函数3. 遗传算子 2.2 实验设计与方法1. 算法流程2. 伪代码3. python实现1. 导入模块2. 目标函数 f(x)3 初始化种群4. 计算适应度5. 选择…

20232906 2023-2024-2 《网络与系统攻防技术》第七次作业

20232906 2023-2024-2 《网络与系统攻防技术》第七次作业 1.实验内容 一、使用Metasploit进行Linux远程渗透攻击 任务&#xff1a;使用Metasploit渗透测试软件&#xff0c;攻击Linux靶机上的Samba服务Usermap_script安全漏洞&#xff0c;获取目标Linux靶机的主机访问权限。实…

如何在Spring Boot中配置数据库密码加密

如何在Spring Boot中配置数据库密码加密&#xff1f; alibaba/druid Wiki GitHub 使用ConfigFilter alibaba/druid Wiki GitHub 巧用Druid数据源实现数据库连接密码的加密解密功能 import com.alibaba.druid.filter.config.ConfigTools;public class Testttt {public stat…

汽车CAN总线技术详解

1. 历史 2. 应用 3. 优点 4. 基础概念 5. 组成 6. 应用 7. 网关 8. 波形分析 参考文献 汽车CAN总线技术详解&#xff08;100多页支持下载&#xff09;

帮助 Python 用户构建 CLI 界面:直观易写、简单高效 | 开源日报 No.240

tiangolo/typer Stars: 13.7k License: MIT typer 是一个构建出色命令行界面&#xff08;CLI&#xff09;的库&#xff0c;基于 Python 类型提示。它旨在让开发者轻松创建用户喜欢使用的 CLI 应用程序。其主要功能和核心优势包括&#xff1a; 直观易写&#xff1a;强大编辑器…