Spring Boot 整合 netty-socketio
-
依赖
注意版本号,不然client版本不对的话也是连不上的
https://github.com/mrniko/netty-socketio
```xml
<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
</dependency>
<zyy-socket.version>2.0.2</zyy-socket.version>
```
- 结构
3 配置
package com.zyy.wss.config;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.zyy.framework.model.constant.SecurityConstants;
import com.zyy.wss.handler.NettySocketEventHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.security.oauth2.provider.token.TokenStore;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
 * Spring configuration that builds the netty-socketio {@link SocketIOServer} bean.
 *
 * <p>Port, timeouts and thread counts are read from the {@code socket.*} properties.
 * Every handshake must carry an {@code Authorization} URL parameter whose token is
 * known to the {@link TokenStore} and not expired. Created 2023/4/26.
 *
 * @author liucw
 */
@Configuration
@Slf4j
public class NettySocketConfig {

    @Value("${socket.port}")
    private Integer socketPort;

    @Value("${socket.upgradeTimeout}")
    private Integer upgradeTimeout;

    @Value("${socket.pingInterval}")
    private Integer pingInterval;

    @Value("${socket.pingTimeout}")
    private Integer pingTimeout;

    @Value("${socket.bossThreadCount}")
    private Integer bossThreadCount;

    @Value("${socket.workerThreadCount}")
    private Integer workerThreadCount;

    @Resource
    private NettySocketEventHandler nettySocketEventHandler;

    @Resource
    private TokenStore tokenStore;

    /**
     * Creates the socket.io server, installs the handshake authorisation check and
     * registers {@link NettySocketEventHandler} as a listener. The server is only
     * configured here; it is not started by this method.
     *
     * @return the configured, not yet started, server
     */
    @Bean
    public SocketIOServer socketIOServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        // Bind on all interfaces.
        config.setHostname("0.0.0.0");
        config.setBossThreads(bossThreadCount);
        config.setWorkerThreads(workerThreadCount);
        config.setAllowCustomRequests(true);
        // Listening port.
        config.setPort(socketPort);
        // Timeout (ms) for upgrading the HTTP handshake to the ws protocol.
        config.setUpgradeTimeout(upgradeTimeout);
        // Interval (ms) between heartbeat messages.
        config.setPingInterval(pingInterval);
        // A timeout event is raised if no heartbeat arrives within this period (ms).
        config.setPingTimeout(pingTimeout);
        // Cross-origin setting.
        // NOTE(review): the original note reads "null, not *" while the code passes "*" - confirm which is intended.
        config.setOrigin("*");

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        config.setSocketConfig(socketConfig);

        config.setAuthorizationListener(handshakeData -> {
            Map<String, List<String>> urlParams = handshakeData.getUrlParams();
            String accessToken = handshakeData.getSingleUrlParam("Authorization");
            // Log parameter names only: the parameter values contain the bearer token.
            log.info(" handshakeData url params :{} ,authorization present:{}",
                    urlParams.keySet(), StrUtil.isNotBlank(accessToken));
            if (StrUtil.isBlank(accessToken)) {
                return false;
            }
            String tokenValue = StrUtil.replaceIgnoreCase(accessToken, SecurityConstants.JWT_PREFIX, Strings.EMPTY);
            try {
                OAuth2AccessToken oAuth2AccessToken = tokenStore.readAccessToken(tokenValue);
                // An unknown token yields null; reject it instead of failing with a NullPointerException.
                return oAuth2AccessToken != null && !oAuth2AccessToken.isExpired();
            } catch (RuntimeException e) {
                // A token the store cannot read must reject the handshake, not break it.
                log.warn("socket handshake rejected: access token could not be read", e);
                return false;
            }
        });

        SocketIOServer socketIOServer = new SocketIOServer(config);
        socketIOServer.addListeners(nettySocketEventHandler);
        return socketIOServer;
    }

    /**
     * Registers the scanner that wires the socket.io annotations found on Spring beans
     * to the given server.
     *
     * @param socketServer the server built by {@link #socketIOServer()}
     * @return the annotation scanner
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}
4. ClientCache
package com.zyy.wss.config;
import com.corundumstudio.socketio.SocketIOClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory registry of the socket.io clients connected to this JVM, grouped by user id.
 *
 * <p>Each user maps to the sessions (one per connection) that user currently has open.
 * Updates replace the per-user map with a modified copy inside an atomic
 * {@link ConcurrentHashMap} operation, so a map handed out by
 * {@link #getUserClient(String)} is never mutated by this class afterwards.
 *
 * @author litong
 * @date 2019/11/6 16:01
 */
