目录
1. 启用Websocket功能；2. 封装操作websocket session的工具；3. 保存websocket session的接口；4. 保存websocket session的类；5. 定义websocket 端点；6. 创建定时任务 ping websocket 客户端
1.启用Websocket功能
package com. xxx. robot. config ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. web. socket. config. annotation. EnableWebSocket ;
import org. springframework. web. socket. server. standard. ServerEndpointExporter ;
/**
 * Spring configuration class that enables WebSocket support and exposes a
 * {@link ServerEndpointExporter} bean.
 *
 * <p>NOTE(review): per the Spring documentation, {@code ServerEndpointExporter}
 * is what registers {@code @ServerEndpoint}-annotated beans with the servlet
 * container; confirm it is not needed/duplicated when deploying to an external
 * container.
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a newly constructed {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
2.封装操作websocket session的工具
package com. xxx. robot. websocket. util ;
import java. util. Map ;
import javax. websocket. Session ;
import org. apache. tomcat. websocket. Constants ;
import org. springframework. security. authentication. UsernamePasswordAuthenticationToken ;
import com. xxx. framework. security. config. MyUserDetails ;
import com. xxx. framework. security. entity. LoginUser ;
import com. xxx. user. entity. User ;
/**
 * Static helpers for working with a WebSocket {@link Session}: applying the
 * buffer/timeout settings and resolving the application {@link User} behind
 * the session's principal. Not instantiable.
 */
public final class WebSocketSessionUtils {

    /** Maximum text message buffer size applied to a session: 8 * 1024 * 1024 bytes. */
    public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;

    /** Maximum binary message buffer size applied to a session: 8 * 1024 * 1024 bytes. */
    public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;

    /**
     * Value stored under Tomcat's {@code BLOCKING_SEND_TIMEOUT_PROPERTY}
     * (10 * 1000; presumably milliseconds, confirm against the Tomcat docs).
     */
    public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;

    private WebSocketSessionUtils() {
        // utility class
    }

    /**
     * Unwraps the application user from the session's principal.
     *
     * <p>The principal is cast, step by step, to
     * {@link UsernamePasswordAuthenticationToken}, {@link MyUserDetails},
     * {@link LoginUser} and finally {@link User}. No null or type checks are
     * performed, so an unauthenticated session or an unexpected principal type
     * results in a {@link NullPointerException} or {@link ClassCastException}.
     *
     * @param session the WebSocket session whose principal is inspected
     * @return the {@link User} stored as the login user's additional info
     */
    public static User findUser(Session session) {
        UsernamePasswordAuthenticationToken authentication =
                (UsernamePasswordAuthenticationToken) session.getUserPrincipal();
        MyUserDetails details = (MyUserDetails) authentication.getPrincipal();
        LoginUser login = (LoginUser) details.getUserData();
        return (User) login.getAdditionalInfo();
    }

    /**
     * Applies the buffer size limits and the blocking send timeout defined by
     * this class to the given session.
     *
     * @param session the session to configure
     */
    public static void setProperties(Session session) {
        session.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
        session.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
        // The timeout is passed through the session's user properties, keyed by the Tomcat constant.
        session.getUserProperties()
                .put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);
    }
}
3.保存websocket session的接口
package com. xxx. robot. websocket ;
import java. io. IOException ;
import java. nio. ByteBuffer ;
import java. util. List ;
import javax. websocket. Session ;
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
/**
 * Contract for a keyed store of WebSocket {@link Session}s, plus default
 * implementations for pinging every stored session and for closing them all.
 */
public interface WebSocketSessionManager {

    /** Logger shared by the default methods of this interface. */
    Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);

    /** Payload of the ping frame sent by {@link #pingBatch()}. */
    String PING = "ping";

    /** Counterpart text of {@link #PING}. */
    String PONG = "pong";

    /**
     * Looks up a session.
     *
     * @param key the session key
     * @return the session stored under {@code key}, or {@code null} if none
     */
    Session get(String key);

    /**
     * @return the keys of the sessions currently stored
     */
    List<String> keys();

    /**
     * Stores a session under the given key.
     *
     * @param key     the session key
     * @param session the session to store
     */
    void add(String key, Session session);

    /**
     * Removes the session stored under the given key.
     *
     * @param key the session key
     * @return the removed session, or {@code null} if none was stored
     */
    Session remove(String key);

    /**
     * Sends a ping frame to every stored session, pausing 10 ms between sends.
     *
     * <p>A failure on one session is logged and does not stop the batch. If the
     * calling thread is interrupted, the interrupt status is restored and the
     * batch ends early.
     */
    default void pingBatch() {
        List<String> keyList = keys();
        log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());
        for (String key : keyList) {
            if (key == null) {
                continue;
            }
            Session session = get(key);
            if (session == null) {
                // Removed between keys() and get(); nothing to ping.
                continue;
            }
            try {
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                byte[] payload = PING.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                session.getBasicRemote().sendPing(ByteBuffer.wrap(payload));
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Do not lose the interrupt: restore the flag and stop this round.
                Thread.currentThread().interrupt();
                log.warn("WebSocket-ping被中断，提前结束本轮ping");
                return;
            } catch (Exception e) {
                log.error("WebSocket-ping异常", e);
            }
        }
    }

    /**
     * Removes and closes every stored session, then logs how many were removed.
     *
     * <p>A session counts as removed even when closing it throws; the
     * {@link IOException} is logged. An interrupt observed during the periodic
     * yield does not abort the cleanup; the interrupt status is restored once
     * all sessions have been processed.
     */
    default void clearAllSession() {
        int removed = 0;
        boolean interrupted = false;
        for (String key : keys()) {
            if (key == null) {
                continue;
            }
            Session session = get(key);
            if (session == null) {
                continue;
            }
            try {
                remove(key);
                removed++;
                session.close();
            } catch (IOException e) {
                log.error("WebSocket-移除并关闭session异常", e);
            }
            if (removed % 10 == 0) {
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    // Remember the interrupt; finish closing the remaining sessions first.
                    interrupted = true;
                }
            }
        }
        log.info("WebSocket-移除并关闭session数量为:{}", removed);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
