SpringBoot
系列之@ServerEndpoint
方式开发WebSocket
应用。在实时的数据推送方面,经常会使用WebSocket
或者MQTT
来实现,WebSocket
是一种不错的方案,只需要建立连接,服务端和客户端就可以进行双向的数据通信。很多网站的客户聊天,也经常使用WebSocket技术来实现。
WebSocket简介
WebSocket
是一种建立在TCP协议上的一种网络协议,与Http协议类似,端口都是80
或者443
,协议标识符是ws
、如果是加密安全的就是wss
,这个和http/https
有点类似。WebSocket 连接以 HTTP 请求/响应握手开始,连接成功后,客户端可以向服务端发送消息,反之亦可,WebSocket协议支持二进制数据和文本字符串的传输。因为客户端和服务端之间只有一条TCP通信连接,以后所有的请求都使用这条连接,所以Websocket也是属于长连接。下面给出WebSocket通讯示意图:
WebSocket官网给出的HTTP和WebSocket的对比图:https://websocket.org/guides/road-to-websockets
实验环境准备
- JDK 1.8
- SpringBoot 3.3.0
- Maven 3.3.9
- 开发工具
- IntelliJ IDEA
- smartGit
新建WebSocket项目
在idea里新建一个module,选择Spring Initializr
项目,默认选择Spring
官网的https://start.spring.io
选择需要的依赖,这里可以选择Springboot
集成的WebSocket starter
生成项目后,检查一下对应Maven配置文件中是否有加上spring-boot-starter-websocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
如果不是Springboot
项目,加入spring-websocket
即可
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>5.2.1.RELEASE</version>
<scope>compile</scope>
</dependency>
创建ServerEndpoint
创建一个Websocket
的ServerEndpoint
类,这个类是为了创建一个WebSocket
服务端,这个类使用线程安全的CopyOnWriteArrayList
集合来存储所有的WebSocket
对象,再自定义一个socketClientCode
,目的是为了客户端只和对应的服务端通信,客户端建立连接会进入onOpen
方法,发送消息会调用onMessage
方法
package com.example.springboot.websocket.message;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
@ServerEndpoint("/ws/webSocketServer")
@Component
@Slf4j
public class WebSocket {
private static final String PREFIX = "socketClient=";
private String socketClientCode;
private static CopyOnWriteArrayList<WebSocket> webSocketSet = new CopyOnWriteArrayList<>();
private Session session ;
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this);
log.info("open a webSocket {}, online num: {}",getSocketClientCode(), getOnlineNum());
}
@OnClose
public void onClose() {
webSocketSet.remove(this);
log.error("close a webSocket {}, online num:{}", getSocketClientCode(), getOnlineNum());
printOnlineClientCode();
}
@OnError
public void onError(Session session, Throwable error) {
webSocketSet.remove(this);
log.error("webSocket error {}, {}, online num:{}", error, getSocketClientCode(), getOnlineNum());
printOnlineClientCode();
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("receive message from client:{}", message);
// 业务实现
if (message.startsWith(PREFIX)) {
String socketClientCode = message.substring(PREFIX.length());
this.setSocketClientCode(socketClientCode);
sendMessage(message);
printOnlineClientCode();
} else {
sendMessage(message);
}
}
/**
* 发送消息
*
* @Date 2024/06/19 16:36
* @Param [message]
* @return void
*/
public void sendMessage(String message) {
if (!this.session.isOpen()) {
log.warn("webSocket is close");
return;
}
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("sendMessage exception:{}", e);
}
}
/**
* 给客户端发送消息
*
* @Date 2024/06/19 16:37
* @Param [message, socketClientCode]
* @return void
*/
public void sendMessageToClient(String message, String socketClientCode) {
log.info("send message to client, message:{}, clientCode:{}", message, socketClientCode);
printOnlineClientCode();
webSocketSet.stream().forEach(ws -> {
if (StrUtil.isNotBlank(socketClientCode) && StrUtil.isNotBlank(ws.getSocketClientCode()) && ws.getSocketClientCode().equals(socketClientCode)) {
ws.sendMessage(message);
}
});
}
/**
* 群发消息
*
* @Date 2024/06/19 16:37
* @Param [message]
* @return void
*/
public void fanoutMessage(String message) {
webSocketSet.forEach(ws -> {
ws.sendMessage(message);
});
}
private static synchronized int getOnlineNum() {
return webSocketSet.size();
}
private void printOnlineClientCode() {
webSocketSet.stream().forEach(ws -> {
log.info("webSocket online:{}", ws.getSocketClientCode());
});
}
public String getSocketClientCode() {
return socketClientCode;
}
public void setSocketClientCode(String socketClientCode) {
this.socketClientCode = socketClientCode;
}
}
加上ServerEndpointExporter
在Springboot
项目中为了能扫描到所有的ServerEndpoint
,需要注入一个ServerEndpointExporter
,这个类能扫描项目里所有的ServerEndpoint
类,不加的话,客户端会一直连不上服务端
package com.example.springboot.websocket.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
启动SpringBoot
项目,一个WebSocket
服务端就建立好了。网上找一个websocket测试网站,https://www.wetools.com/websocket,测试一下服务是否正常,如图:
前端WebSocket客户端
写一个WebSocket
调用的客户端,启动服务器,换一下WebSocket的地址,ws://127.0.0.1:8080/ws/webSocketServer,如果是https的,就换成wss://127.0.0.1:8080/ws/webSocketServer
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>webSocket</title>
<style type="text/css">
</style>
</head>
<body>
<h1>WebSocket Demo</h1>
<input type="button" onclick="websocket.send('666666')" value="点我发消息"/>
</body>
<script type="application/javascript">
var websocket = {
send: function (str) {
}
};
window.onload = function () {
if (!'WebSocket' in window) return;
webSocketInit();
};
function webSocketInit() {
websocket = new WebSocket("ws://127.0.0.1:8080/ws/webSocketServer");
//建立连接
websocket.onopen = function () {
websocket.send("socketClient=666");
console.log("成功连接到服务器");
};
//接收到消息
websocket.onmessage = function (event) {
console.log(event.data);
};
//连接发生错误
websocket.onerror = function () {
alert("WebSocket连接发生错误");
};
//连接关闭
websocket.onclose = function () {
alert("WebSocket连接关闭");
};
//监听窗口关闭
window.onbeforeunload = function () {
websocket.close()
};
}
</script>
</html>
走一个,在浏览器按F12,看看日志
后端的日志打印:
服务端给客户端发送消息
给客户端发送消息,为了只给对应的客户端发送消息,这里加上一个校验,只给注册的客户端发送
/**
* 给客户端发送消息
*
* @Date 2024/06/19 16:37
* @Param [message, socketClientCode]
* @return void
*/
public void sendMessageToClient(String message, String socketClientCode) {
log.info("send message to client, message:{}, clientCode:{}", message, socketClientCode);
printOnlineClientCode();
webSocketSet.stream().forEach(ws -> {
if (StrUtil.isNotBlank(socketClientCode) && StrUtil.isNotBlank(ws.getSocketClientCode()) && ws.getSocketClientCode().equals(socketClientCode)) {
ws.sendMessage(message);
}
});
}
写一个API接口:
package com.example.springboot.websocket.rest;
import cn.hutool.json.JSONUtil;
import com.example.springboot.websocket.dto.WebSocketDto;
import com.example.springboot.websocket.message.WebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@RestController
@RequestMapping("/api")
@Slf4j
public class WebSocketApiController {
@Resource
@Qualifier("webSocket")
private WebSocket webSocket;
@PostMapping
@RequestMapping("/sendMessage")
public ResultBean<Boolean> sendMessage(@RequestBody WebSocketDto sendDto) {
log.info("webSocket发送消息给客户端:{}", JSONUtil.toJsonStr(sendDto));
try {
webSocket.sendMessageToClient(sendDto.getMessage(), sendDto.getSocketClient());
return ResultBean.ok(true);
} catch (Exception e) {
log.error("发送WebSocket消息异常:{}", e);
return ResultBean.badRequest("发送WebSocket消息异常", false);
}
}
}