@Component
@Slf4j
public class ClientCache {

    /**
     * Local cache: user id -> (session id -> client).
     */
    private static final ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap =
            new ConcurrentHashMap<>();

    /**
     * Stores a client in the local cache.
     *
     * @param userId         user id
     * @param sessionId      session id of the connection
     * @param socketIOClient client handle for that connection
     */
    public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {
        // compute() is atomic per key, so two connections of the same user cannot lose an update.
        HashMap<UUID, SocketIOClient> updated = concurrentHashMap.compute(userId, (key, existing) -> {
            HashMap<UUID, SocketIOClient> copy = existing == null ? new HashMap<>() : new HashMap<>(existing);
            copy.put(sessionId, socketIOClient);
            return copy;
        });
        log.info("ClientCache :{}", updated.size());
    }

    /**
     * Returns every connection cached for a user.
     *
     * @param userId user id
     * @return session id -> client for that user, or {@code null} when the user has no
     *         cached connection; callers should treat the map as read-only
     */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId) {
        return concurrentHashMap.get(userId);
    }

    /**
     * Removes one connection of a user. Does nothing when the user or the session is
     * not cached; the user's entry is dropped once its last session is removed.
     *
     * @param userId    user id
     * @param sessionId session id of the connection to remove
     */
    public void deleteSessionClient(String userId, UUID sessionId) {
        concurrentHashMap.computeIfPresent(userId, (key, existing) -> {
            if (!existing.containsKey(sessionId)) {
                return existing;
            }
            HashMap<UUID, SocketIOClient> copy = new HashMap<>(existing);
            copy.remove(sessionId);
            // Returning null removes the key, so users without sessions do not accumulate.
            return copy.isEmpty() ? null : copy;
        });
    }

    /**
     * Sends an event to every cached client whose channel is still open.
     *
     * @param eventName  event name to emit
     * @param msgContent payload passed to {@code sendEvent}
     */
    public void sendBroadcast(String eventName, Object msgContent) {
        for (HashMap<UUID, SocketIOClient> sessions : concurrentHashMap.values()) {
            for (SocketIOClient client : sessions.values()) {
                if (client.isChannelOpen()) {
                    client.sendEvent(eventName, msgContent);
                }
            }
        }
    }
}
5 . 连接处理类
package com.zyy.wss.handler;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.zyy.admin.api.dto.common.NumberNotDownloadedDto;
import com.zyy.admin.api.enums.common.ExportEnums;
import com.zyy.common.api.feign.DownloadCenterFeign;
import com.zyy.framework.model.dto.result.data.DataResult;
import com.zyy.framework.model.enums.TenantEnums;
import com.zyy.framework.model.enums.base.EnumUtils;
import com.zyy.wss.config.ClientCache;
import com.zyy.wss.entity.DownLoadData;
import com.zyy.wss.entity.MessageInfoStructure;
import com.zyy.wss.entity.TokenInfo;
import com.zyy.wss.enums.MsgTypeEnum;
import com.zyy.wss.helper.JWTHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
/**
* @author: liucw
* @createDate: 2023/4/26
* @description:
*/
@Component
@Slf4j
public class NettySocketEventHandler {
@Resource
private ClientCache clientCache;
@Resource
private JWTHelper jwtHelper;
@Resource
private DownloadCenterFeign downloadCenterFeign;
/**
* socket事件消息接收入口
*/
@OnEvent(value = "message_event") //value值与前端自行商定
public void onEvent(SocketIOClient client, AckRequest ackRequest, MessageInfoStructure data) {
boolean ackRequested = ackRequest.isAckRequested();
// 获取前端推送数据
log.info("onEvent :{}", JSONUtil.toJsonStr(data));
//根据msgType类别进行数据类型判断,
if (Objects.nonNull(data.getMsgType()) && data.getMsgType().equals(MsgTypeEnum.EXPORT_POLLING.getValue())) {
TokenInfo clientKey = jwtHelper.getClientKey(client);
assert clientKey != null;
DataResult<Integer> integerDataResult = null;
if (data.getTenantId().equals(TenantEnums.ZMP_BACK.getId())) {
integerDataResult = downloadCenterFeign.downloadableCount(NumberNotDownloadedDto.builder().userType(ExportEnums.ExportUserType.DAY_PIVOT.getUserType()).userId(clientKey.getUserId()).build());
} else if (data.getTenantId().equals(TenantEnums.ZMP_BEND.getId())) {
integerDataResult = downloadCenterFeign.downloadableCount(NumberNotDownloadedDto.builder().userType(ExportEnums.ExportUserType.CLOUD_CHAIN.getUserType()).userId(clientKey.getCustomerId()).build());
}
log.info("integerDataResult :{}", JSONUtil.toJsonStr(integerDataResult));
if (StrUtil.isNotBlank(data.getMsgContent())) {
DownLoadData downLoadData = (DownLoadData) JSONUtil.toBean(data.getMsgContent(), EnumUtils.of(MsgTypeEnum.class, MsgTypeEnum.EXPORT_POLLING.getValue()).getContentType());
log.info("Json: " + downLoadData);
}
//数据类型标识
//向前端发送接收数据成功标识
//这里可填写接收数据后的相关业务逻辑代码
client.sendEvent("message_event", integerDataResult);
}
}
/**
* socket添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息
*/
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
log.info("--------------------客户端已断开连接--------------------");
//client.disconnect();
TokenInfo clientKey = jwtHelper.getClientKey(client);
if (Objects.isNull(clientKey)) {
log.error("Client key token error");
return;
}
clientCache.deleteSessionClient(String.valueOf(clientKey.getUserId()), client.getSessionId());
}
/**
* socket添加connect事件,当客户端发起连接时调用
*/
@OnConnect
public void onConnect(SocketIOClient client) {
if (client != null) {
TokenInfo clientKey = jwtHelper.getClientKey(client);
if (Objects.isNull(clientKey)) {
log.error("Client key token error");
return;
}
//存储SocketIOClient,用于向不同客户端发送消息
// socketIOClientMap.put(mac, client);
UUID sessionId = client.getSessionId();
clientCache.saveClient(String.valueOf(clientKey.getUserId()), sessionId, client);
log.info("--------------------客户端连接成功---------------------");
client.sendEvent("connected", "connected", sessionId);
} else {
log.error("客户端为空");
}
}
/**
* 广播消息 函数可在其他类中调用
*/
/* public static void sendBroadcast(byte[] data) {
for (SocketIOClient client : socketIOClientMap.values()) {
//向已连接的所有客户端发送数据,map实现客户端的存储
if (client.isChannelOpen()) {
client.sendEvent("message_event", data);
}
}
}*/
}
6.工具处理类
package com.zyy.wss.helper;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.google.common.collect.Maps;
import com.nimbusds.jose.JWSObject;
import com.zyy.framework.model.constant.SecurityConstants;
import com.zyy.wss.entity.TokenInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.security.oauth2.provider.token.TokenStore;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Map;
/**
 * Helpers for turning the token presented by a socket client into a {@link TokenInfo}.
 * Created 2023/4/28.
 *
 * @author liucw
 */