4.保存websocket session的类
package com. xxx. robot. websocket. robot. manager ;
import java. io. IOException ;
import java. util. ArrayList ;
import java. util. List ;
import java. util. NavigableSet ;
import java. util. concurrent. ConcurrentNavigableMap ;
import java. util. concurrent. ConcurrentSkipListMap ;
import javax. websocket. Session ;
import org. apache. commons. lang3. StringUtils ;
import org. springframework. stereotype. Component ;
import com. xxx. robot. websocket. WebSocketSessionManager ;
/**
 * {@link WebSocketSessionManager} backed by a static, sorted concurrent map.
 * Keys have the form {@code userId + '-' + managerId} (see {@code joinKey}).
 */
@Component
public class RobotSessionManager implements WebSocketSessionManager {

    /** All sessions, keyed by {@code userId-managerId}; sorted so one user's keys are contiguous. */
    private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();

    /**
     * Builds a pool key.
     *
     * @param userId    the user id
     * @param managerId the per-connection id
     * @return {@code userId + '-' + managerId}
     */
    public static final String joinKey(String userId, String managerId) {
        return userId + '-' + managerId;
    }

    /**
     * Builds a pool key from a numeric user id.
     *
     * @param userId    the user id; must not be {@code null}
     * @param managerId the per-connection id
     * @return {@code userId + '-' + managerId}
     */
    public static final String joinKey(Long userId, String managerId) {
        return userId.toString() + '-' + managerId;
    }

    /**
     * Splits a pool key on {@code '-'}.
     *
     * <p>NOTE(review): every {@code '-'} is treated as a separator, so a manager id
     * that itself contains {@code '-'} yields more than two parts; confirm callers
     * never use such ids.
     *
     * @param key the pool key
     * @return the parts produced by {@code StringUtils.split(key, '-')}
     */
    public static final String[] splitKey(String key) {
        return StringUtils.split(key, '-');
    }

    @Override
    public Session get(String key) {
        return SESSION_POOL.get(key);
    }

    /**
     * Lists the pool keys belonging to one user, optionally leaving one out.
     *
     * @param userId           the user whose keys are wanted
     * @param excludeManagerId manager id whose key is omitted; blank means no exclusion.
     *                         A full pool key is also accepted here.
     * @return the matching keys in sorted order; never {@code null}
     */
    public List<String> keysByUserId(String userId, String excludeManagerId) {
        // '.' is the character right after '-', so [userId + '-', userId + '.') is exactly
        // the range of keys that start with "userId-".
        ConcurrentNavigableMap<String, Session> userSessions =
                SESSION_POOL.subMap(userId + '-', userId + '.');

        // Pool keys are "userId-managerId", so the bare manager id can never equal a key;
        // build the full key to compare against.
        String excludedKey = StringUtils.isBlank(excludeManagerId)
                ? null
                : joinKey(userId, excludeManagerId);

        List<String> result = new ArrayList<>();
        for (String key : userSessions.navigableKeySet()) {
            boolean excluded = excludedKey != null
                    && (key.equals(excludedKey) || key.equals(excludeManagerId));
            if (!excluded) {
                result.add(key);
            }
        }
        return result;
    }

    /**
     * @return a snapshot list of all pool keys in sorted order
     */
    @Override
    public List<String> keys() {
        // ConcurrentSkipListMap rejects null keys, so no null filtering is needed.
        return new ArrayList<>(SESSION_POOL.navigableKeySet());
    }

    /**
     * Stores {@code session} under {@code key}; a session previously stored under
     * the same key is removed and closed first.
     */
    @Override
    public synchronized void add(String key, Session session) {
        removeAndClose(key);
        SESSION_POOL.put(key, session);
    }

    @Override
    public synchronized Session remove(String key) {
        return SESSION_POOL.remove(key);
    }

    /**
     * Removes the entry only if {@code key} is currently mapped to {@code session},
     * so a newer session stored under the same key is left untouched.
     *
     * @param key     the pool key
     * @param session the session expected under {@code key}
     */
    public synchronized void remove(String key, Session session) {
        SESSION_POOL.remove(key, session);
    }

