【stomp 实战】Spring websocket 用户订阅和会话的管理

news2025/1/11 5:45:24

通过Spring websocket 用户校验和业务会话绑定我们学会了如何将业务会话绑定到spring websocket会话上。通过这一节,我们来分析一下会话和订阅的实现

用户会话的数据结构

SessionInfo 用户会话

用户会话定义如下:

private static final class SessionInfo {

		// subscriptionId -> Subscription
		private final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();

		public Collection<Subscription> getSubscriptions() {
			return this.subscriptionMap.values();
		}

		@Nullable
		public Subscription getSubscription(String subscriptionId) {
			return this.subscriptionMap.get(subscriptionId);
		}

		public void addSubscription(Subscription subscription) {
			this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);
		}

		@Nullable
		public Subscription removeSubscription(String subscriptionId) {
			return this.subscriptionMap.remove(subscriptionId);
		}
	}
  • 用户会话中有subscriptionMap。这个表示一个会话中,可以有多个订阅,可以根据subscriptionId找到订阅。

SessionRegistry 用户会话注册

private static final class SessionRegistry {

		private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();

		@Nullable
		public SessionInfo getSession(String sessionId) {
			return this.sessions.get(sessionId);
		}

		public void forEachSubscription(BiConsumer<String, Subscription> consumer) {
			this.sessions.forEach((sessionId, info) ->
				info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));
		}

		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

		@Nullable
		public SessionInfo removeSubscriptions(String sessionId) {
			return this.sessions.remove(sessionId);
		}
	}
  • SessionRegistry 中sessions 表示多个会话。根据sessionId可以找到唯一一个会话SessionInfo

Subscription 用户订阅

	private static final class Subscription {

		private final String id;

		private final String destination;

		private final boolean isPattern;

		@Nullable
		private final Expression selector;

		public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {
			Assert.notNull(id, "Subscription id must not be null");
			Assert.notNull(destination, "Subscription destination must not be null");
			this.id = id;
			this.selector = selector;
			this.destination = destination;
			this.isPattern = isPattern;
		}

		public String getId() {
			return this.id;
		}

		public String getDestination() {
			return this.destination;
		}

		public boolean isPattern() {
			return this.isPattern;
		}

		@Nullable
		public Expression getSelector() {
			return this.selector;
		}

		@Override
		public boolean equals(@Nullable Object other) {
			return (this == other ||
					(other instanceof Subscription && this.id.equals(((Subscription) other).id)));
		}

		@Override
		public int hashCode() {
			return this.id.hashCode();
		}

		@Override
		public String toString() {
			return "subscription(id=" + this.id + ")";
		}
	}

SimpUserRegistry 用户注册接口

用户注册的接口如下:

public interface SimpUserRegistry {

	/**
	根据用户名,获取到用户信息
	 * Get the user for the given name.
	 * @param userName the name of the user to look up
	 * @return the user, or {@code null} if not connected
	 */
	@Nullable
	SimpUser getUser(String userName);

	/**
	获取现在所有的注册的用户
	 * Return a snapshot of all connected users.
	 * <p>The returned set is a copy and will not reflect further changes.
	 * @return the connected users, or an empty set if none
	 */
	Set<SimpUser> getUsers();

	/**
	获取在线用户数量
	 * Return the count of all connected users.
	 * @return the number of connected users
	 * @since 4.3.5
	 */
	int getUserCount();

	/**
	 * Find subscriptions with the given matcher.
	 * @param matcher the matcher to use
	 * @return a set of matching subscriptions, or an empty set if none
	 */
	Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);

}

SimpUser实际上就是代表着一个用户,我们来看其实现:LocalSimpUser的定义

	private static class LocalSimpUser implements SimpUser {
		private final String name;
		private final Principal user;
		private final Map<String, SimpSession> userSessions = new ConcurrentHashMap<>(1);
		public LocalSimpUser(String userName, Principal user) {
			Assert.notNull(userName, "User name must not be null");
			this.name = userName;
			this.user = user;
		}
	}

userSessions 表示当前一个用户可以对应多个会话。
这个Principal 是啥,还记得我们上一节通过Spring websocket 用户校验和业务会话绑定中,我们是怎么注册用户的吗

    private void connect(Message<?> message, StompHeaderAccessor accessor) {
        //1通过请求头获取到token
        String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
        //2如果token为空或者用户id没有解析出来,抛出异常,spring会将此websocket连接关闭
        if (StringUtils.isEmpty(token)) {
            throw new MessageDeliveryException("token missing!");
        }
        String userId = TokenUtil.parseToken(token);
        if (StringUtils.isEmpty(userId)) {
            throw new MessageDeliveryException("userId missing!");
        }
        //这个是每个会话都会有的一个sessionId
        String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);

        //3创建自己的业务会话session对象
        UserSession userSession = new UserSession();
        userSession.setSimpleSessionId(simpleSessionId);
        userSession.setUserId(userId);
        userSession.setCreateTime(LocalDateTime.now());
        //4关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
        accessor.setUser(new UserSessionPrincipal(userSession));
    }