@Component
@Slf4j
public class JWTHelper {

    @Resource
    private TokenStore tokenStore;

    /**
     * Decodes the token payload and reports its {@code auth} flag. Only the payload is
     * decoded; no signature verification call is made in this method.
     *
     * @param token the token, with or without the JWT prefix
     * @return the payload's {@code isAuth()} value, or {@code false} when the token is
     *         blank or cannot be parsed
     */
    public static Boolean verifyZyToken(String token) {
        String payload = decodePayload(token);
        if (payload == null) {
            return false;
        }
        TokenInfo tokenInfo = JSONUtil.toBean(payload, TokenInfo.class);
        log.info("token: {}", tokenInfo);
        return tokenInfo.isAuth();
    }

    /**
     * Decodes the token payload into a {@link TokenInfo}.
     *
     * @param token the token, with or without the JWT prefix
     * @return the decoded payload, or {@code null} when the token is blank or cannot be parsed
     */
    public static TokenInfo parseZyToken(String token) {
        String payload = decodePayload(token);
        return payload == null ? null : JSONUtil.toBean(payload, TokenInfo.class);
    }

    /**
     * Builds a {@link TokenInfo} from the additional information of a stored access token.
     *
     * @param token the access token read from the token store; may be {@code null}
     * @return the mapped token info, or {@code null} when {@code token} is {@code null}
     */
    public TokenInfo parseZyTokenTwo(OAuth2AccessToken token) {
        if (token == null) {
            return null;
        }
        Map<String, Object> source = token.getAdditionalInformation();
        Map<String, String> mapping = Maps.newHashMap();
        mapping.put("username", "username");
        mapping.put("customerId", "customerId");
        mapping.put("userId", "userId");
        return BeanUtil.mapToBean(source, TokenInfo.class, true, CopyOptions.create().setFieldMapping(mapping));
    }

