WebSocket的那些事(4-Spring中的STOMP支持详解)

news2024/11/21 2:36:31

目录

  • 一、序言
  • 二、Spring对STOMP支持的相关注解
  • 三、聊天Demo代码示例
    • 1、前端页面chat.html
    • 2、相关实体
      • (1) 请求消息参数
      • (2) 响应消息内容
      • (3) 自定义认证用户信息
    • 3、自定义用户认证拦截器
    • 4、WebSocket消息代理配置
    • 5、ChatController控制器
  • 四、测试用例
    • 1、指定用户定时消息推送测试
    • 2、群聊和私聊消息测试
  • 五、@SendToUser和SimpMessagingTemplate#convertAndSendToUser原理
    • 1、关于UserDestinationMessageHandler
    • 2、UserDestinationMessageHandler源码分析
  • 六、结语

一、序言

上节我们在 WebSocket的那些事(3-STOMP实操篇)中介绍了STOMP协议以及和Spring集成的简单示例,这一节我们我们将用一个聊天Demo程序详细介绍相关注解使用和原理、拦截器、用户身份校验、还有事件。


二、Spring对STOMP支持的相关注解

  • @MessageMapping:消息路由注解,功能和MVC的@RequestMapping等注解类似,被注解的方法会基于目的地路由对消息进行处理。
  • @SubscribeMapping:和@MessageMapping功能类似,但不同点是被该注解修饰的方法的返回值不会经过brokerChannel发送给消息代理,而是直接通过clientOutboundChannel返回给客户端。
  • @MessageExceptionHandler:消息处理异常注解,主要用来处理来自@MessageMapping注解方法引发的异常。
  • @SendTo:指定消息发送目的地,如果消息处理方法上不带该注解,则会自动使用消息订阅前缀 + @MessageMapping上的值作为消息发送目的地。
  • @SendToUser:指定推送给某个用户的消息发送目的地,加上该注解后,消息将会基于SessionId推送给单个用户。

三、聊天Demo代码示例

Maven依赖如下:

<dependency>
      <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

备注:为了方便调试,可以在application.yml中将spring.thymeleaf.cache设为false禁用模板缓存。

1、前端页面chat.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>greeting</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js"></script>
    <style>
        #mainWrapper {
            width: 600px;
            margin: auto;
        }
    </style>
</head>
<body>
<div id="mainWrapper">
    <div>
        <label for="username" style="margin-right: 5px">姓名:</label><input id="username" type="text"/>
    </div>
    <div id="msgWrapper">
        <p style="vertical-align: top">发送的消息:</p>
        <textarea id="msgSent" style="width: 600px;height: 100px"></textarea>
        <p style="vertical-align: top">收到的群聊消息:</p>
        <textarea id="groupMsgReceived" style="width: 600px;height: 100px"></textarea>
        <p style="vertical-align: top">收到的私聊消息:</p>
        <textarea id="privateMsgReceived" style="width: 600px;height: 200px"></textarea>
    </div>
    <div style="margin-top: 5px;">
        <button onclick="connect()">连接</button>
        <button onclick="sendGroupMessage()">发送群聊消息</button>
        <button onclick="sendPrivateMessage()">发送私聊消息</button>
        <button onclick="disconnect()">断开连接</button>
    </div>
