【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

news2024/12/28 23:28:15

通过Spring websocket 用户校验和业务会话绑定我们学会了如何将业务会话绑定到spring websocket会话上。通过这一节,我们来分析一下会话和订阅的实现

用户会话的数据结构

SessionInfo 用户会话

用户会话定义如下:

private static final class SessionInfo {

		// subscriptionId -> Subscription
		private final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();

		public Collection<Subscription> getSubscriptions() {
			return this.subscriptionMap.values();
		}

		@Nullable
		public Subscription getSubscription(String subscriptionId) {
			return this.subscriptionMap.get(subscriptionId);
		}

		public void addSubscription(Subscription subscription) {
			this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);
		}

		@Nullable
		public Subscription removeSubscription(String subscriptionId) {
			return this.subscriptionMap.remove(subscriptionId);
		}
	}
  • 用户会话中有subscriptionMap。这个表示一个会话中,可以有多个订阅,可以根据subscriptionId找到订阅。

SessionRegistry 用户会话注册

private static final class SessionRegistry {

		private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();

		@Nullable
		public SessionInfo getSession(String sessionId) {
			return this.sessions.get(sessionId);
		}

		public void forEachSubscription(BiConsumer<String, Subscription> consumer) {
			this.sessions.forEach((sessionId, info) ->
				info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));
		}

		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

		@Nullable
		public SessionInfo removeSubscriptions(String sessionId) {
			return this.sessions.remove(sessionId);
		}
	}
  • SessionRegistry 中sessions 表示多个会话。根据sessionId可以找到唯一一个会话SessionInfo

Subscription 用户订阅

	private static final class Subscription {

		private final String id;

		private final String destination;

		private final boolean isPattern;

		@Nullable
		private final Expression selector;

		public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {
			Assert.notNull(id, "Subscription id must not be null");
			Assert.notNull(destination, "Subscription destination must not be null");
			this.id = id;
			this.selector = selector;
			this.destination = destination;
			this.isPattern = isPattern;
		}

		public String getId() {
			return this.id;
		}

		public String getDestination() {
			return this.destination;
		}

		public boolean isPattern() {
			return this.isPattern;
		}

		@Nullable
		public Expression getSelector() {
			return this.selector;
		}

		@Override
		public boolean equals(@Nullable Object other) {
			return (this == other ||
					(other instanceof Subscription && this.id.equals(((Subscription) other).id)));
		}

		@Override
		public int hashCode() {
			return this.id.hashCode();
		}

		@Override
		public String toString() {
			return "subscription(id=" + this.id + ")";
		}
	}

SimpUserRegistry 用户注册接口

用户注册的接口如下:

public interface SimpUserRegistry {

	/**
	根据用户名,获取到用户信息
	 * Get the user for the given name.
	 * @param userName the name of the user to look up
	 * @return the user, or {@code null} if not connected
	 */
	@Nullable
	SimpUser getUser(String userName);

	/**
	获取现在所有的注册的用户
	 * Return a snapshot of all connected users.
	 * <p>The returned set is a copy and will not reflect further changes.
	 * @return the connected users, or an empty set if none
	 */
	Set<SimpUser> getUsers();

	/**
	获取在线用户数量
	 * Return the count of all connected users.
	 * @return the number of connected users
	 * @since 4.3.5
	 */
	int getUserCount();

	/**
	 * Find subscriptions with the given matcher.
	 * @param matcher the matcher to use
	 * @return a set of matching subscriptions, or an empty set if none
	 */
	Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);

}

SimpUser实际上就是代表着一个用户,我们来看其实现:LocalSimpUser的定义

	private static class LocalSimpUser implements SimpUser {
		private final String name;
		private final Principal user;
		private final Map<String, SimpSession> userSessions = new ConcurrentHashMap<>(1);
		public LocalSimpUser(String userName, Principal user) {
			Assert.notNull(userName, "User name must not be null");
			this.name = userName;
			this.user = user;
		}
	}

userSessions 表示当前一个用户可以对应多个会话。
这个Principal 是啥,还记得我们上一节通过Spring websocket 用户校验和业务会话绑定中,我们是怎么注册用户的吗

    private void connect(Message<?> message, StompHeaderAccessor accessor) {
        //1通过请求头获取到token
        String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
        //2如果token为空或者用户id没有解析出来,抛出异常,spring会将此websocket连接关闭
        if (StringUtils.isEmpty(token)) {
            throw new MessageDeliveryException("token missing!");
        }
        String userId = TokenUtil.parseToken(token);
        if (StringUtils.isEmpty(userId)) {
            throw new MessageDeliveryException("userId missing!");
        }
        //这个是每个会话都会有的一个sessionId
        String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);

        //3创建自己的业务会话session对象
        UserSession userSession = new UserSession();
        userSession.setSimpleSessionId(simpleSessionId);
        userSession.setUserId(userId);
        userSession.setCreateTime(LocalDateTime.now());
        //4关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
        accessor.setUser(new UserSessionPrincipal(userSession));
    }

