这一节,我们学习用户消息是如何发送的。
消息的分类
spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。
用户消息的前缀
- 不配置的情况下,默认用户消息的前缀是/user
- 也可以通过下面的方式来配置用户消息
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
/**
* stompClient.subscribe("/user/topic/subNewMsg",...)
* 这个时候,后端推送消息应该这么写
* msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
* 即去掉了/user前缀
*/
registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);
}
- 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
//订阅用户消息topic1
stompClient.subscribe("/user/topic/answer", function (response) {
//do something
});
- 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
private final SimpMessageSendingOperations msgOperations;
public void echo(Principal principal, Msg msg) {
msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
}
广播消息的前缀
- 广播消息没有默认值,必须显示地指定
- 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue")
//配置stomp协议里, server返回的心跳
.setHeartbeatValue(new long[]{10000L, 10000L})
//配置发送心跳的scheduler
.setTaskScheduler(new DefaultManagedTaskScheduler());
}
- 前端代码可以这么写
//订阅广播消息topic
stompClient.subscribe("/topic/boardCast/hello", function (response) {
// do something
});
- 后端代码可以这么写
private final SimpMessageSendingOperations msgOperations;
public void echo2(Msg msg) {
log.info("收到的消息为:{}", msg.getContent());
msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");
}
发送用户消息源码分析
用户订阅过程
发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。
总结一下用户和会话之间的关系,如下图
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析
我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001
stompClient.subscribe("/user/topic/answer", function (response) {
//do something
});
该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
可以看到,当前的sessionId,destination
将订阅放到一个subscriptions的map里面。缓存在内存中。
用户消息的发送
后端代码是这么写的,我们来调试一下
private final SimpMessageSendingOperations msgOperations;
public void echo(Principal principal, Msg msg) {
msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
}
经过层层调用,发现调到了下面的方法
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码
//AbstractMessageSendingTemplate
@Override
public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
@Nullable MessagePostProcessor postProcessor) throws MessagingException {
//对消息进行转换,对象转字符串,或者字节数组之类的
Message<?> message = doConvert(payload, headers, postProcessor);
//调用Send发送
send(destination, message);
}
做了两个事:
- 对消息进行转换,对象转字符串,或者字节数组之类的
- 调用Send发送
再来看下send方法
@Override
public void send(D destination, Message<?> message) {
doSend(destination, message);
}
再调用doSend,由子类SimpMessagingTemplate实现。
//SimpMessagingTemplate
@Override
protected void doSend(String destination, Message<?> message) {
Assert.notNull(destination, "Destination must not be null");
SimpMessageHeaderAccessor simpAccessor =
MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
if (simpAccessor != null) {
if (simpAccessor.isMutable()) {
simpAccessor.setDestination(destination);
simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
simpAccessor.setImmutable();
sendInternal(message);
return;
}
else {
// Try and keep the original accessor type
simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
initHeaders(simpAccessor);
}
}
else {
simpAccessor = SimpMessageHeaderAccessor.wrap(message);
initHeaders(simpAccessor);
}
simpAccessor.setDestination(destination);
simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());
sendInternal(message);
}
其中最关键的是sendInternal
private void sendInternal(Message<?> message) {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
Assert.notNull(destination, "Destination header required");
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
}
然后再通过messageChannel来发送此条消息。
//AbstractMessageChannel
@Override
public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
Message<?> messageToUse = message;
ChannelInterceptorChain chain = new ChannelInterceptorChain();
boolean sent = false;
try {
messageToUse = chain.applyPreSend(messageToUse, this);
if (messageToUse == null) {
return false;
}
sent = sendInternal(messageToUse, timeout);
chain.applyPostSend(messageToUse, this, sent);
chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
return sent;
}
catch (Exception ex) {
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
}
catch (Throwable err) {
MessageDeliveryException ex2 =
new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
throw ex2;
}
}
- 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
- 通过sendInternal将消息发送出去
再来看下sendInternal方法,进入子类ExecutorSubscribableChannel
//ExecutorSubscribableChannel
@Override
public boolean sendInternal(Message<?> message, long timeout) {
for (MessageHandler handler : getSubscribers()) {
SendTask sendTask = new SendTask(message, handler);
if (this.executor == null) {
sendTask.run();
}
else {
this.executor.execute(sendTask);
}
}
return true;
}
可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
这里取到的有两个messageHandler
- SimpleBrokerMessageHandler
- UserDestinationMessageHandler
进入SendTask,看一下run方法
//
public void run() {
Message<?> message = this.inputMessage;
try {
message = applyBeforeHandle(message);
if (message == null) {
return;
}
this.messageHandler.handleMessage(message);
triggerAfterMessageHandled(message, null);
}
catch (Exception ex) {
triggerAfterMessageHandled(message, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
throw new MessageDeliveryException(message, description, ex);
}
catch (Throwable err) {
String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
triggerAfterMessageHandled(message, ex2);
throw ex2;
}
}
这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg
- 这个的构成实际上就是把/user前缀去掉
- 然后加上-user,后面加上sessionId,就是当前会话的id
- 最后再以这个新生成的targetDestination,将消息发送出去!
这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。
- SimpMessagingTemplate调用messageChannel来发送消息
- messageChannel中会取得两个messageHandler来处理。
像不像递归调用?
不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去
可以看到,我们找到了一个用户订阅。
其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了
总结
发送用户消息的整个过程如下:
- SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
- 接着SimpMessagingTemplate进行消息的发送
- SimpMessagingTemplate会交由messageChannel