</div>
<script type="text/javascript">
    $(() => {
        $('#msgSent').val('');
        $("#groupMsgReceived").val('');
        $("#privateMsgReceived").val('');
    });

    let stompClient = null;


    // 连接服务器
    const connect = () => {
        const header = {"User-ID": new Date().getTime().toString(), "User-Name": $('#username').val()};
        const ws = new SockJS('http://localhost:8080/websocket');
        stompClient = Stomp.over(ws);
        stompClient.connect(header, () => subscribeTopic());
    }

    // 订阅主题
    const subscribeTopic = () => {
        alert("连接成功!");
        
        // 订阅广播消息
        stompClient.subscribe('/topic/chat/group', function (message) {
                console.log(`Group message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#groupMsgReceived").val();
                $("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
        // 订阅单播消息
        stompClient.subscribe('/user/topic/chat/private', message => {
                console.log(`Private message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#privateMsgReceived").val();
                $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
        // 订阅定时推送的单播消息
        stompClient.subscribe(`/user/topic/chat/push`, message => {
                console.log(`Private message received : ${message.body}`);
                const resp = JSON.parse(message.body);
                const previousMsg = $("#privateMsgReceived").val();
                $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
            }
        );
    };

    // 断连
    const disconnect = () => {
        stompClient.disconnect(() => {
            $("#msgReceived").val('Disconnected from WebSocket server');
        });
    }

    // 发送群聊消息
    const sendGroupMessage = () => {
        const msg = {name: $('#username').val(), content: $('#msgSent').val()};
        stompClient.send('/app/chat/group', {}, JSON.stringify(msg));
    }

    // 发送私聊消息
    const sendPrivateMessage = () => {
        const msg = {name: $('#username').val(), content: $('#msgSent').val()};
        stompClient.send('/app/chat/private', {}, JSON.stringify(msg));
    }
</script>
</body>
</html>

备注:在建立连接时,我们在STOMP请求头里指定了随机的User-IDUser-Name信息,服务端可以根据User-ID确定唯一用户。

2、相关实体

(1) 请求消息参数

@Data
public class WebSocketMsgDTO {

	private String name;

	private String content;
}

(2) 响应消息内容

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketMsgVO {

	private String content;
}

(3) 自定义认证用户信息

@Data
@AllArgsConstructor
@NoArgsConstructor
public class StompAuthenticatedUser implements Principal {

	/**
	 * 用户唯一ID
	 */
	private String userId;

	/**
	 * 用户昵称
	 */
	private String nickName;

	/**
	 * 用于指定用户消息推送的标识
	 * @return
	 */
	@Override
	public String getName() {
		return this.userId;
	}

}

3、自定义用户认证拦截器

@Slf4j
public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {

	private static final String USER_ID = "User-ID";
	private static final String USER_NAME = "User-Name";

	@Override
	public Message<?> preSend(Message<?> message, MessageChannel channel) {
		StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
		// 如果是连接请求,记录userId
		if (StompCommand.CONNECT.equals(accessor.getCommand())) {
			String userID = accessor.getFirstNativeHeader(USER_ID);
			String username = accessor.getFirstNativeHeader(USER_NAME);

			log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);
			accessor.setUser(new StompAuthenticatedUser(userID, username));
		}

		return message;
	}

}

备注:该拦截器用来从STOMP消息头取出User-IDUser-Name相关用户信息,并且注入到STOMPsimpUser的消息头中,后续可以通过SimpUserRegistry实例读取用户信息。

4、WebSocket消息代理配置

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/websocket") // WebSocket握手端口
			.addInterceptors(new HttpSessionHandshakeInterceptor())
			.setAllowedOriginPatterns("*") // 设置跨域
			.withSockJS(); // 开启SockJS回退机制
	}

	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		// 拦截器配置
		registration.interceptors(new UserAuthenticationChannelInterceptor());
	}

	@Override
	public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
		// 这里我们设置入站消息最大为8K
		registry.setMessageSizeLimit(8 * 1024);
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.setApplicationDestinationPrefixes("/app") // 发送到服务端目的地前缀
			.enableSimpleBroker("/topic");// 开启简单消息代理,指定消息订阅前缀
	}

}

