一、服务端
1、maven引入netty-socketio
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.22</version>
</dependency>
2、服务端java代码
@Service
public class SocketService {
@Autowired
private SocketConfig socketConfig;
private SocketIOServer socketIOServer = null;
@Autowired(required = false)
private ISocketLinesService socketLinesService;
//pc端
private Map<String /*userId*/, Set<UUID>> CLIENT_ID = new HashMap<>();
//移动端
private Set<String> MOBILE_ID = new HashSet<>();
private Set<String> PC_COUNT = new HashSet<>();
@PostConstruct
public void init() {
Configuration config = this.socketConfig.getConfig();
if (config == null) {
// 配置默认的启动信息
config = new Configuration();
config.setHostname("0.0.0.0");
config.setPort(7543);
}
int num = Runtime.getRuntime().availableProcessors();
//主线程设置为1即可
config.setBossThreads(1);
//设置工作线程
config.setWorkerThreads(num * 2);
//解决对此重启服务时,netty端口被占用问题
com.corundumstudio.socketio.SocketConfig tmpConfig = new com.corundumstudio.socketio.SocketConfig();
tmpConfig.setReuseAddress(true);
config.setSocketConfig(tmpConfig);
//异常处理
config.setExceptionListener(new ExceptionListener() {
@Override
public void onPongException(Exception e, SocketIOClient client) {
}
@Override
public void onEventException(Exception e, List<Object> args, SocketIOClient client) {
// System.out.println("收发消息异常");
}
@Override
public void onDisconnectException(Exception e, SocketIOClient client) {
// System.out.println("断开连接异常");
}
@Override
public void onConnectException(Exception e, SocketIOClient client) {
// System.out.println("建立连接异常");
}
@Override
public void onPingException(Exception e, SocketIOClient client) {
// System.out.println("心跳异常");
}
@Override
public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
return false;
}
});
this.socketIOServer = new SocketIOServer(config);
this.addEvent();
System.out.println(String.format("socket服务已启动,地址:%s,端口:%d", config.getHostname(), config.getPort()));
this.socketIOServer.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("应用程序关闭了");
socketIOServer.stop();
}));
}
@PreDestroy
public void stop() {
if (this.socketIOServer != null) {
this.socketIOServer.stop();
}
}
public void addEvent() {
this.socketIOServer.addEventListener("emit", String.class, new DataListener<String>() {
@Override
public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
System.out.println(String.format("收到的消息:%s", data));
}
});
this.socketIOServer.addConnectListener(new ConnectListener() {
@Override
public void onConnect(SocketIOClient client) {
//处理会话建立业务
Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
if (params == null || params.size() == 0) {
client.disconnect();
return;
}
try {
line(params, 1, client);
} catch (RuntimeException e) {
//校验身份失败
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
});
this.socketIOServer.addDisconnectListener(new DisconnectListener() {
@Override
public void onDisconnect(SocketIOClient client) {
// System.out.println("断开连接了");
// 处理会话断开业务
Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
try {
line(params, 2, client);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void line(Map<String, List<String>> params, int type, SocketIOClient client) {
List<String> list = params.get("userId");
if (list == null || list.size() == 0) {
send("身份认证失效");
client.disconnect();
return;
}
String user_id = params.get("userId").get(0);
Set<UUID> uids = CLIENT_ID.get(user_id);
if (uids == null) {
uids = new HashSet<>();
}
System.out.println("clientId:" + user_id);
uids.add(client.getSessionId());
if (type == 1) {
if (user_id.equalsIgnoreCase("undefined") || user_id.equals("")) {
client.disconnect();
} else {
CLIENT_ID.put(user_id, uids);
PC_COUNT.add(user_id);
}
} else {
uids.remove(client.getSessionId());
PC_COUNT.remove(user_id);
}
//移动端在线
handleMobileLine(user_id, params, type, client);
}
public void handleMobileLine(String userId, Map<String, List<String>> params, int type, SocketIOClient client) {
if (StringUtils.isEmpty(userId) || userId.equalsIgnoreCase("undefined")) {
return;
}
List<String> list = params.get("mobile");
if (list != null && list.size() > 0) {
UUID id = client.getSessionId();
if (type == 1) {
MOBILE_ID.add(userId);
} else {
MOBILE_ID.remove(userId);
}
}
}
public void sendAll(String topic, byte[] data) {
this.socketIOServer.getBroadcastOperations().sendEvent(topic, data);
}
public void sendOne(String topic, String clientId, byte[] data) {
Set<UUID> uids = CLIENT_ID.get(clientId);
if (uids != null) {
for (UUID uid : uids) {
this.socketIOServer.getClient(uid).sendEvent(topic, data);
}
}
}
public void send(String data) {
this.socketIOServer.getBroadcastOperations().sendEvent("message", data);
}
public void send(String event, String data) {
this.socketIOServer.getBroadcastOperations().sendEvent(event, data);
}
public void sendOne(String event, String data, String userId) {
Set<UUID> uids = CLIENT_ID.get(userId);
if (uids != null) {
for (UUID uid : uids) {
this.socketIOServer.getClient(uid).sendEvent(event, data);
}
}
}
/**
* 刷新在线人数
*/
@Scheduled(cron = "0/10 * * * * ?")
public void reshCount() {
if (this.socketLinesService != null) {
this.socketLinesService.reshCount(0, PC_COUNT.size());
this.socketLinesService.reshCount(1, MOBILE_ID.size());
}
}
}
3、 消息发送工具
@Component
public class SendMsgUtil {
private static SendMsgUtil sendMsgUtil;
@Autowired
private SocketService socketService;
@PostConstruct
public void start() {
sendMsgUtil = this;
}
public static void send2User(String userId, String event, MsgEntity msg) {
sendMsgUtil.socketService.sendOne(event, JSON.toJSONString(msg), userId);
}
public static void send2User(String userId, String event, byte[] data) {
sendMsgUtil.socketService.sendOne(event, userId, data);
}
public static void send2User(String userId, MsgEntity msg) {
try {
sendMsgUtil.socketService.sendOne("message", JSON.toJSONString(msg), userId);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void sendAll(String event, String data) {
sendMsgUtil.socketService.send(event, data);
}
public static void sendAll(String event, byte[] data) {
sendMsgUtil.socketService.sendAll(event, data);
}
}
4、发送消息
@RestController
@RequestMapping(value = "admin/demo")
public class TestSocketController {
@ApiOperation(value = "测试发送")
@GetMapping(value = "send")
public ResultMsg send(
@RequestParam String name
) {
//
PlayerOuterClass.Player.Builder builder = PlayerOuterClass.Player.newBuilder();
builder.setId(1).setName(name).setEnterTime(System.currentTimeMillis());
PlayerOuterClass.Player player = builder.build();
System.out.println(player.getName());
//发送普通文本消息
SendMsgUtil.sendAll("topic", "cn_yaojin");
//发送protobuf协议消息
SendMsgUtil.sendAll("topic1", player.toByteArray());
return ResultMsg.builder();
}
}
5、proto文件:Player.proto,使用idea插件生成java代码
syntax = "proto3";
package com.cn.web.msg;
message Player {
uint32 id = 1; //唯一ID 首次登录时设置为0,由服务器分配
string name = 2; //显示名字
uint64 enterTime = 3; //登录时间
}
二、前端
<!DOCTYPE html>
<html>
<head>
<title>Socket.IO chat</title>
<style>
body {
margin: 0;
padding-bottom: 3rem;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
}
#form {
background: rgba(0, 0, 0, 0.15);
padding: 0.25rem;
position: fixed;
bottom: 0;
left: 0;
right: 0;
display: flex;
height: 3rem;
box-sizing: border-box;
backdrop-filter: blur(10px);
}
#input {
border: none;
padding: 0 1rem;
flex-grow: 1;
border-radius: 2rem;
margin: 0.25rem;
}
#input:focus {
outline: none;
}
#form>button {
background: #333;
border: none;
padding: 0 1rem;
margin: 0.25rem;
border-radius: 3px;
outline: none;
color: #fff;
}
#messages {
list-style-type: none;
margin: 0;
padding: 0;
}
#messages>li {
padding: 0.5rem 1rem;
}
#messages>li:nth-child(odd) {
background: #efefef;
}
</style>
<!-- 引入2.0版本的socket.io文件 -->
<script src="https://cdn.jsdelivr.net/npm/socket.io-client@2/dist/socket.io.js" ></script>
<!-- 引入protobuf文件 -->
<script src="//cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.js"></script>
</head>
<body>
<ul id="messages"></ul>
<form id="form" action="">
<input id="input" autocomplete="off" /><button>Send</button>
</form>
<script type="module">
//建立socket连接
const socket = io("http://127.0.0.1:3091", {
reconnectionDelayMax: 10000,
auth: {
token: "123"
},
query: {
"userId": "cn_yaojin"
}
});
socket.emit("emit","hello world !");
socket.on("connect", () => {
console.log("通道已建立")
});
//topic类型消息是服务端发送的普通文本
socket.on("topic", (data) => {
console.log("收到新的数据了");
});
//加载Player.proto文件(在前端代码目录下的某个位置,我放在了当前目录) 并解析收到socket数据(服务端发送的:topic1 类型消息
protobuf.load("Player.proto", function (err, root) {
// Player 是proto文件中定义的实体名称
const tickerData = root.lookupType("Player");
//监听服务端发送的 topic1 消息
socket.on("topic1", (data) => {
console.log("收到新的数据了");
//解析数据
var message = tickerData.decode(new Uint8Array(data));
var object = tickerData.toObject(message, {
longs: String,
enums: String,
bytes: String,
// see ConversionOptions
});
console.log(object)
//另一种解析方式
// var d = new Uint8Array(data.byteLength);
// var dataView = new DataView(data);
// for (var i = 0; i < data.byteLength; i++) {
// d[i] = dataView.getInt8(i);
// }
// //解析后的数据
// let decoded = tickerData.decode(d);
// console.log(`decoded = ${JSON.stringify(decoded)}`);
// console.log(decoded.id+"---"+decoded.name)
});
});
</script>
</body>
</html>