业务场景:
领导让我们做一个根据进度实时刷新的进度条,如下所示
后面去网上查了下,可以通过websocket这种双向通信协议的持久链接实现。
配置
创建配置类,启用websocket支持
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebSocket
public class WebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter (){
ServerEndpointExporter exporter = new ServerEndpointExporter();
return exporter;
}
}
定义处理类;
使用ConcurrentHashMap存储所有活跃的WebSocket连接会话
当客户端连接时(@OnOpen), 会将当前会话保存到静态变量sessions中
当客户端断开连接时(@OnClose), 从sessions中移除对应的会话
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/progress")
public class ProgressWebSocketHandler {
private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();
private Session session;
// 收到消息
@OnMessage
public void onMessage(String message) throws IOException{
log.info("[websocket] 收到消息:id={},message={}", this.session.getId(), message);
if (message.equalsIgnoreCase("bye")) {
// 由服务器主动关闭连接。状态码为 NORMAL_CLOSURE(正常关闭)。
this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Bye"));;
return;
}
this.session.getAsyncRemote().sendText("["+ this.session.getId() +"] Hello " + message);
}
// 连接打开
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig){
// 保存 session 到对象
this.session = session;
sessions.put(session.getId(), session);
log.info("[websocket] 新的连接:id={}", this.session.getId());
}
// 连接关闭
@OnClose
public void onClose(CloseReason closeReason){
log.info("[websocket] 连接断开:id={},reason={}", this.session.getId(),closeReason);
}
// 连接异常
@OnError
public void onError(Throwable throwable) throws IOException {
log.info("[websocket] 连接异常:id={},throwable={}", this.session.getId(), throwable.getMessage());
// 关闭连接。状态码为 UNEXPECTED_CONDITION(意料之外的异常)
this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
}
public static void sendMessage(String sessionId, String message) {
Session session = sessions.get(sessionId);
if (session != null && session.isOpen()) {
try {
session.getAsyncRemote().sendText("["+ session.getId() +"] Hello " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
测试链接
用postman或者aipfox创建一个websocket链接,地址格式为ws://+主机+端口+你指定的ServerEndpoint路径名,比如我在本地测试,websocket所在的微服务端口号为8118,在处理类用了@ServerEndpoint(“/progress”),那么链接地址为ws://localhost:8118/progress
点击链接,出行图下提示说明连接成功了
随便写点什么,点击发送,中括号里的数组就是sessionId(连接Id)。
业务代码
前端代码
前端不是我写的,不过也很好懂,大致意思就是开通一个websocket通道,然后把sessionId传给后端
getProgress() {
this.isFinish = false;
return new Promise((resolve, reject) => {
const id = this.generateUniqueId();
// 假设WebSocket服务的URL是 ws://yourserver.com/templates/variables
const socket = new WebSocket("ws://localhost:8118/progress?");
// 连接打开时触发的函数
socket.onopen = (event) => {
socket.send(this.id);
};
// 接收到服务器消息时触发的函数
socket.onmessage = (event) => {
let process = event.data.split(": ");
if (process.length > 1) {
this.process = process[1].replace("%", "");
}
if (this.process == "100") {
this.isFinish = true;
}
let match = event.data.match(/\[(\d+)\]/);
if (match) {
let result_number = match[1];
this.id = result_number;
// console.log(this.id, "this.ids");
resolve(this.id);
}
};
// 连接关闭时触发的函数
socket.onclose = function (event) {
if (event.wasClean) {
console.log("WebSocket 连接已正常关闭");
} else {
// 例如,服务器进程被杀死或网络中断
console.log("WebSocket 连接意外关闭");
}
console.log("关闭代码: " + event.code + " 关闭原因: " + event.reason);
};
// 发生错误时触发的函数
socket.onerror = function (error) {
console.error("WebSocket 发生错误:", error);
};
});
},
后端代码
在业务接口形参里接收前端传过来的sessionId,在你写的业务逻辑代码里面调用sendMessage方法,返回数据给前端
在测试软件上测试你的业务接口,这是你的业务代码接口,跟websocket接口是两个通道,不要混淆了。