5、ChatController控制器

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

	private final SimpUserRegistry simpUserRegistry;
	private final SimpMessagingTemplate simpMessagingTemplate;

	@GetMapping("/page/chat")
	public ModelAndView turnToChatPage() {
		return new ModelAndView("chat");
	}

	/**
	 * 群聊消息处理
	 * 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"
	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
	 * @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端
	 */
	@MessageMapping("/chat/group")
	@SendTo("/topic/chat/group")
	public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {
		log.info("Group chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
		String content = String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());
		return WebSocketMsgVO.builder().content(content).build();
	}

	/**
	 * 私聊消息处理
	 * 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀
	 * 实际发送目的地为"/user/topic/chat/private"
	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
	 * @return 消息内容,方法返回值将会基于SessionID单播给指定用户
	 */
	@MessageMapping("/chat/private")
	@SendToUser("/topic/chat/private")
	public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {
		log.info("Private chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
		String content = "私聊消息回复:" + webSocketMsgDTO.getContent();
		return WebSocketMsgVO.builder().content(content).build();
	}

	/**
	 * 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。
	 * 通过SimpMessagingTemplate实例可以在任何地方推送消息。
	 */
	@Scheduled(fixedRate = 10 * 1000)
	public void pushMessageAtFixedRate() {
		log.info("当前在线人数: {}", simpUserRegistry.getUserCount());
		if (simpUserRegistry.getUserCount() <= 0) {
			return;
		}

		// 这里的Principal为StompAuthenticatedUser实例
		Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream()
			.map(simpUser -> StompAuthenticatedUser.class.cast(simpUser.getPrincipal()))
			.collect(Collectors.toSet());

		users.forEach(authenticatedUser -> {
			String userId = authenticatedUser.getUserId();
			String nickName = authenticatedUser.getNickName();
			WebSocketMsgVO webSocketMsgVO = new WebSocketMsgVO();
			webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName, LocalDateTime.now()));

			log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId, FastJsonUtils.toJsonString(webSocketMsgVO));
			simpMessagingTemplate.convertAndSendToUser(userId, "/topic/chat/push", webSocketMsgVO);
		});
	}

}


备注:

  • 使用@SendTo或者@SendToUser注解时,一定要带上registry.enableSimpleBroker("/topic")指定的目的地前缀/topic,否则消息发送会失败。
  • @SendTo("/topic/chat/group") 相当于调用simpMessagingTemplate.convertAndSend(“/topic/chat/group”, payload) 方法进行消息发送。
  • @SendToUser("/topic/chat/private")相当于调用 simpMessagingTemplate.convertAndSendToUser(userId, “/topic/chat/push”, webSocketMsgVO) 方法发送消息,使用注解时会自动根据SessionID发送消息到指定用户。

四、测试用例

打开浏览器访问http://localhost:8080/page/chat可进入聊天页,同时打开两个窗口访问,点击连接按钮。

1、指定用户定时消息推送测试

在这里插入图片描述


在这里插入图片描述
可以看到,后台会每隔10秒推送消息给所有在线的指定用户。

2、群聊和私聊消息测试

分别点击发送群聊消息和发送私聊消息按钮,可以看到群聊消息和私聊消息分别展示在不同文本框里。

在这里插入图片描述


在这里插入图片描述


五、@SendToUser和SimpMessagingTemplate#convertAndSendToUser原理

1、关于UserDestinationMessageHandler

当发送或者订阅消息时,如果消息目的地前缀以/user/开头,那么该消息目的地将会由UserDestinationMessageHandler进行转义,并且和用户SessionID对应,那么该UserDestinationMessageHandler是怎么运作的呢?

举个栗子,上面的聊天示例中我们有个定时任务在后台推送消息给所有在线的用户,我们调用了simpMessagingTemplate.convertAndSendToUser(userId, “/topic/chat/push”, webSocketMsgVO) 方法进行消息推送,那么消息发送目的地逻辑上为/user/topic/chat/push,经过UserDestinationMessageHandler处理后实际发送目的地会转义成/user/topic/chat/push-user123这种,后面的后缀其实就是-user + sessionId拼接起来的值。

通过这种方式我们就可以指定推送消息给某个用户,同时客户端在订阅以/user开头的消息时,同样会进行转义,最后实际订阅的消息目的也是/user/topic/chat/push-user123这种形式,实现唯一订阅。

2、UserDestinationMessageHandler源码分析

接下来,我们从UserDestinationMessageHandler的源码看下里面的实现细节,还是以SimpMessagingTemplate#convertAndSendToUser调用为例,首先看到handleMessage方法,如下:

@Override
public void handleMessage(Message<?> message) throws MessagingException {
	Message<?> messageToUse = message;
	if (this.broadcastHandler != null) {
		messageToUse = this.broadcastHandler.preHandle(message);
		if (messageToUse == null) {
			return;
		}
	}

	// 解析实际消息发送目的地
	UserDestinationResult result = this.destinationResolver.resolveDestination(messageToUse);
	if (result == null) {
		return;
	}

	// 如果没有目标发送目的地,则直接广播消息
	if (result.getTargetDestinations().isEmpty()) {
		if (logger.isTraceEnabled()) {
			logger.trace("No active sessions for user destination: " + result.getSourceDestination());
		}
		if (this.broadcastHandler != null) {
			this.broadcastHandler.handleUnresolved(messageToUse);
		}
		return;
	}

	SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(messageToUse);
	initHeaders(accessor);
	accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
	accessor.setLeaveMutable(true);

	messageToUse = MessageBuilder.createMessage(messageToUse.getPayload(), accessor.getMessageHeaders());
	if (logger.isTraceEnabled()) {
		logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
	}
	// 有目标发送目的地,则一对一推送
	for (String target : result.getTargetDestinations()) {
		this.messagingTemplate.send(target, messageToUse);
	}
}

这段代码逻辑处理也比较简单,如果能解析出目标发送目的地,则一对一进行推送,如果没有解析出发送目的地则直接广播消息。

下面让我们看下目标发送目的地代表什么,是如何解析的?我们看this.destinationResolver.resolveDestination(messageToUse)这段代码,如下:

@Override
@Nullable
public UserDestinationResult resolveDestination(Message<?> message) {
	// 从消息中解析目的地
	ParseResult parseResult = parse(message);
	if (parseResult == null) {
		return null;
	}
	String user = parseResult.getUser();
	String sourceDestination = parseResult.getSourceDestination();
	Set<String> targetSet = new HashSet<>();
	for (String sessionId : parseResult.getSessionIds()) {
		String actualDestination = parseResult.getActualDestination();
		String targetDestination = getTargetDestination(
				sourceDestination, actualDestination, sessionId, user);
		if (targetDestination != null) {
			targetSet.add(targetDestination);
		}
	}
	String subscribeDestination = parseResult.getSubscribeDestination();
	return new UserDestinationResult(sourceDestination, targetSet, subscribeDestination, user);
}

目标发送目的地实际上就是上面的targetSettargetSet集合中的targetDestination结构为actualDestination + “-user” + sessionId,这里就是我们前面说到的会对消息目的地以/user/开头的消息进行目的地转义。

targetSet来源于parseResult#getSessionIds,我们再看看parse(message)的源码:

@Nullable
	private ParseResult parse(Message<?> message) {
		MessageHeaders headers = message.getHeaders();
		String sourceDestination = SimpMessageHeaderAccessor.getDestination(headers);
		if (sourceDestination == null || !checkDestination(sourceDestination, this.prefix)) {
			return null;
		}
		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (messageType != null) {
			switch (messageType) {
				case SUBSCRIBE:
				case UNSUBSCRIBE:
					return parseSubscriptionMessage(message, sourceDestination);
				case MESSAGE:
					return parseMessage(headers, sourceDestination);
			}
		}
		return null;
	}

发送消息时,消息类型为Message,我们直接看parseMessage(headers, sourceDestination)方法的源码:

private ParseResult parseMessage(MessageHeaders headers, String sourceDest) {
	int prefixEnd = this.prefix.length();
	int userEnd = sourceDest.indexOf('/', prefixEnd);
	Assert.isTrue(userEnd > 0, "Expected destination pattern \"/user/{userId}/**\"");
	String actualDest = sourceDest.substring(userEnd);
	String subscribeDest = this.prefix.substring(0, prefixEnd - 1) + actualDest;
	String userName = sourceDest.substring(prefixEnd, userEnd);
	userName = StringUtils.replace(userName, "%2F", "/");

	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	Set<String> sessionIds;
	if (userName.equals(sessionId)) {
		userName = null;
		sessionIds = Collections.singleton(sessionId);
	}
	else {
		sessionIds = getSessionIdsByUser(userName, sessionId);
	}

	if (isRemoveLeadingSlash()) {
		actualDest = actualDest.substring(1);
	}
	return new ParseResult(sourceDest, actualDest, subscribeDest, sessionIds, userName);
}