从token中解析出用户的userId,并通过下面的代码,把当前用户和会话绑定起来。一个用户实际上是可以绑定多个会话的。

 accessor.setUser(new UserSessionPrincipal(userSession));

总结一下用户和会话之间的关系,如下图
在这里插入图片描述

订阅过程的源码分析

前端订阅的代码如下

  stompClient.subscribe("/user/topic/answer", function (response) {
      createElement("answer", response.body);
  });

当后端收到订阅消息后,会由SimpleBrokerMessageHandler来处理

	@Override
	protected void handleMessageInternal(Message<?> message) {
		MessageHeaders headers = message.getHeaders();
		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);

		updateSessionReadTime(sessionId);

		if (!checkDestinationPrefix(destination)) {
			return;
		}

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (SimpMessageType.MESSAGE.equals(messageType)) {
			logMessage(message);
			sendMessageToSubscribers(destination, message);
		}
		else if (SimpMessageType.CONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				if (this.sessions.get(sessionId) != null) {
					if (logger.isWarnEnabled()) {
						logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
					}
					return;
				}
				long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
				long[] heartbeatOut = getHeartbeatValue();
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
				this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
				SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
				initHeaders(connectAck);
				connectAck.setSessionId(sessionId);
				if (user != null) {
					connectAck.setUser(user);
				}
				connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
				connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
				Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
				getClientOutboundChannel().send(messageOut);
			}
		}
		else if (SimpMessageType.DISCONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				handleDisconnect(sessionId, user, message);
			}
		}
		else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.registerSubscription(message);
		}
		else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.unregisterSubscription(message);
		}
	}

当消息类型为SUBSCRIBE时,会调用subscriptionRegistry.registerSubscription(message)
接着来看下subscriptionRegistry.registerSubscription(message)

//AbstractSubscriptionRegistry
	@Override
	public final void registerSubscription(Message<?> message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
			throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
		}

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		if (sessionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No sessionId in  " + message);
			}
			return;
		}

		String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
		if (subscriptionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No subscriptionId in " + message);
			}
			return;
		}

		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		if (destination == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No destination in " + message);
			}
			return;
		}

		addSubscriptionInternal(sessionId, subscriptionId, destination, message);
	}

这个代码很简单,就是从消息中取出三个东西,sessionId, subscriptionId, destination,进行注册。

//DefaultSubscriptionRegistry
	@Override
	protected void addSubscriptionInternal(
			String sessionId, String subscriptionId, String destination, Message<?> message) {

		boolean isPattern = this.pathMatcher.isPattern(destination);
		Expression expression = getSelectorExpression(message.getHeaders());
		Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);

		this.sessionRegistry.addSubscription(sessionId, subscription);
		this.destinationCache.updateAfterNewSubscription(sessionId, subscription);
	}
	//其实就是添加到sessions map中。会话里把订阅添加到订阅map中
		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

其实就是添加到sessions map中。会话里把订阅添加到订阅map中

那用户和会话是如何关联起来的?
在这里插入图片描述

  • 当订阅事件发生时,取出当前的Principal( accessor.setUser(xxx)设置的),然后生成LocalSimpleUser,即用户
  • 把当前会话,添加到当前用户会话中。这样就给用户绑定好了会话了。

用户会话事件

通过Spring事件机制,管理注册用户信息和会话,包括订阅、取消订阅,会话断连。代码如下

//DefaultSimpUserRegistry
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;
		Message<?> message = subProtocolEvent.getMessage();
		MessageHeaders headers = message.getHeaders();

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		Assert.state(sessionId != null, "No session id");

		if (event instanceof SessionSubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				String destination = SimpMessageHeaderAccessor.getDestination(headers);
				if (id != null && destination != null) {
					session.addSubscription(id, destination);
				}
			}
		}
		else if (event instanceof SessionConnectedEvent) {
			Principal user = subProtocolEvent.getUser();
			if (user == null) {
				return;
			}
			String name = user.getName();
			if (user instanceof DestinationUserNameProvider) {
				name = ((DestinationUserNameProvider) user).getDestinationUserName();
			}
			synchronized (this.sessionLock) {
				LocalSimpUser simpUser = this.users.get(name);
				if (simpUser == null) {
					simpUser = new LocalSimpUser(name, user);
					this.users.put(name, simpUser);
				}
				LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);
				simpUser.addSession(session);
				this.sessions.put(sessionId, session);
			}
		}
		else if (event instanceof SessionDisconnectEvent) {
			synchronized (this.sessionLock) {
				LocalSimpSession session = this.sessions.remove(sessionId);
				if (session != null) {
					LocalSimpUser user = session.getUser();
					user.removeSession(sessionId);
					if (!user.hasSessions()) {
						this.users.remove(user.getName());
					}
				}
			}
		}
		else if (event instanceof SessionUnsubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				if (subscriptionId != null) {
					session.removeSubscription(subscriptionId);
				}
			}
		}
	}

