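Table of contents
- Preface
- I. Client-server bidirectional communication: interaction diagram
- II. Project description
- 1. Dependencies
- 2. Project modules
- Problems
- References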
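Preface
This article documents the project's solution for bidirectional client-server communication, which is built on Spring WebSocket,
along with some problems met in real use and how they were handled.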
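I. Client-server bidirectional communication: interaction diagram
Interaction notes:
Establishing the connection and initializing the in-memory client device and WebSocket information:
1) The server-side WebSocket (WS) service starts.
2) The client sends an access-authentication request [/ems/robot/access] to the server; the main parameter is the client deviceId;
once access succeeds, clients 1…N send a connection request [ws.connect] to the WebSocket service module on the server (clients typically connect with sockjs.min.js);
the bidirectional channel is now connected;
the WebSocket service module adds the WebSocket SessionId to its in-memory collection;
the client sends a registration request [/ems/robot/register] to the server, with the WebSocket SessionId and the client deviceId as parameters.
3) The server-side communication service module records the [SessionId, deviceId] relationship.
Disconnecting and clearing the in-memory client WebSocket and [SessionId, deviceId] information:
1) When the client disconnects on its own initiative, it sends a disconnect request [/ems/robot/disconnect] to the server, and the communication service removes the client's WebSocket and [SessionId, deviceId] entries from memory.
2) When the connection drops abnormally (network problems, a hung client, and so on), the WebSocket service module removes the client's WebSocket and [SessionId, deviceId] entries from memory.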
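The connect, register and disconnect steps above can be sketched with two in-memory maps. This is a minimal JDK-only illustration, not the project's code: the class name SessionLifecycleSketch and its methods are hypothetical, and plain strings stand in for real WebSocket sessions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, JDK-only sketch of the connect -> register -> disconnect flow.
class SessionLifecycleSketch {
    // sessionId -> placeholder for the live WebSocket session
    private final Map<String, String> sessions = new ConcurrentHashMap<>();
    // sessionId -> deviceId, filled in by the register step
    private final Map<String, String> sessionToDevice = new ConcurrentHashMap<>();

    // Step "ws.connect": remember the new session.
    void onConnect(String sessionId) {
        sessions.put(sessionId, "open");
    }

    // Step "/ems/robot/register": only accepted for a session that is connected.
    boolean register(String sessionId, String deviceId) {
        if (!sessions.containsKey(sessionId)) {
            return false;
        }
        sessionToDevice.put(sessionId, deviceId);
        return true;
    }

    // Explicit disconnect or abnormal close: drop both entries.
    void onDisconnect(String sessionId) {
        sessions.remove(sessionId);
        sessionToDevice.remove(sessionId);
    }

    boolean isConnected(String sessionId) {
        return sessions.containsKey(sessionId);
    }

    String deviceOf(String sessionId) {
        return sessionToDevice.get(sessionId);
    }
}
```

Registration is rejected for an unknown session, which mirrors the order in the flow: the WebSocket connection comes first, and the device is bound to it afterwards.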
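Heartbeat:
Server -> client:
The communication service module uses a @Scheduled method on a Spring-managed component (@Component) to iterate over the in-memory WebSocket sessions and push a heartbeat message to every online client (device) over its WebSocket (SessionId) channel. A heartbeat is sent every 10 seconds; after 4 unanswered heartbeats the device is treated as disconnected. The counters are kept in: ConcurrentHashMap<String, Integer> heartbeatMonitorMap = new ConcurrentHashMap<>(); // key is the sessionId, value is the accumulated number of unanswered heartbeats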
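The counting rule can be illustrated in isolation. The sketch below is a hypothetical, JDK-only class (HeartbeatCounterSketch is not part of the project); it counts from the first heartbeat, which is a simplification, and uses the atomic merge and computeIfPresent operations of ConcurrentHashMap so that the scheduler thread and the message-handling thread cannot lose updates.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the missed-heartbeat counter (not the project's class).
class HeartbeatCounterSketch {
    static final int MAX_MISSED = 4; // threshold described in the text

    // key: sessionId, value: heartbeats sent but not yet answered
    private final ConcurrentHashMap<String, Integer> missed = new ConcurrentHashMap<>();

    // Called each time the server pushes a heartbeat; returns the new count.
    int onHeartbeatSent(String sessionId) {
        return missed.merge(sessionId, 1, Integer::sum);
    }

    // Called when the client answers; the count never drops below zero.
    void onHeartbeatReply(String sessionId) {
        missed.computeIfPresent(sessionId, (id, n) -> Math.max(0, n - 1));
    }

    // True once the number of unanswered heartbeats reaches the threshold.
    boolean isOffline(String sessionId) {
        return missed.getOrDefault(sessionId, 0) >= MAX_MISSED;
    }

    // Forget the session, for example after it has been removed.
    void clear(String sessionId) {
        missed.remove(sessionId);
    }
}
```

With a 10-second interval and a threshold of 4, a client that stops answering is flagged roughly 40 seconds after its last reply.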
II. Project description
1. Dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
2. Project modules
Configure WebSocket and register the handler so that client requests to the /merakWs endpoint are routed to it:
package com.ems.mgr.websocket.config;
import com.ems.mgr.websocket.handler.SocketHandler;
import com.ems.mgr.websocket.interceptor.WebSocketInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
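/**
* @description: Configures WebSocket and registers the handler so that client requests to the /merakWs endpoint are routed to it
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
// SocketHandler is already a Spring @Component, so the managed bean is injected instead of creating a second instance with new
private final SocketHandler socketHandler;
public WebSocketConfig(SocketHandler socketHandler) {
this.socketHandler = socketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(socketHandler, "/merakWs")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*"); // allows every origin; restrict this outside of testing
}
}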
Custom WebSocketInterceptor handshake interceptor:
package com.ems.mgr.websocket.interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
/**
* @description: WebSocketInterceptor, logs a marker before and after the handshake
*/
public class WebSocketInterceptor implements HandshakeInterceptor {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse arg1,
WebSocketHandler arg2, Map<String, Object> arg3) throws Exception {
// Convert the ServerHttpRequest to its servlet request type, which can be used to read user information from the request scope
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpRequest = servletRequest.getServletRequest();
}
logger.info("beforeHandshake完成");
return true;
}
@Override
public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2, Exception arg3) {
logger.info("afterHandshake完成");
}
}
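A custom SocketHandler extends TextWebSocketHandler and handles the client channel: receiving messages (handleTextMessage), a successful connect (afterConnectionEstablished), a closed connection (afterConnectionClosed),
and transport errors on the channel (handleTransportError).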
package com.ems.mgr.websocket.handler;
import com.ems.mgr.common.utils.AutoDeviceInfoHelperUtil;
import com.ems.mgr.common.utils.DateUtils;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.domain.AutoDeviceInfo;
import com.ems.mgr.system.service.IAutoDeviceInfoService;
import com.ems.mgr.websocket.commons.JacksonUtil;
import com.ems.mgr.websocket.model.MessageBodyBean;
import com.ems.mgr.websocket.model.WsResponse;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Date;
/**
* @description: Custom SocketHandler that extends TextWebSocketHandler and overrides its callback methods
*/
@Component
public class SocketHandler extends TextWebSocketHandler {
private Logger logger = LoggerFactory.getLogger(this.getClass());
protected WebSocketSessionHandler webSocketSessionHandler = SpringUtils.getBean(WebSocketSessionHandler.class);
protected IAutoDeviceInfoService deviceInfoService = SpringUtils.getBean(IAutoDeviceInfoService.class);
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
logger.info("handleTextMessage from robot...");
String msg = message.getPayload();
logger.info("msg Payload= " + msg);
MessageBodyBean wsParam = JacksonUtil.json2Bean(msg, new TypeReference<MessageBodyBean>() {
});
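if ("signaling".equals(wsParam.getMessageType()) && "heartbeat".equals(wsParam.getMessageContext())) {
logger.info("handleTextMessage: heartbeat reply from robot");
// Use the server-side session id rather than the one echoed in the payload,
// so a client cannot touch another session's heartbeat counter
String sessionId = session.getId();
webSocketSessionHandler.subHeartbeatMonitorMap(sessionId);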
} else {
logger.info("handleTextMessage-普通响应");
WsResponse<MessageBodyBean> response = new WsResponse<>();
MessageBodyBean messageBodyBean = new MessageBodyBean();
messageBodyBean.setSessionId(session.getId());
messageBodyBean.setMessageType("signaling");
messageBodyBean.setMessageContext("response");
response.setResult(messageBodyBean);
ConcurrentWebSocketSessionDecorator sessionDecorator = (ConcurrentWebSocketSessionDecorator) webSocketSessionHandler.getSession(session.getId());
sendMessageToUser(sessionDecorator, new TextMessage(JacksonUtil.bean2Json(response)));
}
logger.info("handleTextMessage end...");
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("Connected ... " + session.getId());
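// Concurrent mode: the decorator serializes sends from multiple threads (send time limit 10000 ms, buffer size limit 524288 bytes)
ConcurrentWebSocketSessionDecorator sessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 10000, 524288);
// If the device already exists, its ConcurrentWebSocketSessionDecorator needs to be updated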
webSocketSessionHandler.register(sessionDecorator);
WsResponse<MessageBodyBean> response = new WsResponse<MessageBodyBean>();
MessageBodyBean messageBodyBean = new MessageBodyBean();
messageBodyBean.setSessionId(session.getId());
messageBodyBean.setMessageType("signaling");
messageBodyBean.setMessageContext("connect");
response.setResult(messageBodyBean);
sendMessageToUser(sessionDecorator, new TextMessage(JacksonUtil.bean2Json(response)));
// Synchronous mode (kept for reference)
// webSocketSessionHandler.register(session);
// WsResponse<MessageBodyBean> response = new WsResponse<MessageBodyBean>();
// MessageBodyBean messageBodyBean = new MessageBodyBean();
// messageBodyBean.setSessionId(session.getId());
// messageBodyBean.setMessageType("signaling");
// messageBodyBean.setMessageContext("connect");
// response.setResult(messageBodyBean);
// sendMessageToUser(session, new TextMessage(JacksonUtil.bean2Json(response)));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
logger.warn("afterConnectionClosed -> removing WebSocketSession: " + session.getId());
// The session may already have been removed (heartbeat timeout or /disconnect), so guard against null
WebSocketSession registered = webSocketSessionHandler.getSession(session.getId());
if (registered != null && registered.isOpen()) {
registered.close(status);
}
connectionClosedAndUpdate(session.getId());
logger.info(String.format("afterConnectionClosed->Session %s closed because of %s", session.getId(), status.getReason()));
}
@Override
public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
logger.warn("handleTransportError -> removing WebSocketSession: " + session.getId());
// Same null guard as in afterConnectionClosed
WebSocketSession registered = webSocketSessionHandler.getSession(session.getId());
if (registered != null && registered.isOpen()) {
registered.close();
}
connectionClosedAndUpdate(session.getId());
logger.error("handleTransportError->error occurred at sender " + session.getId(), throwable);
}
/**
* Sends a message to the specified user: concurrent mode
*/
private void sendMessageToUser(ConcurrentWebSocketSessionDecorator user, TextMessage message) {
try {
// user is null when the session has already been removed from the registry
if (user != null && user.isOpen()) {
user.sendMessage(message);
} else {
logger.warn("Target user is offline; message not sent");
}
} catch (IOException e) {
logger.error("Failed to send message to user, err=" + e.getMessage());
}
}
/**
* Sends a message to the specified user: synchronous mode
*/
private void sendMessageToUserSync(WebSocketSession user, TextMessage message) {
try {
if (user != null && user.isOpen()) {
user.sendMessage(message);
} else {
logger.warn("Target user is offline; message not sent");
}
} catch (IOException e) {
logger.error("Failed to send message to user, err=" + e.getMessage());
}
}
private void connectionClosedAndUpdate(String sessionId) {
try {
// If a registered device closed abnormally, also update the device status
String deviceId = webSocketSessionHandler.getDeviceIdBySessionId(sessionId);
if (!StringUtils.isBlank(deviceId)) {
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(deviceId);
if (null != autoDeviceInfo1) {
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE));
autoDeviceInfo1.setOnlineTime(systemDate);
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
logger.warn("connectionClosed-[" + autoDeviceInfo1.getStatus() + "->outline]");
}
logger.warn("connectionClosedAndUpdate -> removing session/device mapping [" + sessionId + "," + deviceId + "]");
webSocketSessionHandler.removeSessionIdAndDevice(sessionId);
}
logger.warn("connectionClosedAndUpdate -> removing WebSocketSession: " + sessionId);
webSocketSessionHandler.remove(sessionId);
// Also drop the heartbeat counter so it does not linger after the session is gone
webSocketSessionHandler.cleanHeartbeatMonitorMap(sessionId);
} catch (Exception e) {
logger.error("connectionClosedAndUpdate error:" + e.getMessage());
}
}
}
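The branch at the top of handleTextMessage is a two-field classification. As a small illustration under stated assumptions (the class MessageDispatchSketch and its Kind enum are hypothetical; only the "signaling" and "heartbeat" values come from the listing above), the rule can be written as a pure function that is easy to test without a WebSocket session:

```java
// Hypothetical sketch: classify an incoming message by its type and context fields.
class MessageDispatchSketch {
    enum Kind { HEARTBEAT_REPLY, ORDINARY }

    // Constant-first equals() keeps the check null-safe when a field is missing.
    static Kind classify(String messageType, String messageContext) {
        if ("signaling".equals(messageType) && "heartbeat".equals(messageContext)) {
            return Kind.HEARTBEAT_REPLY;
        }
        return Kind.ORDINARY;
    }
}
```

Everything that is not a heartbeat reply falls through to the ordinary branch, which in the handler answers with a "response" signaling message.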
WebSocketSessionHandler stores and manages the in-memory collections related to WebSocketSession:
package com.ems.mgr.websocket.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: Manages the in-memory collections related to WebSocketSession
*/
@Service
public class WebSocketSessionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSessionHandler.class);
// WebSocketSession of each Robot device, as [sessionId, WebSocketSession]
public final ConcurrentHashMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
// Maps each sessionId to its Robot device id, as [sessionId, id]
public final ConcurrentHashMap<String, String> sessionIdAndDeviceIdMap = new ConcurrentHashMap<>();
// Heartbeat monitoring: one heartbeat every 10 seconds; after 4 unanswered heartbeats the device is treated as disconnected
public final ConcurrentHashMap<String, Integer> heartbeatMonitorMap = new ConcurrentHashMap<>();
public final static Integer maxHeartbeatTime = 4;
/**
* @description: WebSocketSession operations
* @param: session WebSocketSession
*/
public void register(WebSocketSession session) {
LOGGER.info("WebSocketSession-register:" + session.getId());
sessionMap.put(session.getId(), session);
}
public void remove(String sessionId) {
sessionMap.remove(sessionId);
LOGGER.warn("WebSocketSession移除完成:" + sessionId);
}
public WebSocketSession getSession(String sessionId) {
LOGGER.info("WebSocketSession-get:" + sessionId);
return sessionMap.get(sessionId);
}
/**
* @description: Heartbeat monitoring: one heartbeat every 10 seconds; after 4 unanswered heartbeats the device is treated as disconnected
* @param: sessionId
*/
public void putHeartbeatMonitorMap(String sessionId) {
LOGGER.info("heartbeat monitor put: [" + sessionId + "," + 0 + "]");
heartbeatMonitorMap.put(sessionId, 0);
}
public void addHeartbeatMonitorMap(String sessionId) {
// merge() increments atomically and also handles a sessionId that has no entry yet
Integer current = heartbeatMonitorMap.merge(sessionId, 1, Integer::sum);
LOGGER.info("heartbeat monitor: count increased [" + sessionId + "," + current + "]");
}
public void cleanHeartbeatMonitorMap(String sessionId) {
heartbeatMonitorMap.remove(sessionId);
LOGGER.info("heartbeat monitor: cleared [" + sessionId + "]");
}
public void subHeartbeatMonitorMap(String sessionId) {
// computeIfPresent() decrements atomically and never goes below 0
Integer current = heartbeatMonitorMap.computeIfPresent(sessionId, (id, history) -> Math.max(0, history - 1));
LOGGER.info("heartbeat monitor: count decreased [" + sessionId + "," + current + "]");
}
// Whether the device has reached the offline condition: 4 unanswered heartbeats at 10-second intervals
public boolean heartbeatMonitorOutLine(String sessionId) {
// A missing entry means no heartbeat has been counted yet, so the device is not offline
Integer current = heartbeatMonitorMap.get(sessionId);
if( null != current && maxHeartbeatTime <= current ){
LOGGER.info("heartbeat monitor: device reached the offline condition [" + sessionId + "," + maxHeartbeatTime+ "]");
return true;
}
return false;
}
public void showSessionIdAndDeviceIdMapTest(ConcurrentHashMap<String, String> sessionIdAndDeviceIdMap){
LOGGER.info("测试环境展示SessionIdAndDeviceIdMap内存信息");
Iterator<Map.Entry<String, String>> sessionIdAndDeviceEntriesTest = sessionIdAndDeviceIdMap.entrySet().iterator();
while (sessionIdAndDeviceEntriesTest.hasNext()) {
Map.Entry<String, String> entry = sessionIdAndDeviceEntriesTest.next();
LOGGER.info("SessionIdAndDeviceIdMap内存信息["+entry.getKey()+","+entry.getValue()+"]");
}
}
public WebSocketSession getSessionIdByDeviceId(String deviceId) {
Iterator<Map.Entry<String, String>> sessionIdAndDeviceEntries = sessionIdAndDeviceIdMap.entrySet().iterator();
String sessionId = "";
showSessionIdAndDeviceIdMapTest(sessionIdAndDeviceIdMap);
while (sessionIdAndDeviceEntries.hasNext()) {
Map.Entry<String, String> entry = sessionIdAndDeviceEntries.next();
String deviceId_map = entry.getValue();
LOGGER.info("getSessionIdByDeviceId list deviceIds:[" +deviceId_map + "]");
if (deviceId_map.equals(deviceId)) {
sessionId = entry.getKey();
LOGGER.info("getSessionIdByDeviceId find:[" + sessionId + "," + deviceId_map + "]");
break;
} else {
continue;
}
}
LOGGER.info("get sessionId-deviceId:[" + sessionId + "," + deviceId + "]");
if( "".equals(sessionId) ){
return null;
}
else{
return sessionMap.get(sessionId);
}
}
/**
* @description: Robot device operations
* @param: sessionId, deviceId
*/
public void putSessionIdAndDevice(String sessionId, String deviceId) {
// If the same device connects more than once, replace its previous WebSocket mapping
WebSocketSession historyWsSession = getSessionIdByDeviceId(deviceId);
if( null != historyWsSession ){
LOGGER.warn("删除历史SessionIdAndDevice信息:[" + historyWsSession.getId() + "," + deviceId + "]");
sessionIdAndDeviceIdMap.remove(historyWsSession.getId());
}
LOGGER.info("SessionIdAndDevice put:[" + sessionId + "," + deviceId + "]");
sessionIdAndDeviceIdMap.put(sessionId, deviceId);
}
public void removeSessionIdAndDevice(String sessionId) {
sessionIdAndDeviceIdMap.remove(sessionId);
LOGGER.warn("WebSocketSession关联各Robot设备移除完成:" + sessionId);
}
public String getDeviceIdBySessionId(String sessionId) {
String value = sessionIdAndDeviceIdMap.get(sessionId);
LOGGER.info("SessionIdAndDevice sessionId-deviceId:[" + sessionId + "," + value + "]");
return value;
}
public Map<String, String> getWebSocketSessionIds() {
Map<String, String> webSocketSessionIdsMap = new HashMap<>();
Iterator<Map.Entry<String, WebSocketSession>> entries = sessionMap.entrySet().iterator();
while (entries.hasNext()) {
Map.Entry<String, WebSocketSession> entry = entries.next();
webSocketSessionIdsMap.put(entry.getKey(),entry.getValue().getId());
}
return webSocketSessionIdsMap;
}
}
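getSessionIdByDeviceId above scans every entry to find the session of a device. One possible alternative, shown here only as a hedged sketch, is to keep a second map in the reverse direction so the lookup becomes a single get. The class DeviceIndexSketch and its method names are hypothetical and use only the JDK; session ids stand in for real WebSocketSession objects.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a two-way index between sessionId and deviceId.
class DeviceIndexSketch {
    private final ConcurrentHashMap<String, String> sessionToDevice = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, String> deviceToSession = new ConcurrentHashMap<>();

    // Bind a device to a session; a reconnecting device replaces its old session.
    // synchronized keeps the two maps consistent with each other.
    synchronized void bind(String sessionId, String deviceId) {
        String oldSession = deviceToSession.put(deviceId, sessionId);
        if (oldSession != null && !oldSession.equals(sessionId)) {
            sessionToDevice.remove(oldSession);
        }
        String oldDevice = sessionToDevice.put(sessionId, deviceId);
        if (oldDevice != null && !oldDevice.equals(deviceId)) {
            // Remove the reverse entry only if it still points at this session.
            deviceToSession.remove(oldDevice, sessionId);
        }
    }

    // Remove a session and, if it is still the current one, its device entry.
    synchronized void unbind(String sessionId) {
        String deviceId = sessionToDevice.remove(sessionId);
        if (deviceId != null) {
            deviceToSession.remove(deviceId, sessionId);
        }
    }

    String sessionOf(String deviceId) {
        return deviceToSession.get(deviceId);
    }

    String deviceOf(String sessionId) {
        return sessionToDevice.get(sessionId);
    }
}
```

The trade-off is that two maps must be updated together, which is why bind and unbind are synchronized here; the single-map scan in the listing avoids that at the cost of a linear search.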
WsSocketController, the service class for server -> client heartbeat monitoring:
package com.ems.mgr.websocket.controller;
import com.ems.mgr.common.utils.AutoDeviceInfoHelperUtil;
import com.ems.mgr.common.utils.DateUtils;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.domain.AutoDeviceInfo;
import com.ems.mgr.system.service.IAutoDeviceInfoService;
import com.ems.mgr.websocket.commons.JacksonUtil;
import com.ems.mgr.websocket.handler.WebSocketSessionHandler;
import com.ems.mgr.websocket.model.MessageBodyBean;
import com.ems.mgr.websocket.model.WsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
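/**
* @description: Server -> client heartbeat monitoring.
* Iterates over the in-memory WebSocket sessions and pushes a message to every online client (device) over its WebSocket (SessionId) channel;
* one heartbeat every 10 seconds; after 4 unanswered heartbeats the device is treated as disconnected.
*/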
@Service
public class WsSocketController {
private Logger logger = LoggerFactory.getLogger(WsSocketController.class);
private WebSocketSessionHandler webSocketSessionHandler = SpringUtils.getBean(WebSocketSessionHandler.class);
private IAutoDeviceInfoService deviceInfoService = SpringUtils.getBean(IAutoDeviceInfoService.class);
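/**
* @description: Robot heartbeat monitoring: pushes a message on a fixed schedule (every 10 seconds), monitors the running state of all clients (devices), and updates each client's running-state value [running, idle]
*/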
@Scheduled(fixedRate = 10000)
public void heartbeatMonitor() {
ConcurrentHashMap<String, WebSocketSession> allSessionMap = webSocketSessionHandler.sessionMap;
ConcurrentHashMap<String, Integer> heartbeatMonitorMapTime = webSocketSessionHandler.heartbeatMonitorMap;
try{
if( 0 < allSessionMap.size() ){
Iterator<Map.Entry<String, WebSocketSession>> entries = allSessionMap.entrySet().iterator();
while (entries.hasNext()) {
Map.Entry<String, WebSocketSession> entry = entries.next();
WebSocketSession user = entry.getValue();
String sessionId = entry.getKey();
WsResponse<MessageBodyBean> response = new WsResponse<MessageBodyBean>();
MessageBodyBean messageBodyBean = new MessageBodyBean();
messageBodyBean.setSessionId(sessionId);
messageBodyBean.setMessageType("signaling");
messageBodyBean.setMessageContext("heartbeat");
response.setResult(messageBodyBean);
String msgJson = JacksonUtil.bean2Json(response);
logger.info("WebSocket Service -> Robot心跳监测[定时推送消息时间:"+msgJson+"]");
sendMessageToUser(user, new TextMessage(msgJson));
// First heartbeat for this sessionId: initialize [sessionId, 0]
Integer heartbeatMap_Value = heartbeatMonitorMapTime.get(sessionId);
if( null == heartbeatMap_Value ){
logger.info("heartbeatMap init 0");
heartbeatMonitorMapTime.put(sessionId,0);
}
else{
// Known sessionId: add 1 for each heartbeat sent; at 4 unanswered heartbeats the device is treated as offline
if( !webSocketSessionHandler.heartbeatMonitorOutLine(sessionId) ){
webSocketSessionHandler.addHeartbeatMonitorMap(sessionId);
}
else{
// Device [sessionId] is offline
String deviceId = webSocketSessionHandler.getDeviceIdBySessionId(sessionId);
// deviceId is null for a session that connected but never registered, so skip the lookup in that case
AutoDeviceInfo autoDeviceInfo1 = (null == deviceId) ? null : deviceInfoService.selectByPrimaryKey(deviceId);
// If the device record exists, mark it OUTLINE (offline)
if( null != autoDeviceInfo1 ){
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE));
autoDeviceInfo1.setOnlineTime(systemDate);
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
logger.warn("Device disconnected-["+autoDeviceInfo1.getStatus()+"->outline]");
}
logger.warn("Robot heartbeat monitor: device offline, removing session: " + sessionId);
webSocketSessionHandler.remove(sessionId);
logger.warn("Robot heartbeat monitor: device offline, removing session/device mapping [" + sessionId+","+deviceId +"]");
webSocketSessionHandler.removeSessionIdAndDevice(sessionId);
webSocketSessionHandler.cleanHeartbeatMonitorMap(sessionId);
}
}
// Short pause between devices; note that it adds 200 ms per session to each sweep
Thread.sleep(200);
}
}
else{
logger.warn("WebSocket Service -> Robot心跳监测[无连接设备,请先接入并注册设备]");
}
}catch (Exception e){
logger.warn("WebSocket Service -> Robot心跳监测异常,msg="+e.getMessage());
}
}
/**
* Sends a message to the specified user
*/
public void sendMessageToUser(WebSocketSession user, TextMessage message) {
try {
// Send only if the session is still open
if (user.isOpen()) {
user.sendMessage(message);
logger.info("Message {"+user.getId()+"} send success");
}
else{
logger.warn("发送消息给指定的用户已经离线");
}
} catch (IOException e) {
logger.error("发送消息给指定的用户出错", e);
}
}
// 1: Resolves inconsistencies between the WebSocket state and the device's online status
public void syncDeviceWebsocket(String sessionId){
String deviceId = webSocketSessionHandler.getDeviceIdBySessionId(sessionId);
if( null != deviceId ){
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(deviceId);
if( null != autoDeviceInfo1 && AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE.equals(autoDeviceInfo1.getStatus()) ){
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE));
autoDeviceInfo1.setOnlineTime(systemDate);
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
logger.warn("保证websocket同设备在线状态[outline->idle]一致");
}
}
}
}
DeviceInfoController, the device communication management class: device access, registration, client-initiated disconnect, and message reception:
package com.ems.mgr.web.controller.websocket;
import com.ems.mgr.common.core.controller.BaseController;
import com.ems.mgr.common.core.domain.AjaxResult;
import com.ems.mgr.common.utils.AutoDeviceInfoHelperUtil;
import com.ems.mgr.common.utils.DateUtils;
import com.ems.mgr.common.utils.sign.Md5SaltUtilsHelper;
import com.ems.mgr.system.domain.AutoAppConfig;
import com.ems.mgr.system.domain.AutoDeviceInfo;
import com.ems.mgr.system.service.IAutoAppConfigService;
import com.ems.mgr.system.service.IAutoDeviceInfoService;
import com.ems.mgr.websocket.commons.JacksonUtil;
import com.ems.mgr.websocket.handler.WebSocketSessionHandler;
import com.ems.mgr.websocket.model.MessageBodyBean;
import com.ems.mgr.websocket.model.WsResponse;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Device communication management class: device access, registration, client-initiated disconnect, and message reception
*/
@RestController
@RequestMapping("/ems/robot")
public class DeviceInfoController extends BaseController {
public static final Logger log = LoggerFactory.getLogger(DeviceInfoController.class);
@Autowired
private IAutoDeviceInfoService deviceInfoService;
@Autowired
private IAutoAppConfigService autoAppConfigService;
@Autowired
private WebSocketSessionHandler webSocketSessionHandler;
/**
* Access endpoint
*/
@PostMapping(value = "/access")
public AjaxResult access(@RequestBody AutoAppConfig autoAppConfig) {
log.info("设备开始接入");
String serverHost = autoAppConfig.getServerHost();
String appId = autoAppConfig.getAppId();
List<AutoAppConfig> autoAppConfigsRecs = autoAppConfigService.selectAll();
if( 0 == autoAppConfigsRecs.size() ){
return AjaxResult.error("应用设备未登记接入信息!");
}
if( StringUtils.isBlank(serverHost)){
return AjaxResult.error("应用设备接入服务器地址不正确!");
}
if( StringUtils.isBlank(appId)){
return AjaxResult.error("应用设备接入设备码不正确!");
}
List<AutoAppConfig> findAutoAppConfigs = autoAppConfigsRecs.stream().filter(u ->(u.getAppId().equals(appId) && u.getServerHost().equals(serverHost)) ).collect(Collectors.toList());
if( 0 == findAutoAppConfigs.size() ){
log.error("接入应用配置信息未配置设备码appId["+autoAppConfig.getAppId()+"]不正确,请检查auto_app_config表!");
return AjaxResult.error("应用设备接入信息不正确!");
}
AutoAppConfig findAutoAppConfig = findAutoAppConfigs.get(0);
boolean matchResp = Md5SaltUtilsHelper.matchMd5(appId,findAutoAppConfig.getAppTimestamp().getTime(),findAutoAppConfig.getAuthCode(),findAutoAppConfig.getAppSecret(),autoAppConfig.getAppSign());
return matchResp ? AjaxResult.success() : AjaxResult.error("应用设备接入签名不匹配!");
}
/**
* After a successful connection, register via /ems/robot/register
*/
@PostMapping(value = "/register")
public AjaxResult register(@RequestBody AutoDeviceInfo autoDeviceInfo) {
log.info("设备连接成功后开始注册");
String msg = "";
String deviceId = autoDeviceInfo.getId();
String sessionId = autoDeviceInfo.getSessionId();
try{
if( null != webSocketSessionHandler.getSession(sessionId) ){
// The session is online
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(autoDeviceInfo.getId());
if( null == autoDeviceInfo1 ){
msg = "设备注册失败,设备["+autoDeviceInfo.getId()+"]不存在!";
log.error(msg);
return AjaxResult.error(msg);
}
else{
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setMachineIp(autoDeviceInfo.getMachineIp());
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE));
autoDeviceInfo1.setOnlineTime(systemDate);
autoDeviceInfo1.setVers(autoDeviceInfo.getVers());
autoDeviceInfo1.setPythonVers(autoDeviceInfo.getPythonVers());
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
msg = "设备注册成功,设备["+autoDeviceInfo.getId()+"],sessionId["+sessionId+"]";
log.info(msg);
webSocketSessionHandler.putSessionIdAndDevice(sessionId,deviceId);
return AjaxResult.success(msg);
}
}
String msg2 = "设备注册失败,设备已经断开,设备["+autoDeviceInfo.getId()+"],sessionId["+sessionId+"]";
log.info(msg2);
return AjaxResult.error(msg2);
}catch (Exception e){
msg = "设备注册异常,msg["+e.getMessage()+"],设备["+autoDeviceInfo.getId()+"],sessionId["+sessionId+"]";
log.error(msg);
return AjaxResult.error(msg);
}
}
/**
* Disconnect via /ems/robot/disconnect: the client disconnects first, then this /disconnect endpoint receives the disconnect request
*/
@PostMapping(value = "/disconnect")
public AjaxResult disconnect(@RequestBody AutoDeviceInfo autoDeviceInfo) {
String msg = "";
String sessionId = autoDeviceInfo.getSessionId();
String deviceId = autoDeviceInfo.getId();
try{
msg = "接收执行器断开服务请求,Socket关联信息["+sessionId+","+deviceId+"]断开!";
log.warn(msg);
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(deviceId);
// If the device record exists, mark it OUTLINE (offline)
if( null != autoDeviceInfo1 ){
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_OUTLINE));
autoDeviceInfo1.setOnlineTime(systemDate);
log.warn("设备断开-["+autoDeviceInfo1.getStatus()+"->outline]");
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
}
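logger.warn("Robot-initiated disconnect -> removing WebSocketSession: " + sessionId);
webSocketSessionHandler.remove(sessionId);
logger.warn("Robot-initiated disconnect -> removing session/device mapping [" + sessionId+","+deviceId +"]");
webSocketSessionHandler.removeSessionIdAndDevice(sessionId);
// Also drop the heartbeat counter so it does not linger after the session is gone
webSocketSessionHandler.cleanHeartbeatMonitorMap(sessionId);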
return AjaxResult.success();
}catch (Exception e){
msg = "设备断开异常,msg["+e.getMessage()+"],设备["+autoDeviceInfo.getId()+"],sessionId["+sessionId+"]";
log.error(msg);
return AjaxResult.error(msg);
}
}
/**
* Receives messages from the scheduling center that invoke a robot
*/
@PostMapping(value = "/handleRobotMessage/{deviceId}")
public AjaxResult handleRobotMessage(@PathVariable String deviceId,
@RequestBody MessageBodyBean messageBodyBean) {
try{
log.info("设备接收调度中心调用机器人消息");
if( !StringUtils.isBlank(deviceId) ){
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(deviceId);
if( "stop".equals(messageBodyBean.getBussinessType()) ){
// stop: halt the running task
WebSocketSession session = webSocketSessionHandler.getSessionIdByDeviceId(deviceId);
if( null != session ){
WsResponse<MessageBodyBean> response = new WsResponse<MessageBodyBean>();
messageBodyBean.setSessionId(session.getId());
response.setResult(messageBodyBean);
sendMessageToUser(session, new TextMessage(JacksonUtil.bean2Json(response)));
log.info("接收调度中心终止业务消息已完成,设备["+deviceId+"]");
return AjaxResult.success();
}
else{
String msg = "接收调度中心终止业务失败,机器人已接入但未注册使用,设备["+deviceId+"]";
log.info(msg);
return AjaxResult.error(msg);
}
}
else{
// Dispatch only when the device record exists and its status is IDLE; the status then becomes RUNNING
if( null != autoDeviceInfo1 && AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE.equals(autoDeviceInfo1.getStatus())){
log.info("接收调度中心调用机器人消息,设备["+deviceId+"]");
WebSocketSession session = webSocketSessionHandler.getSessionIdByDeviceId(deviceId);
if( null != session ){
WsResponse<MessageBodyBean> response = new WsResponse<MessageBodyBean>();
messageBodyBean.setSessionId(session.getId());
response.setResult(messageBodyBean);
sendMessageToUser(session, new TextMessage(JacksonUtil.bean2Json(response)));
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_RUNNING);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_RUNNING));
autoDeviceInfo1.setOnlineTime(systemDate);
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
log.info("接收调度中心调用机器人消息已完成,设备["+deviceId+"]");
return AjaxResult.success();
}
else{
String msg = "接收调度中心调用空闲机器人已接入但未注册使用,设备["+deviceId+"]";
log.info(msg);
return AjaxResult.error(msg);
}
}
else{
String msg = "接收调度中心调用无空闲机器人,设备["+deviceId+"]";
log.info(msg);
return AjaxResult.error(msg);
}
}
}
else{
String msg = "接收调度中心调用机器人消息异常,设备["+deviceId+"]";
log.info(msg);
return AjaxResult.error(msg);
}
}catch (Exception e){
String msg = "接收调度中心调用机器人消息异常,msg["+e.getMessage()+"],设备["+deviceId+"]";
log.info(msg);
return AjaxResult.error(msg);
}
}
/**
* After the Robot finishes a py (Python) flow, updates the Robot's running status and the sort weight of that status
*/
@PutMapping(value = "/{deviceId}")
public AjaxResult updateDeviceStatus(@PathVariable String deviceId,
@RequestBody String status) {
try{
log.info("Robot执行完py流程更新Robot运行状态,设备["+deviceId+"]");
AutoDeviceInfo autoDeviceInfo1 = deviceInfoService.selectByPrimaryKey(deviceId);
// If the device record exists, set its status back to IDLE (the status value in the request body is not used)
if( null != autoDeviceInfo1 ){
String sysTime = DateUtils.getTime();
Date systemDate = DateUtils.parseDate(sysTime);
autoDeviceInfo1.setStatus(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE);
autoDeviceInfo1.setStatusWeight(AutoDeviceInfoHelperUtil.LoadRunningStateWight(AutoDeviceInfoHelperUtil.RUNNINGSTATE_IDLE));
autoDeviceInfo1.setOnlineTime(systemDate);
deviceInfoService.updateByPrimaryKey(autoDeviceInfo1);
log.info("更新Robot运行状态完成,设备["+deviceId+"]");
}
return AjaxResult.success();
}catch (Exception e){
String msg = "设备状态更新接口异常,msg["+e.getMessage()+"],设备["+deviceId+"]";
log.error(msg);
return AjaxResult.error(msg);
}
}
/**
* Device information endpoint
* Parameter appId: primary-key id of the device
* Returns: AjaxResult
*/
@GetMapping(value = "/devicevideopolicy")
public AjaxResult deviceDetail(@RequestParam(name = "appId") String appId) {
return AjaxResult.success(deviceInfoService.selectByPrimaryKey(appId));
}
/**
* Sends a message to the specified user
*/
public void sendMessageToUser(WebSocketSession user, TextMessage message) {
try {
String msg;
if (user.isOpen()) {
msg = "发送消息给指定的在线用户,WebSocketSessionId["+user.getId()+"]";
logger.info(msg);
user.sendMessage(message);
}
else{
msg = "用户已经离线,WebSocketSessionId["+user.getId()+"]";
logger.error(msg);
}
} catch (IOException e) {
logger.error("发送消息给指定的用户出错"+e.getMessage());
}
}
@GetMapping("/loadWebSocketSessions")
public AjaxResult loadWebSocketSessions(){
Map<String, String> msg = webSocketSessionHandler.getWebSocketSessionIds();
return AjaxResult.success(msg);
}
@GetMapping("/loadWebSessionIdAndDeviceId")
public AjaxResult loadWebSessionIdAndDeviceId(){
ConcurrentHashMap<String,String> msg = webSocketSessionHandler.sessionIdAndDeviceIdMap;
return AjaxResult.success(msg);
}
}
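Read together, the endpoints above move a device between three states: registration sets it to idle, a dispatched task sets an idle device to running, the status-update endpoint sets it back to idle, and a disconnect (or a heartbeat timeout) sets it to offline. The sketch below restates those transitions as a small pure function. It is an illustration only: DeviceStatusSketch, its enums and the event names are hypothetical, and the real values of the RUNNINGSTATE_* constants are not shown in this article.

```java
// Hypothetical sketch of the device status transitions described in this section.
class DeviceStatusSketch {
    enum Status { OUTLINE, IDLE, RUNNING }   // OUTLINE is the article's name for offline
    enum Event { REGISTER, DISPATCH, TASK_DONE, DISCONNECT }

    // Returns the next status, or the current one when the event is not allowed.
    static Status next(Status current, Event event) {
        switch (event) {
            case REGISTER:
                return Status.IDLE;
            case DISPATCH:
                // Only an idle device accepts a new task.
                return current == Status.IDLE ? Status.RUNNING : current;
            case TASK_DONE:
                return Status.IDLE;
            case DISCONNECT:
                return Status.OUTLINE;
            default:
                return current;
        }
    }
}
```

Keeping the rules in one place like this makes it easier to see, for example, that a task sent to a running or offline device leaves its status unchanged, which matches the "no idle robot" error branch of handleRobotMessage.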
Spring utility class SpringUtils, for obtaining beans from code that is not managed by Spring:
package com.ems.mgr.common.utils.spring;
import com.ems.mgr.common.utils.StringUtils;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
{
/** Spring application context */
private static ConfigurableListableBeanFactory beanFactory;
private static ApplicationContext applicationContext;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
{
SpringUtils.beanFactory = beanFactory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
SpringUtils.applicationContext = applicationContext;
}
/**
* Gets a bean by name.
*
* @param name the bean name
* @return Object the bean instance registered under the given name
* @throws org.springframework.beans.BeansException
*
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException
{
return (T) beanFactory.getBean(name);
}
/**
* Gets the bean of type requiredType.
*
* @param clz the required bean type
* @return the bean instance matching the given type
* @throws org.springframework.beans.BeansException
*
*/
public static <T> T getBean(Class<T> clz) throws BeansException
{
return beanFactory.getBean(clz);
}
/**
* Returns true if the BeanFactory contains a bean definition matching the given name.
*
* @param name the bean name
* @return boolean
*/
public static boolean containsBean(String name)
{
return beanFactory.containsBean(name);
}
/**
* Determines whether the bean definition registered under the given name is a singleton or a prototype. If no bean definition is found for the given name, a NoSuchBeanDefinitionException is thrown.
*
* @param name the bean name
* @return boolean
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.isSingleton(name);
}
/**
* @param name the bean name
* @return Class the type of the registered object
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getType(name);
}
/**
* Returns the aliases of the given bean name, if its bean definition has any.
*
* @param name the bean name
* @return the aliases
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getAliases(name);
}
/**
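* Gets the current AOP proxy object (requires the proxy to be exposed, e.g. exposeProxy = true).
*
* @param invoker the bean calling from inside itself; used only to infer the return type
* @return the current AOP proxy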
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy(T invoker)
{
return (T) AopContext.currentProxy();
}
/**
* Gets the currently active profiles; returns an empty array if none are configured.
*
* @return the currently active profiles
*/
public static String[] getActiveProfiles()
{
return applicationContext.getEnvironment().getActiveProfiles();
}
/**
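* Gets the currently active profile; when several profiles are active, only the first is returned, and null if none is active.
*
* @return the currently active profile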
*/
public static String getActiveProfile()
{
final String[] activeProfiles = getActiveProfiles();
return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
}
}
Client (device) constants class AutoDeviceInfoHelperUtil:
package com.ems.mgr.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class AutoDeviceInfoHelperUtil {
public static final Logger log = LoggerFactory.getLogger(AutoDeviceInfoHelperUtil.class);
/*
Running status:
running: in operation
idle: idle
outline: disconnected [not connected] (default),
remove_cooling: cooling down after removal
*/
public final static String RUNNINGSTATE_RUNNING = "running";
public final static String RUNNINGSTATE_IDLE = "idle";
public final static String RUNNINGSTATE_OUTLINE = "outline";
public final static String RUNNINGSTATE_REMOVE_COOLING = "remove_cooling";
// Lookup table from running status to its sort weight (a plain HashMap), read by LoadRunningStateWight
private static Map<String, Integer> runningStateWight = new HashMap<>();
/**
* @description: mapping from running status to sort weight
running | idle: 70;
outline: 50
remove_cooling: 30
*/
static {
runningStateWight.put(RUNNINGSTATE_RUNNING, 70);
runningStateWight.put(RUNNINGSTATE_IDLE, 70);
runningStateWight.put(RUNNINGSTATE_OUTLINE, 50);
runningStateWight.put(RUNNINGSTATE_REMOVE_COOLING, 30);
}
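// Returns the sort weight for the given running status, or -1 if the status is null or unknown
public static Integer LoadRunningStateWight(String runningState) {
if (null == runningState) {
return -1;
}
// getOrDefault avoids returning null for a status that is not in the map
return runningStateWight.getOrDefault(runningState, -1);
}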
}
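The status weights above exist so that devices can be ordered by running status. As a minimal sketch of that idea, the snippet below sorts a list of status strings by weight, highest first. The class name StatusWeightSortSketch, the helper names, and the descending sort direction are assumptions made for illustration; only the weight values come from the class above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not part of the original project.
class StatusWeightSortSketch {

    // Same weights as runningStateWight in AutoDeviceInfoHelperUtil
    static final Map<String, Integer> WEIGHTS = Map.of(
            "running", 70,
            "idle", 70,
            "outline", 50,
            "remove_cooling", 30);

    // Returns -1 for a null or unknown status
    static int weightOf(String status) {
        return status == null ? -1 : WEIGHTS.getOrDefault(status, -1);
    }

    // Returns a new list ordered by weight, highest first (assumed direction).
    // List.sort is stable, so statuses with equal weight keep their input order.
    static List<String> sortByWeightDesc(List<String> statuses) {
        List<String> sorted = new ArrayList<>(statuses);
        sorted.sort(Comparator.comparingInt((String s) -> weightOf(s)).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<String> input = List.of("outline", "running", "remove_cooling", "idle", "unknown");
        System.out.println(sortByWeightDesc(input));
    }
}
```

Because running and idle share the weight 70, they end up grouped together ahead of disconnected devices, while an unrecognized status falls to the end.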
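Problems
1. When many clients (devices) are registered and connected (for example, 50 devices), the server's heartbeat monitor pushes messages to all of them, and the calls to WebSocketSession.sendMessage(message) queue up behind a synchronized section. As a result, clients receive messages late or appear blocked.
Investigation: tracing the Spring WebSocket source code, the call chain is AbstractWebSocketSession.sendMessage(WebSocketMessage<?> message) -> StandardWebSocketSession.sendTextMessage
-> WsRemoteEndpointBasic.sendText(String fragment, boolean isLast) -> WsRemoteEndpointImplBase.sendPartialString(String fragment, boolean isLast). Before the message is actually sent (sendMessageBlock), that method checks the channel State inside a synchronized method: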
public synchronized void textPartialStart() {
checkState(State.OPEN, State.TEXT_PARTIAL_READY);
state = State.TEXT_PARTIAL_WRITING;
}
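Solution: in SocketHandler.afterConnectionEstablished(WebSocketSession session), which runs once the peer connection is established, stop using the plain WebSocketSession (which is not safe for concurrent sends) and wrap it in a ConcurrentWebSocketSessionDecorator, here with a send time limit of 10000 ms and a buffer size limit of 524288 bytes (512 KB):
ConcurrentWebSocketSessionDecorator sessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 10000, 524288);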
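To make the effect of this change easier to see, here is a simplified, JDK-only model of what a buffering wrapper of this kind does: a sender that cannot obtain the flush lock only enqueues its message and returns, and the thread that holds the lock drains the queue. This is a sketch of the idea, not Spring's implementation. RawSession, BufferedSender, the character-count buffer limit, and the IllegalStateException are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration, not part of the original project or of Spring.
class BufferedSenderSketch {

    /** Stand-in for a raw session that must not be written to concurrently. */
    interface RawSession {
        void send(String text);
    }

    static class BufferedSender {
        private final RawSession delegate;
        private final int bufferSizeLimit; // counted in characters here (assumption)
        private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
        private final AtomicInteger bufferSize = new AtomicInteger();
        private final ReentrantLock flushLock = new ReentrantLock();

        BufferedSender(RawSession delegate, int bufferSizeLimit) {
            this.delegate = delegate;
            this.bufferSizeLimit = bufferSizeLimit;
        }

        void send(String text) {
            buffer.add(text);
            bufferSize.addAndGet(text.length());
            do {
                if (!tryFlush()) {
                    // Another thread is writing: leave the message queued and return
                    if (bufferSize.get() > bufferSizeLimit) {
                        throw new IllegalStateException("buffer size limit exceeded");
                    }
                    return;
                }
            } while (!buffer.isEmpty()); // pick up messages queued while we were unlocking
        }

        private boolean tryFlush() {
            if (!flushLock.tryLock()) {
                return false;
            }
            try {
                String next;
                while ((next = buffer.poll()) != null) {
                    bufferSize.addAndGet(-next.length());
                    delegate.send(next);
                }
            } finally {
                flushLock.unlock();
            }
            return true;
        }
    }

    /** One slow write is in progress while two heartbeats are sent from another thread. */
    static List<String> demo() {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        RawSession slowSession = text -> {
            if (text.equals("slow")) {
                entered.countDown();
                try {
                    release.await(); // simulate a write that is stuck on the network
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            delivered.add(text);
        };
        BufferedSender sender = new BufferedSender(slowSession, 1024);
        Thread writer = new Thread(() -> sender.send("slow"));
        writer.start();
        try {
            entered.await();       // the writer thread now holds the flush lock
            sender.send("hb-1");   // returns at once: the message is only queued
            sender.send("hb-2");
            release.countDown();   // let the slow write finish
            writer.join();         // the writer drains the queued heartbeats
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the heartbeat thread never waits on the slow write; its two messages are delivered afterwards, in order, by the thread that was already writing. The buffer limit bounds how much can pile up behind a session that has stopped draining.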
References
Demo source download link
Demo reference: boot-websocket