这里的核心逻辑其实就是获取SessionId,如果userNameSessionId相同,则直接返回SessionId。如果不相同,则调用getSessionIdsByUser(userName, sessionId) 获取,我们再看下该方法的源码:

private Set<String> getSessionIdsByUser(String userName, @Nullable String sessionId) {
	Set<String> sessionIds;
	SimpUser user = this.userRegistry.getUser(userName);
	if (user != null) {
		if (sessionId != null && user.getSession(sessionId) != null) {
			sessionIds = Collections.singleton(sessionId);
		}
		else {
			Set<SimpSession> sessions = user.getSessions();
			sessionIds = new HashSet<>(sessions.size());
			for (SimpSession session : sessions) {
				sessionIds.add(session.getId());
			}
		}
	}
	else {
		sessionIds = Collections.emptySet();
	}
	return sessionIds;
}

可以看到,SessionId是从SimpUser实例中获取的,而SimpUser是从userRegistry(SimpUserRegistry实例) 中获取。

这里有一个问题,SimpUserRegistry中的信息是从什么时候初始化的呢?经过代码调试,这里的SimpUserRegistry实例类型为DefaultSimpUserRegistry

DefaultSimpUserRegistry中,监听了应用事件,如下:

public void onApplicationEvent(ApplicationEvent event) {
	AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;
	Message<?> message = subProtocolEvent.getMessage();
	MessageHeaders headers = message.getHeaders();

	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	Assert.state(sessionId != null, "No session id");

	if (event instanceof SessionSubscribeEvent) {
		LocalSimpSession session = this.sessions.get(sessionId);
		if (session != null) {
			String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);
			String destination = SimpMessageHeaderAccessor.getDestination(headers);
			if (id != null && destination != null) {
				session.addSubscription(id, destination);
			}
		}
	}
	else if (event instanceof SessionConnectedEvent) {
		// 这里的用户信息从SessionConnectedEvent事件获取
		Principal user = subProtocolEvent.getUser();
		if (user == null) {
			return;
		}
		String name = user.getName();
		if (user instanceof DestinationUserNameProvider) {
			name = ((DestinationUserNameProvider) user).getDestinationUserName();
		}
		synchronized (this.sessionLock) {
			LocalSimpUser simpUser = this.users.get(name);
			if (simpUser == null) {
				simpUser = new LocalSimpUser(name, user);
				// 这里会在会话建立成功时保存用户信息
				this.users.put(name, simpUser);
			}
			LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);
			simpUser.addSession(session);
			this.sessions.put(sessionId, session);
		}
	}
	else if (event instanceof SessionDisconnectEvent) {
		synchronized (this.sessionLock) {
			LocalSimpSession session = this.sessions.remove(sessionId);
			if (session != null) {
				LocalSimpUser user = session.getUser();
				user.removeSession(sessionId);
				if (!user.hasSessions()) {
					this.users.remove(user.getName());
				}
			}
		}
	}
	else if (event instanceof SessionUnsubscribeEvent) {
		LocalSimpSession session = this.sessions.get(sessionId);
		if (session != null) {
			String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
			if (subscriptionId != null) {
				session.removeSubscription(subscriptionId);
			}
		}
	}
}

我们主要看SessionConnectedEvent会话连接建立事件,在会话连接建立完成时,这段代码this.users.put(name, simpUser);会保存用户信息,而用户信息来自Principal user = subProtocolEvent.getUser();,因此我们这里需要看下SessionConnectedEvent事件是什么时候发布的。

经过一番查找,在StompSubProtocolHandler类中发现了SessionConnectedEvent事件发布的源码,如下:

public void handleMessageToClient(WebSocketSession session, Message<?> message) {
	if (!(message.getPayload() instanceof byte[])) {
		if (logger.isErrorEnabled()) {
			logger.error("Expected byte[] payload. Ignoring " + message + ".");
		}
		return;
	}

	StompHeaderAccessor accessor = getStompHeaderAccessor(message);
	StompCommand command = accessor.getCommand();

	if (StompCommand.MESSAGE.equals(command)) {
		if (accessor.getSubscriptionId() == null && logger.isWarnEnabled()) {
			logger.warn("No STOMP \"subscription\" header in " + message);
		}
		String origDestination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
		if (origDestination != null) {
			accessor = toMutableAccessor(accessor, message);
			accessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
			accessor.setDestination(origDestination);
		}
	}
	else if (StompCommand.CONNECTED.equals(command)) {
		this.stats.incrementConnectedCount();
		accessor = afterStompSessionConnected(message, accessor, session);
		if (this.eventPublisher != null) {
			try {
				SimpAttributes simpAttributes = new SimpAttributes(session.getId(), session.getAttributes());
				SimpAttributesContextHolder.setAttributes(simpAttributes);
				// 通过session找到用户信息
				Principal user = getUser(session);
				// 这里会发布会话连接建立事件,同时会附带用户信息
				publishEvent(this.eventPublisher, new SessionConnectedEvent(this, (Message<byte[]>) message, user));
			}
			finally {
				SimpAttributesContextHolder.resetAttributes();
			}
		}
	}

	byte[] payload = (byte[]) message.getPayload();
	if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
		Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
		if (errorMessage != null) {
			accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
			Assert.state(accessor != null, "No StompHeaderAccessor");
			payload = errorMessage.getPayload();
		}
	}

	Runnable task = OrderedMessageChannelDecorator.getNextMessageTask(message);
	if (task != null) {
		Assert.isInstanceOf(ConcurrentWebSocketSessionDecorator.class, session);
		((ConcurrentWebSocketSessionDecorator) session).setMessageCallback(m -> task.run());
	}

	sendToClient(session, accessor, payload);
}

接下来我们主要看Principal user = getUser(session);这段代码,根据Session查找会话信息,源码如下:

private Principal getUser(WebSocketSession session) {
	Principal user = this.stompAuthentications.get(session.getId());
	return (user != null ? user : session.getPrincipal());
}

这里可以看到用户信息是直接从stompAuthentications中读取,stompAuthentications是一个Map,那么该Map还会在什么时候进行初始化呢?

点击调用链,发现在StompSubProtocolHandler类中的handleMessageFromClient方法中发现一段赋值的代码,代码有点长,这里直接贴图:
在这里插入图片描述

可以看到设置了一个回调,当接下来我们看看该回调函数在哪里会调用,从命名上看,是用户信息变化时会触发,点击调用链,在SimpMessageHeaderAccessor类中发现了下面的方法:

public void setUser(@Nullable Principal principal) {
	setHeader(USER_HEADER, principal);
	if (this.userCallback != null) {
		this.userCallback.accept(principal);
	}
}

还记得我们在前面的聊天Demo示例中,自定义的拦截器UserAuthenticationChannelInterceptor吗?我们调用了StompHeaderAccessor#setUser方法,实际上就是调用了父类的SimpMessageHeaderAccessor#setUser方法,这下真相大白了。

下面再让我们梳理一下STOMP会话建立各调用链的先后顺序,

  1. 先调用StompSubProtocolHandler#handleMessageFromClient处理原生WebSocket消息,然后通过MessageChannel#send方法进行发送,发送完后发布事件,比如会话建立,订阅和取消订阅事件。
  2. 通过MessageChannel#send 方法发送消息时会调用ChannelInterceptor拦截器链,在拦截器链中我们可以设置一些自定义信息,比如用户信息。
  3. 经过拦截器链处理后,消息会交给MessageHandler进行处理。处理完后再会调用StompSubProtocolHandler#handleMessageToClient 将响应内容返回给客户端。

六、结语

研究了大半天源代码,加上debug才摸清了这些个调用链,还是得仔细研读官方文档,一个单词都不能落下,先得把概念给理清楚,研读源码才能事半功倍。

下一节将会带来集成支持STOMP协议的外部中间件使用示例,比如:RabbitMQ

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/590851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【实用篇】SpringCloud02

