组件概述
面对C端产品,往往会携带有客户端和服务端的双端通信以实现实时交互的效果,但是目前HTTP1.1并不支持双端通信,因此,对于聊天室、多人实时游戏等场景,就需要用到一个新的通信协议:WebSocket。
更多WebSocket相关的信息请参考本文章https://blog.csdn.net/weixin_73077810/article/details/136840600?spm=1001.2014.3001.5501 而对于一个系统而言,往往WebSocket的建立使用不仅仅只有一个,那如果对于每一个业务都搭建一套适配的WebSocket组件,项目中便会加持上许多冗余代码,无法达到一个复用的效果,也不利于后续的相关业务拓展。
本文的目标是实现一个可以复用的WebSocket组件,通过设计模式解耦、业务标识分组实现项目中对于WebSocket通讯的复用实现,并基于业务标识扩展消息的自定义消费逻辑。
组件实现流程图
伪代码讲解
连接建立阶段
在WebSocket连接建立阶段,PlusWebSocketInterceptor拦截器首先在握手前将用户信息和业务标识存储到Session中,然后在握手后将session基于业务标识和用户ID存储到目标容器中。
/**
* WebSocket握手请求的拦截器
*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前:将用户信息 + 业务标识在后续存储到Session中
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = LoginHelper.getLoginUser();
attributes.put(LOGIN_USER_KEY, loginUser);
String businessType = request.getHeaders().get(BUSINESS_TYPE_KEY).get(0);
if (StrUtil.isBlank(businessType)){
throw new ServiceException("WebSocket握手中Header需要携带业务标识businessType");
}
attributes.put(BUSINESS_TYPE_KEY, businessType);
return true;
}
/**
* 握手后
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
/**
* 连接成功后:将session基于业务标识和用户ID存储到目标容器
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
String businessType = (String) session.getAttributes().get(BUSINESS_TYPE_KEY);
WebSocketSessionHolder.addSession(loginUser.getUserId(), businessType, session);
log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
}
// WebSocketSession存储容器
Map<String, Map<Long, WebSocketSession>> BUSINESS_TYPE_SESSION_MAP = new ConcurrentHashMap<>();
消息处理阶段
在消息处理阶段,首先从WebSocket会话中获取登录用户信息,然后创建WebSocket消息DTO对象并解析文本数据为两部分:具体信息和业务类型。接着检查数据是否有效,如果无效则记录错误并抛出异常。最后,将消息转发消息发布器对象。
/**
* 处理发送来的文本消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 从WebSocket会话中获取登录用户信息
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
// 创建WebSocket消息DTO对象
// 解析文本数据为两部分:具体信息 + 业务类型
String payload = message.getPayload();
// payload = {"data":{"businessType":"xxx","data":{"type":xx,"contentUrl":"xxx","createTime":"xxx","friendId":"xxx","chatSessionId":"xxx","ossId":"xxx"}}}
WebSocketMessageDto data = new JSONObject(payload).get("data", WebSocketMessageDto.class);
if (data == null || data.getData() == null){
log.error("WebSocket接收消息失败");
throw new ServiceException("WebSocket接收消息失败");
}
// 记录要转发的用户列表 这里是转发给自己 + 通讯对象
data.setSessionKeys(List.of(loginUser.getUserId(), data.getData().getFriendId()));
WebSocketUtils.publishMessage(data);
}
@Data
public class WebSocketMessageDto implements Serializable {
/**
* 需要推送到的session key 列表
*/
private List<Long> sessionKeys;
/**
* 传输的真实信息
*/
private WebSocketMutualDto data;
/**
* 业务类型
*/
private String businessType;
}
消息解析消费阶段
在消息解析消费阶段,首先根据业务类型获取对应的自定义消费器,然后执行该消费器的自定义消费逻辑。接着遍历需要发送消息的sessionKey列表,如果session存在则直接发送消息给目标对象。
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
// 执行业务的自定义消费逻辑
String businessType = webSocketMessage.getBusinessType();
// 获取该业务的指定消费器来执行自定义的消费逻辑
MessageConsumer messageHandler = WebSocketConstants.CONSUMER_MAP.get(businessType);
messageHandler.consumerMessage(webSocketMessage.getData());
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey, businessType)) {
WebSocketUtils.sendMessage(businessType, sessionKey, new JSONObject(webSocketMessage.getData()).toString());
continue;
}
}
}
public interface WebSocketConstants {
/**
* 基于业务标识调用业务消费器进行自定义逻辑
*/
Map<String, MessageConsumer> CONSUMER_MAP = new HashMap<>();
}
/**
* 业务自定义消费器
*/
public interface MessageConsumer {
// 默认不进行自定义消费
default void consumerMessage(WebSocketMutualDto message){};
}
/**
* A项目WebSocket订阅消费者
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class MemDriftWebSocketConsumer implements MessageConsumer {
private final String Drift_WebSocket_Type = "driftBottle";
@PostConstruct
void init(){
// 注册到WebSocket消费者组件池
WebSocketConstants.CONSUMER_MAP.put(Drift_WebSocket_Type, this);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void consumerMessage(WebSocketMutualDto message) {
// 自定义消费逻辑
}
}
后期业务扩展实现
如果有新的业务需要复用该组件,只需要继承MessageConsumer接口实现自己的业务消费逻辑,再者在修改消息中的业务标识便可无侵入复用WebSocket组件消费信息。
总结
文章首先指出了在多个业务中使用独立WebSocket组件会导致代码冗余和不利于维护拓展的问题。为此,提出了一个通用的WebSocket组件设计方案,允许不同业务共享相同的WebSocket基础架构,同时能够处理各自独特的业务逻辑。
组件的设计分为三个主要阶段:连接建立、消息处理、和消息解析消费。在连接建立阶段,PlusWebSocketInterceptor
拦截器会在握手前将用户信息和业务标识存储到会话(Session)中,并在握手后将会话按业务类型和用户ID组织存储。这一阶段确保了会话管理的有序和业务之间的隔离。
消息处理阶段涉及到接收和解析客户端发来的消息。服务器首先从会话中获取用户信息,然后解析文本消息内容,将其转换为WebSocketMessageDto
对象,这个对象包含了消息内容、接收者信息和业务类型等。之后,有效的消息将被转发到相应的处理器进行处理。