    /** Removes the session stored under {@code key}, if any, and closes it. */
    private void removeAndClose(String key) {
        Session previous = remove(key);
        if (previous == null) {
            return;
        }
        try {
            previous.close();
        } catch (IOException e) {
            // Best effort: the old session is already out of the pool, so only report it.
            log.warn("WebSocket-关闭旧session异常: {}", key, e);
        }
    }
}
5.定义websocket 端点
package com. xxx. robot. websocket. robot. endpoint ;
import java. util. Map ;
import javax. websocket. OnClose ;
import javax. websocket. OnError ;
import javax. websocket. OnMessage ;
import javax. websocket. OnOpen ;
import javax. websocket. Session ;
import javax. websocket. server. PathParam ;
import javax. websocket. server. ServerEndpoint ;
import org. springframework. stereotype. Component ;
import com. fasterxml. jackson. databind. JsonNode ;
import com. xxx. framework. util. SpringBeanUtils ;
import com. xxx. user. entity. User ;
import com. xxx. robot. corefunc. service. RobotCoreService ;
import com. xxx. robot. util. serial. BaseJsonUtils ;
import com. xxx. robot. websocket. WebSocketSessionManager ;
import com. xxx. robot. websocket. robot. manager. RobotSessionManager ;
import com. xxx. robot. websocket. util. WebSocketSessionUtils ;
import lombok. extern. slf4j. Slf4j ;
@Slf4j
@Component
@ServerEndpoint ( value = "/robot/{id}" )
public class RobotWebSocketServer {
private volatile User user;
private volatile String id;
private volatile Session session;
private volatile Map < String , RobotCoreService > robotCoreServiceMap;
@OnOpen
public void onOpen ( @PathParam ( "id" ) String id, Session session) {
WebSocketSessionUtils . setProperties ( session) ;
this . user = WebSocketSessionUtils . findUser ( session) ;
this . id = id;
this . session = session;
log. info ( "连接成功:{}, {}" , id, this . user. getUserCode ( ) ) ;
robotCoreServiceMap = SpringBeanUtils . getApplicationContext ( ) . getBeansOfType ( RobotCoreService . class ) ;
RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ;
robotSessionManager. add ( RobotSessionManager . joinKey ( this . user. getId ( ) , id) , session) ;
}
@OnClose
public void onClose ( ) {
log. info ( "连接关闭:{}, {}" , this . id, this . user. getUserCode ( ) ) ;
RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ;
robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , this . session) ;
}
@OnError
public void onError ( Throwable error) {
log. error ( "onError:id = {}, {}, {}" , this . id, this . session. getId ( ) , this . user. getUserCode ( ) , error) ;
RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ;
robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , this . session) ;
}
@OnMessage
public void onMessage ( String message) {
log. info ( "onMessage:id = {}, {}, {}" , this . id, this . user. getUserCode ( ) , message) ;
if ( WebSocketSessionManager . PING. equals ( message) ) {
this . session. getAsyncRemote ( ) . sendText ( WebSocketSessionManager . PONG) ;
return ;
}
try {
JsonNode root = BaseJsonUtils . readTree ( message) ;
String apiType = root. at ( "/apiType" ) . asText ( ) ;
robotCoreServiceMap. get ( apiType + "Service" ) . receiveFrontMessage ( this . user, RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , root) ;
} catch ( Exception e) {
log. error ( "处理消息错误" , e) ;
}
}
}
6.创建定时任务 ping websocket 客户端
package com. xxx. robot. config ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. scheduling. annotation. EnableScheduling ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
/**
 * Spring configuration that enables scheduling support and defines the
 * {@code taskExecutor} thread pool bean.
 *
 * <p>NOTE(review): {@code @Scheduled} methods are normally run by a
 * {@code TaskScheduler}, not a {@code TaskExecutor}; confirm whether this pool
 * is actually used for the scheduled ping job or only for other async work.
 */
@EnableScheduling
@Configuration
public class TaskExecutorConfig {

    /**
     * Builds a pool with 5 core and 5 max threads, a queue capacity of 10,
     * a 60-second keep-alive and the thread name prefix {@code scheduler-executor-}.
     *
     * @return the configured executor
     */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("scheduler-executor-");
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(5);
        pool.setQueueCapacity(10);
        pool.setKeepAliveSeconds(60);
        return pool;
    }
}
package com. xxx. robot. websocket ;
import java. util. List ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. scheduling. annotation. Scheduled ;
import org. springframework. stereotype. Component ;
import lombok. extern. slf4j. Slf4j ;
@Slf4j
@Component
public class WebSocketSchedulerTask {
@Autowired
private List < WebSocketSessionManager > webSocketSessionManagers;
@Scheduled ( initialDelay = 60000 , fixedDelay = 30000 )
public void clearInvalidSession ( ) {
try {
log. info ( "pingBatch 开始。。。" ) ;
for ( WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {
webSocketSessionManager. pingBatch ( ) ;
}
log. info ( "pingBatch 完成。。。" ) ;
} catch ( Exception e) {
log. error ( "pingBatch异常" , e) ;
}
}
}