1 前言
1.1 什么是 SocketIO ?
Socket.IO 是一个可以在浏览器与服务器之间实现实时、双向、基于事件的通信的工具库。 Socket.IO 能够在任何平台、浏览器或设备上运行,可靠性和速度同样出色。
1.2 websocket和socket.io区别?
- websocket
a:一种让客户端和服务器之间能进行双向实时通信的技术
b:使用时,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况
c:适合用于client和基于node搭建的服务端使用 - socket.io
a:将WebSocket、AJAX和其它的通信方式全部封装成了统一的通信接口
b:使用时,不用担心兼容问题,底层会自动选用最佳的通信方式
c:适合进行服务端和客户端双向数据通信
d:Socket.IO中文网地址:https://socket.nodejs.cn/docs/v4/
1.3 应用及版本
- 本文是基于若依前后端分离版本的基础上进行代码编写和演示的
- spring-boot:2.5.14
- socketio:2.0.3
- jdk:java8
2 物料准备(均为后端代码)
2.1 添加 Socket 依赖包
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>2.0.3</version>
</dependency>
2.2 创建频道常量类:SocketEventContants
我这个常量类是为了统一频道所建,你们不一定需要这个类
package com.mss.common.constant;
/**
* @Description: Socket 自定义事件名称
* @Author: zhanleai
*/
public class SocketEventContants {
/**
* 用户频道
**/
public static final String CHANNEL_USER = "channel_user";
/**
* 系统频道
**/
public static final String CHANNEL_SYSTEM = "channel_system";
}
2.3 创建 Socket 连接类:SocketHandler
用来监听 socket 客户端上下线,以及服务端自动关闭;
有些博主把这个类的内容跟工具类里监听事件方法放在一起,个人认为需要解耦;
package com.mss.framework.handle;
import com.corundumstudio.socketio.SocketIOServer;
import com.mss.common.utils.socket.SocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.stereotype.Component;
/**
* @Author: zhanleai
* @Description: 客户端自动连接和断开、服务端关闭
*/
@Component
@Slf4j
public class SocketHandler {
@Autowired
private SocketIOServer socketIoServer;
/**
* 容器销毁前,自动调用此方法,关闭 socketIo 服务端
*
* @Param []
* @return
**/
@PreDestroy
private void destroy(){
try {
log.debug("关闭 socket 服务端");
socketIoServer.stop();
}catch (Exception e){
e.printStackTrace();
}
}
@PostConstruct
public void init() {
log.debug("SocketEventListener initialized");
//添加监听,客户端自动连接到 socket 服务端
socketIoServer.addConnectListener(client -> {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SocketUtil.connectMap.put(userId, client);
log.debug("客户端userId: "+ userId+ "已连接,客户端ID为:" + client.getSessionId());
});
//添加监听,客户端跟 socket 服务端自动断开
socketIoServer.addDisconnectListener(client -> {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SocketUtil.connectMap.remove(userId, client);
log.debug("客户端userId:" + userId + "断开连接,客户端ID为:" + client.getSessionId());
});
}
// // 注释说明:以下 onConnect和 onDisconnect 方法在某些场景下会失效,不建议使用,所以注释掉
// /**
// * 客户端自动连接到 socket 服务端
// *
// * @Param [client]
// * @return
// **/
// @OnConnect
// public void onConnect(SocketIOClient client) {
// String userId = client.getHandshakeData().getSingleUrlParam("userId");
// SocketUtil.connectMap.put(userId, client);
// log.debug("客户端userId: "+ userId+ "已连接,客户端ID为:" + client.getSessionId());
// }
//
// /**
// * 客户端跟 socket 服务端自动断开
// *
// * @Param [client]
// * @return
// **/
// @OnDisconnect
// public void onDisconnect(SocketIOClient client) {
// String userId = client.getHandshakeData().getSingleUrlParam("userId");
// log.debug("客户端userId:" + userId + "断开连接,客户端ID为:" + client.getSessionId());
// SocketUtil.connectMap.remove(userId, client);
// }
}
2.4 Socket 配置文件和配置类
(用来定义 socket 的一些配置)
2.4.1 yml 配置
socketio:
host: 127.0.0.1
port: 33000
maxFramePayloadLength: 1048576
maxHttpContentLength: 1048576
bossCount: 1
workCount: 100
allowCustomRequests: true
upgradeTimeout: 1000000
pingTimeout: 6000000
pingInterval: 25000
2.4.2 配置类:SocketConfig
package com.mss.framework.config;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class SocketConfig {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setPort(port);
com.corundumstudio.socketio.SocketConfig socketConfig=new com.corundumstudio.socketio.SocketConfig();
socketConfig.setReuseAddress(true);
configuration.setSocketConfig(socketConfig);
configuration.setOrigin(null);
configuration.setBossThreads(bossCount);
configuration.setWorkerThreads(workCount);
configuration.setAllowCustomRequests(allowCustomRequests);
configuration.setUpgradeTimeout(upgradeTimeout);
configuration.setPingTimeout(pingTimeout);
configuration.setPingInterval(pingInterval);
configuration.setRandomSession(true);
// configuration.setKeyStorePassword("pi0yo93pqgrs");
// configuration.setKeyStore(this.getClass().getResourceAsStream("www.ibms.club.jks"));
// configuration.setAuthorizationListener(data -> {
// String token = data.getSingleUrlParam("token");
// return StrUtil.isNotBlank(token);
// });
//初始化 Socket 服务端配置
return new SocketIOServer(configuration);
}
}
2.5 Socket 服务启动类:ServerRunner
实现 CommandLineRunner 接口,项目启动时自动执行 socketIOServer.start() 方法
package com.mss.framework.run;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@AllArgsConstructor
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer socketIOServer;
/**
* 项目启动时,自动启动 socket 服务,服务端开始工作
*
* @Param [args]
* @return
**/
@Override
public void run(String... args) {
socketIOServer.start();
log.info("socket.io server started !");
}
}
2.6 Socket 工具类:SocketUtil
下列实例代码中,是使用 userId 来当做客户端唯一标识,这个每个人可以根据自己项目里自行设置;
下列实例代码的应用场景,只有服务端向客户端发送消息的需求,所以实际这个工具类只有 sendToOne() 方法是实际起作用的,其余的代码都是为了本文额外写的方法;
package com.mss.common.utils.socket;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.mss.common.constant.SocketEventContants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @Author: zhanleai
* @Description:
*/
@Component
@Slf4j
public class SocketUtil {
//暂且把用户&客户端信息存在缓存
public static ConcurrentMap<String, SocketIOClient> connectMap = new ConcurrentHashMap<>();
/**
* 单发消息(以 userId 为标识符,给用户发送消息)
*
* @Param [userId, message]
* @return
**/
public static void sendToOne(String userId, Object message) {
//拿出某个客户端信息
SocketIOClient socketClient = getSocketClient(userId);
if (Objects.nonNull(socketClient) ){
//单独给他发消息
socketClient.sendEvent(SocketEventContants.CHANNEL_USER,message);
}else{
log.info(userId + "已下线,暂不发送消息。");
}
}
/**
* 群发消息
*
* @Param
* @return
**/
public static void sendToAll(Object message) {
if (connectMap.isEmpty()){
return;
}
//给在这个频道的每个客户端发消息
for (Map.Entry<String, SocketIOClient> entry : connectMap.entrySet()) {
entry.getValue().sendEvent(SocketEventContants.CHANNEL_SYSTEM, message);
}
}
/**
* 根据 userId 识别出 socket 客户端
* @param userId
* @return
*/
public static SocketIOClient getSocketClient(String userId){
SocketIOClient client = null;
if (StringUtils.hasLength(userId) && !connectMap.isEmpty()){
for (String key : connectMap.keySet()) {
if (userId.equals(key)){
client = connectMap.get(key);
}
}
}
return client;
}
/**
* 1)使用事件注解,服务端监听获取客户端消息;
* 2)拿到客户端发过来的消息之后,可以再根据业务逻辑发送给想要得到这个消息的人;
* 3)channel_system 之所以会向全体客户端发消息,是因为我跟前端约定好了,你们也可以自定定义;
*
* @Param message
* @return
**/
@OnEvent(value = SocketEventContants.CHANNEL_SYSTEM)
public void channelSystemListener(String message) {
if (!StringUtils.hasLength(message)){
return;
}
this.sendToAll(message);
}
}
3 Socket 调用
3.1 实际项目的应用场景:在需要发送消息通知的业务代码中调用
这个方法里有几个类:Message、DateUtils、IMessageService、MessageMapper,均为根据自身业务场景自定义的类,你们自己建吧。有需要再私信我要;
后端代码写到这里,实际上已经写完了。从 3.2 开始均为测试代码;
package com.mss.message.service.impl;
import com.mss.common.utils.DateUtils;
import com.mss.common.utils.socket.SocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mss.message.mapper.MessageMapper;
import com.mss.message.domain.entity.Message;
import com.mss.message.service.IMessageService;
/**
* 消息Service业务层处理
*
* @author zhanleai
*/
@Service
@Slf4j
public class MessageServiceImpl implements IMessageService {
@Autowired
private MessageMapper messageMapper;
/**
* 新增消息
*
* @param message 消息
* @return 结果
*/
@Override
public int insertMessage(Message message) {
message.setSendTime(DateUtils.getNowDate());
// 消息入库,消息持久化
int i = messageMapper.insertMessage(message);
// 新增消息之后,再向前端推送 Socket 消息
SocketUtil.sendToOne(message.getSendUserId().toString(),message);
return i;
}
}
3.2 测试Controller
下文均为测试的代码
package com.mss.message.controller;
import com.mss.common.utils.socket.SocketUtil;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.mss.common.core.controller.BaseController;
import com.mss.common.core.domain.AjaxResult;
/**
* 消息Controller
*
* @author zhanleai
*/
@RestController
@Api(tags="消息")
@RequestMapping("/message")
public class MessageController extends BaseController {
/**
* 给指定客户端发送消息
*
* @Param [userId, message]
* @return
**/
@GetMapping("/sendToOne")
public AjaxResult sendToOne(String userId , String message){
SocketUtil.sendToOne(userId,message);
return AjaxResult.success("单独发送消息成功。");
}
}
4 前端调用代码
前端代码监听了 channel_user 和 channel_system 两个频道,一个做了三个动作:
1)连接上服务端;
2)监听并接收 channel_user 频道的消息;
3)给服务端发送一条消息,并广播到所有客户端;postman 只做了一个动作,给后端指定的 userId 发送一条 channel_user 频道的消息,并被指定客户端捕获;
4.1 html 测试代码以及说明
此处不做赘述,详细的 html 测试代码见我另一篇文章《SocketIO 的 html 代码示例》:
链接跳转
4.2 浏览器打开 html 文件,然后查看后端服务日志
-
(socket 服务端启动,端口号为 33000,客户端 zhanleai 连接上来了)
-
浏览器截图
-
后端服务日志截图
4.3 postman 工具测试
- postman 截图
- 浏览器收到消息截图