从token中解析出用户的userId,并通过下面的代码,把当前用户和会话绑定起来。一个用户实际上是可以绑定多个会话的。

 accessor.setUser(new UserSessionPrincipal(userSession));

总结一下用户和会话之间的关系,如下图
在这里插入图片描述

订阅过程的源码分析

前端订阅的代码如下

  stompClient.subscribe("/user/topic/answer", function (response) {
      createElement("answer", response.body);
  });

当后端收到订阅消息后,会由SimpleBrokerMessageHandler来处理

	@Override
	protected void handleMessageInternal(Message<?> message) {
		MessageHeaders headers = message.getHeaders();
		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);

		updateSessionReadTime(sessionId);

		if (!checkDestinationPrefix(destination)) {
			return;
		}

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (SimpMessageType.MESSAGE.equals(messageType)) {
			logMessage(message);
			sendMessageToSubscribers(destination, message);
		}
		else if (SimpMessageType.CONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				if (this.sessions.get(sessionId) != null) {
					if (logger.isWarnEnabled()) {
						logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
					}
					return;
				}
				long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
				long[] heartbeatOut = getHeartbeatValue();
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
				this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
				SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
				initHeaders(connectAck);
				connectAck.setSessionId(sessionId);
				if (user != null) {
					connectAck.setUser(user);
				}
				connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
				connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
				Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
				getClientOutboundChannel().send(messageOut);
			}
		}
		else if (SimpMessageType.DISCONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				handleDisconnect(sessionId, user, message);
			}
		}
		else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.registerSubscription(message);
		}
		else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.unregisterSubscription(message);
		}
	}

当消息类型为SUBSCRIBE时,会调用subscriptionRegistry.registerSubscription(message)
接着来看下subscriptionRegistry.registerSubscription(message)

//AbstractSubscriptionRegistry
	@Override
	public final void registerSubscription(Message<?> message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
			throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
		}

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		if (sessionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No sessionId in  " + message);
			}
			return;
		}

		String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
		if (subscriptionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No subscriptionId in " + message);
			}
			return;
		}

		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		if (destination == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No destination in " + message);
			}
			return;
		}

		addSubscriptionInternal(sessionId, subscriptionId, destination, message);
	}

这个代码很简单,就是从消息中取出三个东西,sessionId, subscriptionId, destination,进行注册。

//DefaultSubscriptionRegistry
	@Override
	protected void addSubscriptionInternal(
			String sessionId, String subscriptionId, String destination, Message<?> message) {

		boolean isPattern = this.pathMatcher.isPattern(destination);
		Expression expression = getSelectorExpression(message.getHeaders());
		Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);

		this.sessionRegistry.addSubscription(sessionId, subscription);
		this.destinationCache.updateAfterNewSubscription(sessionId, subscription);
	}
	//其实就是添加到sessions map中。会话里把订阅添加到订阅map中
		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

其实就是添加到sessions map中。会话里把订阅添加到订阅map中

那用户和会话是如何关联起来的?
在这里插入图片描述

  • 当订阅事件发生时,取出当前的Principal( accessor.setUser(xxx)设置的),然后生成LocalSimpleUser,即用户
  • 把当前会话,添加到当前用户会话中。这样就给用户绑定好了会话了。

用户会话事件

通过Spring事件机制,管理注册用户信息和会话,包括订阅、取消订阅,会话断连。代码如下

//DefaultSimpUserRegistry
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;
		Message<?> message = subProtocolEvent.getMessage();
		MessageHeaders headers = message.getHeaders();

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		Assert.state(sessionId != null, "No session id");

		if (event instanceof SessionSubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				String destination = SimpMessageHeaderAccessor.getDestination(headers);
				if (id != null && destination != null) {
					session.addSubscription(id, destination);
				}
			}
		}
		else if (event instanceof SessionConnectedEvent) {
			Principal user = subProtocolEvent.getUser();
			if (user == null) {
				return;
			}
			String name = user.getName();
			if (user instanceof DestinationUserNameProvider) {
				name = ((DestinationUserNameProvider) user).getDestinationUserName();
			}
			synchronized (this.sessionLock) {
				LocalSimpUser simpUser = this.users.get(name);
				if (simpUser == null) {
					simpUser = new LocalSimpUser(name, user);
					this.users.put(name, simpUser);
				}
				LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);
				simpUser.addSession(session);
				this.sessions.put(sessionId, session);
			}
		}
		else if (event instanceof SessionDisconnectEvent) {
			synchronized (this.sessionLock) {
				LocalSimpSession session = this.sessions.remove(sessionId);
				if (session != null) {
					LocalSimpUser user = session.getUser();
					user.removeSession(sessionId);
					if (!user.hasSessions()) {
						this.users.remove(user.getName());
					}
				}
			}
		}
		else if (event instanceof SessionUnsubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				if (subscriptionId != null) {
					session.removeSubscription(subscriptionId);
				}
			}
		}
	}