    /**
     * Resolves the token info of a connected client from the {@code Authorization} URL
     * parameter of its handshake.
     *
     * @param client the connected client
     * @return the token info, or {@code null} when the parameter is blank or the token
     *         store returns no token for it
     */
    public TokenInfo getClientKey(SocketIOClient client) {
        String authorization = client.getHandshakeData().getSingleUrlParam("Authorization");
        if (StrUtil.isBlank(authorization)) {
            return null;
        }
        authorization = StrUtil.replaceIgnoreCase(authorization, SecurityConstants.JWT_PREFIX, Strings.EMPTY);
        return parseZyTokenTwo(tokenStore.readAccessToken(authorization));
    }

    /**
     * Manual check: decodes the token given as the first argument and prints its
     * {@code auth} flag. The token is taken from the command line so that no credential
     * is kept in source.
     *
     * @param args {@code args[0]} is the token to decode
     * @throws ParseException when the token cannot be parsed
     */
    public static void main(String[] args) throws ParseException {
        if (args == null || args.length == 0 || StrUtil.isBlank(args[0])) {
            System.err.println("usage: JWTHelper <token>");
            return;
        }
        String token = StrUtil.replaceIgnoreCase(args[0], SecurityConstants.JWT_PREFIX, Strings.EMPTY);
        String payload = StrUtil.toString(JWSObject.parse(token).getPayload());
        TokenInfo tokenInfo = JSONUtil.toBean(payload, TokenInfo.class);
        System.out.println(JSONUtil.toJsonStr(tokenInfo.isAuth()));
    }

    /**
     * Strips the JWT prefix and returns the payload as a string, or {@code null} when the
     * token is blank or unparseable.
     */
    private static String decodePayload(String token) {
        if (StrUtil.isBlank(token)) {
            return null;
        }
        String raw = StrUtil.replaceIgnoreCase(token, SecurityConstants.JWT_PREFIX, Strings.EMPTY);
        try {
            return StrUtil.toString(JWSObject.parse(raw).getPayload());
        } catch (ParseException e) {
            log.error("验证token失败", e);
            return null;
        }
    }
}
7.NettySocketRunnable
package com.zyy.wss.runner;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author: liucw
* @createDate: 2023/4/26
* @description:
*/
@Component
@Order(value = 1)
@Slf4j
public class NettySocketRunnable implements CommandLineRunner {
private final SocketIOServer server;
@Autowired
public NettySocketRunnable(SocketIOServer server) {
this.server = server;
}
@Override
public void run(String... args) {
log.info("--------------------SocketIOServer socket.io通信启动成功!---------------------");
server.start();
}
}
yaml 配置
socket:
  port: 9090
  upgradeTimeout: 10000
  pingInterval: 60000
  pingTimeout: 180000
  bossThreadCount: 1
  workerThreadCount: 100
8:nginx 配置 协议升级 wss(443) 或ws(80)
# Derive the Connection header to forward from the client's Upgrade header:
# "upgrade" when the client asked for a protocol upgrade, "close" otherwise.
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

upstream socket {
    server 127.0.0.1:9090;
    keepalive 256;
}

location /socket.io/ {
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $http_host;
    proxy_pass http://socket;
    # The Upgrade mechanism requires HTTP/1.1 towards the upstream.
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    # Use the mapped value so plain polling requests are not sent "Connection: upgrade".
    proxy_set_header Connection $connection_upgrade;
    # nginx closes a proxied connection that is silent for 60s by default; the server
    # pings every 60000 ms, so allow a longer idle period.
    proxy_read_timeout 180s;
    tcp_nodelay on;
}
1: 注意server的版本和client版本是对应上的
2: 上线后注意线上是否有cdn 注意 如果有需要开启cdn支持websocket
1: 注意server的版本和client版本是对应上的
2: 上线后注意线上是否有cdn 注意 如果有需要开启cdn支持websocket
支持多个节点 清集群
/**
 * Operations for pushing socket.io events to connected clients.
 */
public interface SocketIOService {

    /**
     * Pushes an event to the connections of one user.
     *
     * @param eventName  event name to emit
     * @param userId     id of the target user
     * @param msgContent event payload
     */
    void pushMessageToUser(String eventName, String userId, Object msgContent);

    /**
     * Sends an event to all clients.
     *
     * @param eventName  event name to emit
     * @param msgContent event payload
     */
    void sendToAll(String eventName, Object msgContent);