文章目录 SpringCloud020.学习目标1.Nacos配置管理1.1.统一配置管理1.1.1.在nacos中添加配置文件1.1.2.从微服务拉取配置 1.2.配置热更新1.2.1.方式一1.2.2.方式二 1.3.配置共享1&#xff09;添加一个环境共享配置2&#xff09;在user-service中读取共享配置3&#xff09;运行两…

TCP通信(复习)

目录 TCP通信实现过程 1、socket函数与通信域 socket函数 参 数 bind函数 与 通信结构体 bind函数 参数 通信地址族与同届结构体 通用地址族结构体 IPV4地址族结构体 listen函数与accept函数 listen函数 accept函数 参 数 作 用 要实现进程间的通信必备&#xff1…

Map、Set和哈希表(数据结构系列14)

目录 前言&#xff1a; 1.搜索树 1.1概念 1.2插入 1.3查找 1.4删除 1.5二叉搜索树整体代码展示 2. Map和Set的讲解 2.1 Map的说明 2.1.1Map的方法 2.2 Set 的说明 2.2.1Set的方法 3.哈希表 3.1哈希表的概念 3.2哈希冲突 3.3冲突的避免 3.4哈希冲突的解决 3.4…

企业物资管理系统的设计与实现(ASP.NET,SQL)

论文阐述了企业物资管理系统的设计与实现&#xff0c;并对该系统的需求分析及系统需要实现的设计方法作了介绍。该系统的基本功能包括用户登录&#xff0c;修改密码&#xff0c;物资的基本信息管理&#xff0c;出入库和损坏的管理已经综合查询等功能。 4.1 用户登录模块的实现 …

【滤波】非线性滤波

本文主要翻译自rlabbe/Kalman-and-Bayesian-Filters-in-Python的第9章节09-Nonlinear-Filtering&#xff08;非线性滤波&#xff09;。 %matplotlib inline#format the book import book_format book_format.set_style()介绍 我们开发的卡尔曼滤波器使用线性方程组&#xff0…

【C++】类和对象——拷贝构造函数的概念、拷贝构造函数的特征

文章目录 1.拷贝构造函数1.1拷贝构造函数的概念1.2拷贝构造函数的特征 1.拷贝构造函数 在前面我们已经介绍了构造函数和析构函数的作用和使用方法&#xff0c;而拷贝构造函数则是在对象初始化时调用的一种特殊构造函数。拷贝构造函数可以帮助我们创建一个新的对象&#xff0c;该…

互联网中的web3.0和gpt有何联系?

文章目录 ⭐前言⭐web 3.0&#x1f496; web1.0-web3.0的概念 ⭐chatgpt&#x1f496; gpt的概念 ⭐总结⭐结尾 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享互联网中的web3.0和gpt的关系。 互联网的发展 第一台计算机的出现 世界上第一台通用计算机“ENIAC”于…

C语言(扫雷)

扫雷 开发过程开发思路菜单界面游戏界面的打印雷的随机产生扫雷以及判断胜利条件代码整合 开发过程 准备工作效果展示 准备工作&#xff1a; game.h 一个头文件–>声明函数 test.c 为主文件 game.c 为功能函数实现文件 效果展示 开发思路 菜单界面 游戏界面打印&…

二、机器人的结构设计

1 、螺丝连接的坚固性 坚固性是机器人能顺利完成指定任务的一个重要条件&#xff0c;无论我们程序设计的如何完美&#xff0c; 如果不能保证机器人具有坚固性和稳定性&#xff0c;就无法保证任务的顺利完成&#xff0c;机器人在运行时如 果发生散架和分裂都会影响其功能的实现…

阿里云的白名单规则如何实现IP限制和访问控制?

阿里云的白名单规则如何实现IP限制和访问控制&#xff1f;   [本文由阿里云代理商[聚搜云]撰写]   随着企业在云计算领域的深入应用&#xff0c;网络安全问题日益凸显。阿里云提供了一种名为“白名单”的规则&#xff0c;帮助用户实现IP限制和访问控制。本文将详细阐述阿里…

“ 最近 ” ,准备跳槽的可以看看