优雅停机

当服务器停机时,最好给客户端发送断连消息,而不是让客户端过了一段时间发现连接断开。
Spring websocket是如何来实现优雅停机的?

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	@Override
	public final void stop() {
		synchronized (this.lifecycleMonitor) {
			this.running = false;
			this.clientOutboundChannel.unsubscribe(this);
		}

		// Proactively notify all active WebSocket sessions
		for (WebSocketSessionHolder holder : this.sessions.values()) {
			try {
				holder.getSession().close(CloseStatus.GOING_AWAY);
			}
			catch (Throwable ex) {
				if (logger.isWarnEnabled()) {
					logger.warn("Failed to close '" + holder.getSession() + "': " + ex);
				}
			}
		}
	}

	@Override
	public final void stop(Runnable callback) {
		synchronized (this.lifecycleMonitor) {
			stop();
			callback.run();
		}
	}
}

其奥秘就是其实现了SmartLifecycle。这个是Spring的生命周期接口。我们可以通过实现此接口,在相应的生命周期阶段注册回调事件!
上面的代码,通过调用stop接口,给客户端发送了一个断连的消息。即实现了关机时的主动通知断连。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1636420.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第三节课,功能2:开发后端用户的管理接口-- postman--debug测试

一、如何使用postman 网址&#xff1a; https://www.postman.com/downloads/ 【Postman小白教程】五分钟学会如何使用Postman~_哔哩哔哩_bilibili postman安装使用_bowser agent在postman哪里-CSDN博客 二、下载后 登录&#xff0c;开始测试 2.1 关于postman 报错&#…

IOT-9608I-L 的GPIO应用

目录 概述 1 GPIO接口介绍 2 板卡上操作IO 2.1 查看IO驱动 2.2 使用ECHO操作IO 2.2.1 端口选择 2.2.2 查看IO 2.2.3 echo操作IO 3 C语言实现一个操作IO的案例 3.1 功能介绍 3.2 代码实现 3.3 详细代码 4 测试 测试视频地址&#xff1a; IOT-9608I-L的一个简单测试&a…

探索APP分发的新趋势 - 内容包括小猪APP分发平

APP分发探索APP分发的新趋势 - 内容包括小猪APP分发平台的重要性 APP分发是移动互联网的核心环节探索APP分发的新趋势 - 内容包括小猪APP分发平台&#xff0c;它链接探索APP分发的新趋势 - 内容包括小猪APP分发平台了开发者和用户探索APP分发的新趋势 - 内容包括小猪APP分发平…

区块链 | 由外部实体导致的 NFT 安全问题

&#x1f98a;原文&#xff1a; Understanding Security Issues in the NFT Ecosystem &#x1f98a;警告&#xff1a; 本文只记录了原文的第 6 节。 1 问题描述 NFT 所指向的数字资产&#xff08;图片、视频等&#xff09;必须是可以访问的&#xff0c;这样 NFT 才具有意义…

牛客网刷题 | BC70 计算单位阶跃函数

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 KiKi最近学习了信号…

慧言AIVoceChat实现在线客服及社区频道

原文链接&#xff1a;小回博客 慧言AI&VoceChat实现在线客服及社区频道 一、VoceChat 简介 官网&#xff1a;https://voce.chat VoceChat 是一款支持独立部署的个人云社交媒体聊天服务。15MB 的大小可部署在任何的服务器上&#xff0c;部署简单&#xff0c;很少需要维护…

VS2022 .Net6.0 无法打开窗体设计器

拿Vs2022 建了个Demo&#xff0c;运行环境是net6.0-windows&#xff0c;无论双击或是右键都打不开窗体设计器 打开项目目录下的*.csproj.user <?xml version"1.0" encoding"utf-8"?> <Project ToolsVersion"Current" xmlns"htt…

变电站综合自动化系统:Modbus-PLC-645转IEC104网关方案