    /**
     * Sends an event addressed by user id.
     *
     * @param eventName  event name to emit
     * @param userId     id of the target user
     * @param msgContent event payload
     */
    void sendMessage(String eventName, String userId, Object msgContent);
}
package com.zyy.wss.service.impl;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.zyy.wss.config.ClientCache;
import com.zyy.wss.config.properties.SocketIoServerProperties;
import com.zyy.wss.entity.MessageInfoStructure;
import com.zyy.wss.service.SocketIOService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
/**
 * {@link SocketIOService} implementation that delivers events either directly to the
 * clients cached on this node or through Redisson topics. Created 2023/5/23.
 *
 * @author liucw
 */
@Service
@Slf4j
public class SocketIOServiceImpl implements SocketIOService {

    /**
     * Prefix for broadcast (mass-send) event names.
     */
    private static final String MASS_PREFIX = "/mass";

    /**
     * Redisson topic names: single-user messages and broadcasts.
     */
    private static final String TOPIC_SOCKETIO_SINGLE = "socketio:single";
    private static final String TOPIC_SOCKETIO_TOALL = "socketio:toAll";

    @Resource
    private ClientCache clientCache;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private SocketIoServerProperties serverProperties;

    /**
     * Sends the event to every connection the cache holds for the user; logs at debug
     * level and returns when the cache has no entry for that user.
     */
    @Override
    public void pushMessageToUser(String eventName, String userId, Object msgContent) {
        HashMap<UUID, SocketIOClient> sessions = clientCache.getUserClient(userId);
        if (sessions == null) {
            log.debug("没有在线的用户");
            return;
        }
        for (SocketIOClient session : sessions.values()) {
            // Push the message to this connection.
            session.sendEvent(eventName, msgContent);
        }
    }

    /**
     * Broadcasts through the client cache, falling back to the default mass topic when
     * no topic is given.
     */
    @Override
    public void sendToAll(String topic, Object msgContent) {
        String target = StrUtil.isBlank(topic) ? MASS_PREFIX + "/toAll" : topic;
        clientCache.sendBroadcast(target, msgContent);
    }

    /**
     * Routes a message: without a user id it is published on the broadcast topic; with
     * a user id it is published on the single-user topic in cluster mode and pushed
     * directly otherwise.
     */
    @Override
    public void sendMessage(String eventName, String userId, Object msgContent) {
        MessageInfoStructure envelope = MessageInfoStructure.builder()
                .topic(eventName)
                .userId(userId)
                .msgContent(JSONUtil.toJsonStr(msgContent))
                .build();

        if (StrUtil.isBlank(userId)) {
            RTopic broadcastTopic = redissonClient.getTopic(TOPIC_SOCKETIO_TOALL);
            broadcastTopic.publish(envelope);
            return;
        }

        boolean clustered = Objects.nonNull(serverProperties) && serverProperties.isCluster();
        if (clustered) {
            RTopic singleTopic = redissonClient.getTopic(TOPIC_SOCKETIO_SINGLE);
            singleTopic.publish(envelope);
            return;
        }
        pushMessageToUser(eventName, envelope.getUserId(), msgContent);
    }

    /**
     * Subscribes to both Redisson topics so that published messages are delivered to
     * the clients held by this instance.
     */
    @PostConstruct
    public void init() {
        if (redissonClient == null) {
            return;
        }

        // NOTE(review): both listeners decode the content as a Map - confirm payloads are always JSON objects.
        RTopic singleTopic = redissonClient.getTopic(TOPIC_SOCKETIO_SINGLE);
        singleTopic.addListener(MessageInfoStructure.class, (channel, msg) -> {
            if (StrUtil.isBlank(msg.getUserId())) {
                return;
            }
            pushMessageToUser(msg.getTopic(), msg.getUserId(), JSONUtil.toBean(msg.getMsgContent(), Map.class));
            log.info(" {} {} {} {} {}", serverProperties.getPort(), channel.toString(), msg.getTopic(), msg.getUserId(), msg.getMsgContent());
        });

        RTopic broadcastTopic = redissonClient.getTopic(TOPIC_SOCKETIO_TOALL);
        broadcastTopic.addListener(MessageInfoStructure.class, (channel, msg) -> {
            sendToAll(msg.getTopic(), JSONUtil.toBean(msg.getMsgContent(), Map.class));
            log.info(" sendToAll {} {} {} {} {}", serverProperties.getPort(), channel.toString(), msg.getTopic(), msg.getUserId(), msg.getMsgContent());
        });
    }
}