使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度
文章目录
- 使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度
- 什么是SSE?
- 使用场景
- 前端库选择
- 安装`event-source-polyfill`
- 1. 创建SSE连接
- 2. 关闭SSE连接
- 3. 结合Vue.js使用
- 使用Spring Boot实现SSE
- 1. 创建SSE工具类
- 2. 实现文件上传进度通知
- 3. 实现任务执行进度跟踪
在现代Web应用中,服务器实时向客户端推送数据是一项非常常见的需求。在多种实现技术中,Server-Sent Events(SSE)是一个轻量级的解决方案,适用于对实时性要求不高、数据量不大的场景。本文将介绍如何在Spring Boot中使用SSE,结合实际案例展示在文件上传和任务执行中的应用。
什么是SSE?
Server-Sent Events(SSE)是HTML5标准的一部分,允许服务器单向推送消息到客户端。它与WebSocket不同,SSE只支持服务器向客户端推送数据,而不支持客户端向服务器发送数据。SSE的优点在于其实现简单、兼容性好,非常适合不需要双向通讯的场景。
使用场景
- 文件上传进度通知:当用户上传文件时,服务器可以通过SSE实时告知客户端上传进度。
- 任务执行进度跟踪:对于耗时的任务(如数据处理、批量导入等),可以通过SSE向客户端实时推送任务进度。
前端库选择
由于原生的EventSource
在某些浏览器中可能不支持自定义请求头,因此选择event-source-polyfill
库来建立SSE连接。这一库允许在初始化时设置请求头,如身份验证所需的token等。
安装event-source-polyfill
首先安装event-source-polyfill
库:
npm install event-source-polyfill
前端实现步骤
1. 创建SSE连接
通过封装一个createSseConnection函数,建立与服务端的SSE连接,并定义如何处理不同消息事件:
import { EventSourcePolyfill } from "event-source-polyfill";
import config from "../../../../config";
export function createSseConnection(context, topic, callbacks, showMessage, onError) {
const url = config.sse_host[process.env.NODE_ENV] + "/techik/sse/subscribe?topic=" + topic;
const headers = { "token": localStorage.getItem("token") };
const source = new EventSourcePolyfill(url, {
headers,
heartbeatTimeout: 30 * 60 * 1000,
});
source.onopen = () => {
console.log("SSE connection established.");
};
source.onmessage = (e) => {
const message = e.data;
if (callbacks.onMessage) {
callbacks.onMessage(message, context);
}
if (showMessage && message.includes('success')) {
context.$message({
type: "success",
duration: 3000,
message: "提交成功!",
});
}
};
source.onerror = (e) => {
console.error("SSE error:", e);
if (callbacks.onError) {
callbacks.onError(e, context);
}
if (e.readyState === EventSource.CLOSED) {
console.log("SSE connection closed.");
if (callbacks.onClose) {
callbacks.onClose(context);
}
} else if (onError) {
onError(e);
}
};
return source;
}
2. 关闭SSE连接
提供一个closeSseConnection函数,当不再需要接收消息时,手动关闭SSE连接:
export function closeSseConnection(source, context, afterClose) {
if (source) {
source.close();
console.log("SSE connection closed.");
if (afterClose) {
afterClose(context);
}
}
}
3. 结合Vue.js使用
在Vue组件中使用createSseConnection和closeSseConnection管理SSE连接:
export default {
data() {
return {
sseSource: null,
};
},
methods: {
startListening() {
const topic = "uploadProgress"; // 根据需求选择不同的topic
this.sseSource = createSseConnection(this, topic, {
onMessage: this.handleMessage,
onError: this.handleError,
onClose: this.handleClose,
}, true);
},
handleMessage(message, context) {
console.log("Received message:", message);
// 处理接收到的消息
},
handleError(error, context) {
console.error("Error received:", error);
},
handleClose(context) {
console.log("Connection closed.");
},
stopListening() {
closeSseConnection(this.sseSource, this, () => {
this.sseSource = null;
});
}
},
beforeDestroy() {
this.stopListening();
}
};
使用Spring Boot实现SSE
1. 创建SSE工具类
首先,创建一个工具类SseUtils
来管理SSE连接:
package com.techik.Util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@Slf4j
public class SseUtils {
// 原子计数器,用于跟踪活跃的连接数
private static final AtomicInteger COUNT = new AtomicInteger(0);
// 存储主题和对应的SseEmitter映射关系,确保线程安全
private static final Map<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap<>();
/**
* 建立新的SSE连接,并设置相关的回调函数
*
* @param topic 要连接的主题
* @return 新创建的SseEmitter对象
*/
public static SseEmitter connect(String topic) {
// 设置超时时间为30分钟
SseEmitter sseemitter = new SseEmitter(30 * 60 * 1000L);
// 设置连接完成后的回调
sseemitter.onCompletion(completionCallBack(topic));
// 设置连接出错时的回调
sseemitter.onError(errorCallBack(topic));
// 设置连接超时时的回调
sseemitter.onTimeout(timeoutCallBack(topic));
// 将新的SseEmitter存储到Map中
SSE_EMITTER_MAP.put(topic, sseemitter);
// 增加活跃连接数
COUNT.incrementAndGet();
log.info("创建新的sse连接,当前的主题:{}", topic);
return sseemitter;
}
/**
* 发送消息到指定的主题
*
* @param topic 目标主题
* @param message 要发送的消息内容
*/
public static void sendMessage(String topic, String message) {
if (SSE_EMITTER_MAP.containsKey(topic)) {
try {
// 发送消息
SSE_EMITTER_MAP.get(topic).send(message);
} catch (IOException e) {
log.error("当前的主题:{},发送消息-错误:{}", topic, e.getMessage());
}
}
}
/**
* 移除指定主题的连接
*
* @param topic 要移除的主题
*/
public static void removeTopic(String topic) {
// 从Map中移除SseEmitter
SSE_EMITTER_MAP.remove(topic);
// 减少活跃连接数
COUNT.decrementAndGet();
log.info("删除主题:{}", topic);
}
// 创建连接完成的回调函数
private static Runnable completionCallBack(String topic) {
return () -> {
log.info("结束连接,{}", topic);
removeTopic(topic);
};
}
// 创建连接超时的回调函数
private static Runnable timeoutCallBack(String topic) {
return () -> {
log.info("连接超时,{}", topic);
removeTopic(topic);
};
}
// 创建连接出错的回调函数
private static Consumer<Throwable> errorCallBack(String topic) {
return throwable -> {
log.error("连接异常,{}", topic);
removeTopic(topic);
};
}
}
2. 实现文件上传进度通知
在上传文件的过程中,可以使用SseEmitter向客户端实时推送上传进度:
@PostMapping("/upload")
public ResponseEntity<String> uploadFile(MultipartFile file) {
String topic = "uploadProgress";
SseEmitter emitter = SseUtils.connect(topic);
// 模拟上传文件并推送进度
for (int i = 0; i <= 100; i += 10) {
SseUtils.sendMessage(topic, "上传进度: " + i + "%");
Thread.sleep(500); // 模拟耗时操作
}
SseUtils.sendMessage(topic, "上传完成!");
return ResponseEntity.ok("文件上传成功");
}
3. 实现任务执行进度跟踪
类似文件上传,当需要执行耗时任务时,可以使用SSE推送任务进度:
@GetMapping("/executeTask")
public ResponseEntity<String> executeTask() {
String topic = "taskProgress";
SseEmitter emitter = SseUtils.connect(topic);
// 模拟任务执行并推送进度
for (int i = 0; i <= 100; i += 20) {
SseUtils.sendMessage(topic, "任务进度: " + i + "%");
Thread.sleep(1000); // 模拟耗时操作
}
SseUtils.sendMessage(topic, "任务完成!");
return ResponseEntity.ok("任务执行成功");
}