前言 电力行业作为关系国计民生的重要基础产业&#xff0c;是关系千家万户的公用事业。但是要做好电力行业安全保障工作的前提&#xff0c;是需要对应的技术人员详细了解电力工业使用的系统、设备以及各类协议的安全特性&#xff0c;本文将主要介绍IEC 104协议的定义和钡铼技术…

动手学深度学习——softmax分类

1. 分类问题 回归与分类的区别&#xff1a; 回归可以用于预测多少的问题&#xff0c; 比如"预测房屋被售出价格"&#xff0c;它是个单值输出。softmax可以用来预测分类问题&#xff0c;例如"某个图片中是猫、鸡还是狗&#xff1f;"&#xff0c;这是一个多…

Springboot+Vue项目-基于Java+MySQL的入校申报审批系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

Tomact安装配置及使用(超详细)

文章目录 web相关知识概述web简介(了解)软件架构模式(掌握)BS&#xff1a;browser server 浏览器服务器CS&#xff1a;client server 客户端服务器 B/S和C/S通信模式特点(重要)web资源(理解)资源分类 URL请求路径(理解)作用介绍格式浏览器通过url访问服务器的过程 服务器(掌握)…

Python | Leetcode Python题解之第57题插入区间

题目&#xff1a; 题解&#xff1a; class Solution:def insert(self, intervals: List[List[int]], newInterval: List[int]) -> List[List[int]]:left, right newIntervalplaced Falseans list()for li, ri in intervals:if li > right:# 在插入区间的右侧且无交集…

ASP.NET数据存储与交换系统设计

摘 要 该系统以Microsoft Visual Studio 2003作为开发工具&#xff0c;选用SQL Server 2000数据库来实现数据存储&#xff0c;并设计开发了一种基于B/S模式的数据存储与交换系统。该系统完成了用户注册管理、后台管理和用户空间管理功能&#xff1b;为每个用户提供了个人的存…

企业家必须提升演讲口才的原因(3篇)

企业家必须提升演讲口才的原因&#xff08;3篇&#xff09; **篇&#xff1a;企业家必须提升演讲口才的原因——建立品牌影响力 一、引言 在当今竞争激烈的市场环境中&#xff0c;企业家作为企业的灵魂和代表&#xff0c;其个人形象和品牌影响力对于企业的成功至关重要。而演…

【大语言模型LLM】-基于ChatGPT搭建客服助手(1)

&#x1f525;博客主页&#xff1a;西瓜WiFi &#x1f3a5;系列专栏&#xff1a;《大语言模型》 很多非常有趣的模型&#xff0c;值得收藏&#xff0c;满足大家的收集癖&#xff01; 如果觉得有用&#xff0c;请三连&#x1f44d;⭐❤️&#xff0c;谢谢&#xff01; 长期不…

Java File类

1. File类概述 1.1 什么是File类 File是java.io包下作为文件和目录的类。File类定义了一些与平台无关的方法来操作文件&#xff0c;通过调用File类中的方法可以得到文件和目录的描述信息&#xff0c;包括名称、所在路径、读写性和长度等&#xff0c;还可以对文件和目录进行新建…

C语言 | Leetcode C语言题解之第61题旋转链表

题目&#xff1a; 题解&#xff1a; struct ListNode* rotateRight(struct ListNode* head, int k) {if (k 0 || head NULL || head->next NULL) {return head;}int n 1;struct ListNode* iter head;while (iter->next ! NULL) {iter iter->next;n;}int add n…

c#数据库: 9.删除和添加新字段/数据更新

先把原来数据表的sexy字段删除,然后重新在添加字段sexy,如果添加成功,sexy列的随机内容会更新.原数据表如下: using System; using System.Collections.Generic; using System.Data; using System.Data.Common; using System.Data.SqlClient; using System.Linq; using System.…

3D看车有哪些强大的功能?适合哪些企业使用?

3D看车是一种创新的汽车展示方式&#xff0c;它提供了许多强大的功能&#xff0c;特别适合汽车行业的企业使用。 3D看车可实现哪些功能&#xff1f; 1、细节展示&#xff1a; 51建模网提供全套汽车行业3D数字化解决方案&#xff0c;3D看车能够将汽车展示得更加栩栩如生&…

ClickHouse高原理与实践

ClickHouse高原理与实践 1 ClickHouse的特性1.1. OLAP1.2. 列式存储1.3. 表引擎1.4. 向量化执行1.5. 分区1.6. 副本与分片1.7 其他特性 2. ClickHouse模块设计2.1 Parser分析器与Interpreter解释器2.2 Storage2.3 Column与Field2.4 DataType2.4 Block2.5 Cluster与Replication …