优雅停机

当服务器停机时,最好给客户端发送断连消息,而不是让客户端过了一段时间发现连接断开。
Spring websocket是如何来实现优雅停机的?

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	@Override
	public final void stop() {
		synchronized (this.lifecycleMonitor) {
			this.running = false;
			this.clientOutboundChannel.unsubscribe(this);
		}

		// Proactively notify all active WebSocket sessions
		for (WebSocketSessionHolder holder : this.sessions.values()) {
			try {
				holder.getSession().close(CloseStatus.GOING_AWAY);
			}
			catch (Throwable ex) {
				if (logger.isWarnEnabled()) {
					logger.warn("Failed to close '" + holder.getSession() + "': " + ex);
				}
			}
		}
	}

	@Override
	public final void stop(Runnable callback) {
		synchronized (this.lifecycleMonitor) {
			stop();
			callback.run();
		}
	}
}

其奥秘就是其实现了SmartLifecycle。这个是Spring的生命周期接口。我们可以通过实现此接口,在相应的生命周期阶段注册回调事件!
上面的代码,通过调用stop接口,给客户端发送了一个断连的消息。即实现了关机时的主动通知断连。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1635992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

03 - 步骤 Kafka producer

简介 Kafka producer 步骤&#xff0c;用于将 Kettle 中经过处理或转换的数据发送到 Kafka 的主题中 使用 场景 我需要把经过Kettle处理完的数据发送到一个Kafka中&#xff0c;让后端服务器进行下一步处理。 1、拖拽 Kafka producer 到面板 2、配置 Kafka producer 3、调试…

FSD自动驾驶泛谈

特斯拉的FSD&#xff08;Full-Self Driving&#xff0c;全自动驾驶&#xff09;系统是特斯拉公司研发的一套完全自动驾驶系统。旨在最终实现车辆在多种驾驶环境下无需人类干预的自动驾驶能力。以下是对FSD系统的详细探讨&#xff1a; 系统概述 FSD是特斯拉的自动驾驶技术&…

架设WebSocket的最后一环,如何设置好nginx反向代理

WebScoket都已经完工快一个月&#xff0c;经过一段时间的测试&#xff0c;公司还是准备把服务器换到鹅厂&#xff0c;用EO来解决CDN内容分发和DDOS防护问题&#xff0c;由于EO并不支持URL 路径转发&#xff0c;只支持转发到一个站点的80或则443端口&#xff0c;如果想做路径分发…

从Paint 3D入门glTF

Paint 3D Microsoft Paint 3D是微软的一款图像编辑软件&#xff0c;它是传统的Microsoft Paint程序的升级版。 这个新版本的Paint专注于三维设计和创作&#xff0c;使用户可以使用简单的工具创建和编辑三维模型。 Microsoft Paint 3D具有直观的界面和易于使用的工具&#xff0…

小程序地理位置权限如何申请?

这篇内容会教大家如何快速申请“获取当前的地理位置&#xff08;onLocationChange&#xff09;”接口&#xff0c;以便帮助大家顺利开通接口。以下内容是本人经历了多次的申请经历得出来的经验&#xff0c;来之不易&#xff0c;望大家给予鼓励&#xff01; 小程序地理位置接口有…

百川crm系统 汽车销售租赁CRM客户管理系统是不可或缺的利器?

在竞争激烈的汽车销售租赁市场中&#xff0c;如何提升客户满意度、优化业务流程、提高销售效率&#xff0c;成为了每一家汽车销售租赁公司必须面对的问题。而CRM&#xff08;客户关系管理&#xff09;客户管理系统&#xff0c;正是应对这些挑战的重要利器。本文将从汽车销售租赁…

18 如何设计微服务才能防止宕机?

在上一讲里&#xff0c;介绍了构建一个稳健的微服务的具体法则&#xff1a;防备上游、做好自己、怀疑下游&#xff0c; 并介绍了为什么要防备上游&#xff0c;以及一些防备上游的具体手段。 在本讲里&#xff0c;咱们一起来学习&#xff0c;做好微服务自身的设计和代码编写的常…

ollama-python-Python快速部署Llama 3等大型语言模型最简单方法

ollama介绍 在本地启动并运行大型语言模型。运行Llama 3、Phi 3、Mistral、Gemma和其他型号。 Llama 3 Meta Llama 3 是 Meta Inc. 开发的一系列最先进的模型&#xff0c;提供8B和70B参数大小&#xff08;预训练或指令调整&#xff09;。 Llama 3 指令调整模型针对对话/聊天用…