前两天跟朋友感慨&#xff0c;今年的铜三铁四、裁员、疫情导致好多人都没拿到offer!现在已经12月了&#xff0c;具体明年的金三银四只剩下两个月。 对于想跳槽的职场人来说&#xff0c;绝对要从现在开始做准备了。这时候&#xff0c;很多高薪技术岗、管理岗的缺口和市场需求也…

【ROS】ROS2中的概念和名词解释

1、工作空间 workspace ROS以固定的目录结构创建项目工程&#xff0c;项目根目录称为工作空间 1.1 典型工作空间结构 src&#xff1a; 代码空间&#xff1b; build&#xff1a; 编译空间&#xff0c;保存编译过程中产生的中间文件&#xff1b; install&#xff1a;安装空间…

一种在不改变源码的情况下测试看门狗复位的方法

什么是“看门狗”&#xff1f; 看门狗定时器&#xff08;WDT&#xff0c;Watch Dog Timer&#xff09;是单片机的一个组成部分&#xff0c;它实际上是一个计数器&#xff0c;一般给看门狗一个数字&#xff0c;程序开始运行后看门狗开始倒计数。如果程序运行正常&#xff0c;过…

git使用X篇_2_Git全套教程IDEA版(git、GitHub、Gitee码云、搭建公司内部GitLab、与IDEA集成等内容)

本文是根据以下视频及网上总结进行更新后的介绍git使用的博文。包含了git、GitHub、Gitee码云、搭建公司内部GitLab、与IDEA集成等内容。 笔记来源&#xff1a;【尚硅谷】5h打通Git全套教程IDEA版&#xff08;涵盖GitHub\Gitee码云\GitLab&#xff09; 文章目录 初识 Git0、内容…

vue-echarts图表的应用(总结)

vue项目中echarts图表的应用(总结) 一 . 安装echarts包 npm i echarts 二 . 放置两个图表的div&#xff0c;并给定高宽 <div class"chart"><!-- 图表 --><div ref"social" style" width: 100%; height:100% " /> </div&g…

Python入门(十五)函数(三)

函数&#xff08;三&#xff09; 1.返回值1.1 返回简单值1.2 让实参变成可选的1.3 返回字典1.4 结合使用函数和while循环 作者&#xff1a;Xiou 1.返回值 函数并非总是直接显示输出&#xff0c;它还可以处理一些数据&#xff0c;并返回一个或一组值。函数返回的值称为返回值。…

【2023】Redis主从复制模式集群

资源有限&#xff0c;本文使用Docker部署目录 &#x1f3b6;主从模式介绍&#x1f3b6; 搭建主从模式集群&#x1f3b6; 使用命令搭建主从集群&#x1f3b6; 通过配置文件搭建主从模式集群 &#x1f3b6;配置读写分离&#x1f3b6; 用心跳机制提高主从复制的可靠性&#x1f3b6…

[golang 微服务] 3. ProtoBuf认识与使用

一.protobuf简介 前言 在移动互联网时代&#xff0c; 手机流量、 电量是最为有限的资源&#xff0c;而移动端的即时通讯应用无疑必须得直面这两点。解决流量过大的基本方法就是 使用高度压缩的通信协议&#xff0c;而数据压缩后流量减小带来的自然结果也就是省电&#xff1a;因…

#Verilog HDL# Verilog设计中的竞争问题和解决办法

经过前面文章的学习&#xff0c;我们知道&#xff1a;不管是Verilog设计语言&#xff0c;还是Sytemverilog验证语言&#xff0c;标准都定义了语言调度机制&#xff0c;来规范各家编译器和仿真器的开发。今天&#xff0c;我们着重看一下Verilog 硬件设计语言中竞争问题&#xff…

算法拾遗三十一马拉车算法

算法拾遗三十一马拉车算法 回文是什么回文暴力求法 Manacher算法回文直径和回文半径最右回文边界最右回文右边界的中心C位置Manacher求解过程Manacher 题 回文是什么 一个字符串正过来念和反过来念一样&#xff0c;总的来说就是有一个对称轴可能在字符上也可能在范围上面 回文…