需求背景
在某个资产平台,在不了解需求的情况下,我突然接到了一个任务,让我做某个页面窗口的即时通讯,想到了用websocket技术,我从来没用过,被迫接受了这个任务,我带着浓烈的兴趣,就去研究了一下,网上资料那么多,我们必须找到适合自己的方案,我们开发的时候一定要基于现有框架的基础上去做扩展,不然会引发很多问题,比如:运行不稳定、项目无法启动等,废话不多说,直接上代码
WebScoekt介绍
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
特点
- 较少的控制开销。在连接创建后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对较小。在不包含扩展的情况下,对于服务器到客户端的内容,此头部大小只有2至10字节(和数据包长度有关);对于客户端到服务器的内容,此头部还需要加上额外的4字节的掩码。相对于HTTP请求每次都要携带完整的头部,此项开销显著减少了。
- 更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少;即使是和Comet等类似的长轮询比较,其也能在短时间内更多次地传递数据。
保持连接状态。与HTTP不同的是,Websocket需要先创建连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。 - 更好的二进制支持。Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。
可以支持扩展。Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。如部分浏览器支持压缩等。 - 更好的压缩效果。相对于HTTP压缩,Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著地提高压缩率。
解决方案介绍
PS:基于websocket的特点,我们打算放弃Ajax轮询,因为当客户端过多的时候,会导致消息收发有延迟、服务器压力增大。
API介绍
思路解析
首先,我们既然要发送消息,客户端和客户端是无法建立连接的,我们可以这样做,我们搭建服务端,所有的客户端都在服务端注册会话,我们把消息发送给服务端,然后由服务端转发给其他客户端,这样就可以和其他用户通讯了。
示例代码
服务端配置
package unicom.assetMarket.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import unicom.assetMarket.websocket.handler.MyMessageHandler;
import unicom.assetMarket.websocket.interceptor.WebSocketInterceptor;
/**
* @Author 庞国庆
* @Date 2023/02/15/15:36
* @Description
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry.addHandler(new MyMessageHandler(), "/accept")
.addInterceptors(new WebSocketInterceptor())
//允许跨域
.setAllowedOrigins("*");
webSocketHandlerRegistry.addHandler(new MyMessageHandler(),"/http/accept")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*").withSockJS();
}
}
Interceptor
package unicom.assetMarket.websocket.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import java.util.Map;
/**
* @Author 庞国庆
* @Date 2023/02/15/15:52
* @Description
*/
@Slf4j
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest request1 = (ServletServerHttpRequest) request;
String userId = request1.getServletRequest().getParameter("userId");
attributes.put("currentUser", userId);
log.info("用户{}正在尝试与服务端建立链接········", userId);
}
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
}
Handler
package unicom.assetMarket.websocket.handler;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import unicom.assetMarket.assetChat.service.CamsMarketChatMessageService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author 庞国庆
* @Date 2023/02/15/15:52
* @Description
*/
@Slf4j
@Component
public class MyMessageHandler extends TextWebSocketHandler {
//存储所有客户端的会话信息(线程安全)
private final static Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Autowired(required = false)
private CamsMarketChatMessageService service;
@Override
public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
String userId = this.getUserId(webSocketSession);
if (StringUtils.isNotBlank(userId)) {
sessions.put(userId, webSocketSession);
log.info("用户{}已经建立链接", userId);
}
}
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
String message = webSocketMessage.toString();
String userId = this.getUserId(webSocketSession);
log.info("服务器收到用户{}发送的消息:{}", userId, message);
//webSocketSession.sendMessage(webSocketMessage);
if (StringUtils.isNotBlank(message)) {
//保存用户发送的消息数据
service.saveData(message);
//发送消息给指定用户
doMessage(message);
}
}
@Override
public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
WebSocketMessage message = new TextMessage("发送异常:" + throwable.getMessage());
//webSocketSession.sendMessage(message);
}
@Override
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
String userId = this.getUserId(webSocketSession);
if (StringUtils.isNotBlank(userId)) {
sessions.remove(userId);
log.info("用户{}已经关闭会话", userId);
} else {
log.error("没有找到用户{}的会话", userId);
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 根据会话查找已经注册的用户id
*
* @param session
* @return
*/
private String getUserId(WebSocketSession session) {
String userId = (String) session.getAttributes().get("currentUser");
return userId;
}
/**
* 发送消息给指定用户
*
* @param userId
* @param contents
*/
public void sendMessageUser(String userId, String contents) throws Exception {
WebSocketSession session = sessions.get(userId);
if (session != null && session.isOpen()) {
WebSocketMessage message = new TextMessage(contents);
session.sendMessage(message);
}
}
/**
* 接收用户消息,转发给指定用户
* @param msg
* @throws Exception
*/
public void doMessage(String msg) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(msg);
String sendStaffId = jsonObject.getString("sendStaffId");
String reciveStaffId = jsonObject.getString("reciveStaffId");
String message = jsonObject.getString("message");
//替换敏感字
message = service.replaceSomething(message);
this.sendMessageUser(reciveStaffId,message);
}
}
JS代码
$(function() {
connectWebSocket();
});
/**
* 和服务器建立链接
*/
function connectWebSocket() {
var userId = $("#sendStaffId").val();
var host = window.location.host;
if ('WebSocket' in window) {
if (userId) {
client = new WebSocket( "ws://"+ host + "/frm/accept?userId=" + userId,null);
client.onopen = function (event) {
console.log("连接成功");
}
client.onmessage = function (event) {
console.log("接收到服务端的内容为:" + event.data);
}
client.onerror = function (event) {
console.log("连接失败");
}
client.onclose = function (event) {
console.log("与服务器断开连接");
}
}
}
}
PS:这里我遇到了1个坑,就是在连接服务端的时候老是连接不上,我们在配置的代码中指定的匹配URL为 /accept
,但是我发现就是连不上,后来找了很多资料,原来是忘了加个url,这个url就是我们在web.xml
中配置的DispatcherServlet
的拦截url,如下:
运行效果如下:
PS:页面还没写完,暂时先写到这里~