Centos7+Hadoop3.3.4+KDC1.15+Ranger2.4.0集成

一、集群规划 本次测试采用3台虚拟机&#xff0c;操作系统版本为centos7.6。 kerberos采用默认YUM源安装&#xff0c;版本为&#xff1a;1.15.1-55 Ranger版本为2.4.0 系统用户为ranger:ranger IP地址主机名KDCRanger192.168.121.101node101.cc.localKDC masterRanger Admin…

如何找到台式电脑的ip地址

在数字时代&#xff0c;每台接入网络的设备都拥有一个独特的标识&#xff0c;这就是IP地址。无论是手机、笔记本电脑还是台式电脑&#xff0c;IP地址都扮演着至关重要的角色&#xff0c;它帮助设备在网络世界中定位并与其他设备进行通信。对于许多电脑用户来说&#xff0c;了解…

JavaScript原型链深度剖析

目录 前言 一、原型链 1.原型链的主要组成 原型&#xff08;Prototype&#xff09; 构造函数&#xff08;Constructor&#xff09; 实例&#xff08;Instance&#xff09; 2.原型链的工作原理 前言 在JavaScript的世界中&#xff0c;原型链&#xff08;Prototype Chain&…

“Postman 中文版使用教程:如何切换到中文界面?”

Postman 的很好用的接口测试软件。但是&#xff0c;Postman 默认是英文版的&#xff0c;也不支持在软件内切换为中文版。很多同学的英语并不是很好&#xff0c;看到一堆的英文很是头痛。 今天我们来介绍下&#xff1a;切换到 Postman 中文版的方法。想要学习更多的关于 Postma…

IDEA 中 git fetch 验证报错 The provided password or token is incorrect

参考链接&#xff1a; 【GitLab】-HTTP Basic: Access denied.remote:You must use a personal access token_http basic: access denied. the provided password o-CSDN博客 idea使用gitLab报错&#xff1a;remote: HTTP Basic: Access denied_idea remote: http basic: acc…

MoonBit 周报 Vol.39:新增 JS 后端、插件和构建系统同步支持多后端开发……

MoonBit 更新 新增JavaScript后端 目前MoonBit已新增对JavaScript的支持并带来前所未有的性能提升&#xff0c;在JS后端实现了超出Json5近8倍性能的优势。更详细的介绍可以看一下这篇文章&#xff1a;IDEA研究院编程语言MoonBit发布JavaScript后端&#xff0c;速度提升25倍 …

Copilot Workspace是GitHub对人工智能驱动的软件工程的诠释

软件开发的未来是人工智能驱动的集成开发环境吗&#xff1f;至少GitHub 是这样想的。 在今年初秋于旧金山举行的 GitHub Universe 年度大会之前&#xff0c;GitHub 发布了 Copilot Workspace&#xff0c;这是一种开发环境&#xff0c;利用 GitHub 所称的 “Copilot 驱动的代理…

[游戏陪玩系统] 陪玩软件APP小程序H5游戏陪玩成品软件源码-线上线下可爆改家政,整理师等功能

简介 随着电竞行业的快速发展&#xff0c;电竞陪玩APP正在逐渐成为用户在休闲娱乐时的首选。为了吸引用户和提高用户体验&#xff0c;电竞陪玩APP开发需要定制一些特色功能&#xff0c;并通过合适的盈利模式来获得收益。本文将为您介绍电竞陪玩APP开发需要定制的特色功能以及常…

超简单的Spring-mvc示例

超简单的Spring-mvc示例

IDEA2024版本控制台乱码怎么解决?

在使用最新版本的IDEA时&#xff0c;可能会遇到控制台输出乱码问题&#xff1f; 在网上找了很多办法&#xff0c;修改了IDEA的vmoptions文件也没有用,最后发现原来是要修改这里 Setting>>Build&#xff0c;Execution,Deployment>>Runnr中的VM Options配置&#xf…

保序加密技术:保护数据有序性的安全方案

在数据安全领域&#xff0c;除了常见的保密性、完整性和可用性需求外&#xff0c;某些特定场景还需要保护数据的有序性。保序加密技术&#xff08;Order Preserving Encryption, OPE&#xff09;就是为了满足这一需求而设计的。本文将介绍保序加密技术的基本原理、应用场景以及…

Leetcode——面试题02.04.分割链表

面试题 02.04. 分割链表 - 力扣&#xff08;LeetCode&#xff09; 对于该链表OJ&#xff0c;我们两种大的方向&#xff1a; 1.在原链表上修改&#xff1b;2.创建新链表&#xff0c;遍历原链表。 在原链上进行修改&#xff1a;如果该节点的val小于x则继续往后走&#xff0c;如…