SSE(Server-Send-Event)服务端推送数据技术
大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。
- 客户端轮询更新数据。
- 服务端与客户端建立 Socket 连接双向通信
- 服务端与客户建立 SSE 连接单向通信
几种方案的比较:
-
轮询:
客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。
-
WebSocket连接:
服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。
-
SSE推送:
SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。
下面是SpringBoot使用SSE的步骤和示例代码
-
配置依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency>
SSE已经集成到spring-web中,所以可以直接使用。
-
后端代码
import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.validation.constraints.NotBlank; import java.util.concurrent.CompletableFuture; @RestController @RequestMapping("/sse") @Slf4j @Validated public class SseTestController { @Autowired private SseService service; @GetMapping("/testSse") public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) { final SseEmitter emitter = service.getConn(clientId); CompletableFuture.runAsync(() -> { try { service.send(clientId); log.info("建立连接成功!clientId = {}", clientId); } catch (Exception e) { log.error("推送数据异常"); } }); return emitter; } @GetMapping("/sseConection") public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) { return service.getConn(clientId); } @GetMapping("/sendMsg") public void sendMsg(@RequestParam("clientId") String clientId) { try { // 异步发送消息 CompletableFuture.runAsync(() -> { try { service.send(clientId); } catch (Exception e) { log.error("推送数据异常"); } }); } catch (Exception e) { e.printStackTrace(); } } @GetMapping("/sendMsgToAll") public void sendMsgToAll() { try { //异步发送消息 CompletableFuture.runAsync(() -> { try { service.sendToAll(); } catch (Exception e) { e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } @GetMapping("closeConn/{clientId}") public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) { service.closeConn(clientId); return "连接已关闭"; } }
package com.wry.wry_test.service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.validation.constraints.NotBlank; public interface SseService { /** * 获取连接 * @param clientId 客户端id * @return */ SseEmitter getConn(String clientId); /** * 发送消息到指定客户端 * @param clientId 客户端id * @throws Exception */ void send(String clientId); /** * 发送消息到所有SSE客户端 * @throws Exception */ void sendToAll() throws Exception; /** * 关闭指定客户端的连接 * @param clientId 客户端id */ void closeConn(String clientId); }
package com.wry.wry_test.service.impl; import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.validation.constraints.NotBlank; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Service @Slf4j public class SseServiceImpl implements SseService { private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>(); @Override public SseEmitter getConn(@NotBlank String clientId) { final SseEmitter sseEmitter = SSE_CACHE.get(clientId); if (sseEmitter != null) { return sseEmitter; } else { // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用 final SseEmitter emitter = new SseEmitter(600_000L); // 注册超时回调,超时后触发 emitter.onTimeout(() -> { log.info("连接已超时,正准备关闭,clientId = {}", clientId); SSE_CACHE.remove(clientId); }); // 注册完成回调,调用 emitter.complete() 触发 emitter.onCompletion(() -> { log.info("连接已关闭,正准备释放,clientId = {}", clientId); SSE_CACHE.remove(clientId); log.info("连接已释放,clientId = {}", clientId); }); // 注册异常回调,调用 emitter.completeWithError() 触发 emitter.onError(throwable -> { log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable); SSE_CACHE.remove(clientId); }); SSE_CACHE.put(clientId, emitter); log.info("建立连接成功!clientId = {}", clientId); return emitter; } } /** * 模拟类似于 chatGPT 的流式推送回答 * * @param clientId 客户端 id * @throws IOException 异常 */ @Override public void send(@NotBlank String clientId) { final SseEmitter emitter = SSE_CACHE.get(clientId); if (emitter == null) return; // 开始推送数据 // todo 模拟推送数据 for (int i = 0; i < 10000000; i++) { String msg = "SSE 测试数据"; try { this.sseSend(emitter, msg, clientId); Thread.sleep(1000); } catch (Exception e) { log.error("推送数据异常", e); break; } } log.info("推送数据结束,clientId = {}", clientId); // 结束推流 emitter.complete(); } /** * 发送数据给所有连接 */ public void sendToAll() { List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values()); for (int i = 0; i < 10000000; i++) { String msg = "SSE 测试数据"; this.sseSend(emitters, msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void closeConn(@NotBlank String clientId) { final SseEmitter sseEmitter = SSE_CACHE.get(clientId); if (sseEmitter != null) { sseEmitter.complete(); } } /** * 推送数据封装 * * @param emitter sse长连接 * @param data 发送数据 * @param clientId 客户端id */ private void sseSend(SseEmitter emitter, Object data, String clientId) { try { emitter.send(data); log.info("推送数据成功,clientId = {}", clientId); } catch (Exception e) { log.error("推送数据异常", e); throw new RuntimeException("推送数据异常"); } } /** * 推送数据封装 * * @param emitter sse长连接 * @param data 发送数据 */ private void sseSend(List<SseEmitter> emitter, Object data) { emitter.forEach(e -> { try { e.send(data); } catch (IOException ioException) { log.error("推送数据异常", ioException); } }); log.info("推送数据成功"); } }
实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。
适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:
- ChatGPT这种实时加载会话数据
- 文件下载,通过SSE异步下载文件
- 服务端实时数据推送