一、SSE是什么?
SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持
二、sse 与 websoket
1、SSE(Server-Sent Events)
是 HTML5 遵循 W3C 标准提出的客户端和服务端之间进行实时通信的协议。
优点
SSE 客户端可以接收来自服务器的“流”数据,而不需要进行轮询。由于没有浪费的请求,因此 SSE 对于减轻服务器的压力非常有用。
SSE 使用纯 JavaScript 实现简单,不需要额外的插件或库来处理消息。客户端可以使用 EventSource 接口轻松地与 SSE 服务器通信。
SSE 天生具有自适应性,由于 SSE 是基于 HTTP 响应使用 EventStream 传递消息,因此它利用了 HTTP 的开销和互联网上的结构。
SSE 可以与任何服务器语言和平台一起使用,因为 SSE 是一种规定了消息传递方式的技术,不依赖于具体的服务器语言和平台。
缺点
SSE 是单向通信,只能从服务器推送到客户端。如果应用程序需要双向通信,就需要使用 Websocket。
SSE无法发送二进制数据,只能发送 UTF-8 编码的文本。如果应用程序需要发送二进制数据,就需要使用 Websocket。
SSE 不是所有浏览器都支持。虽然 SSE 是 HTML5 的一部分,但具体的浏览器支持性可能会有差异。
2、Websocket
是 HTML5 的一部分,提供了一种双向通信的机制。
优点
Websocket 支持双向通信。使用 Websocket 可以同时向客户端发送和接收数据。
Websocket 协议可以传输二进制数据,这使得 Websocket 更加灵活和强大。
Websocket 连接长期存在,而不需要仅仅为了接收数据而保持 HTTP 连接打开。
Websocket 的实现支持跨域的通信,可以方便地进行跨域通信。
缺点
Websocket 不支持所有浏览器。特别是老浏览器可能不支持 Websocket 协议。
Websocket 是一种全双工的通信方式。由于 Websocket 长期存在,会占用服务器资源。在高并发场景下,应该考虑使用 SSE。
三、前端示例代码
// 建立连接
createSseConnect(clientId){
if(window.EventSource){
const eventSource = new EventSource('http://127.0.0.1:8083/sse/createSseConnect?clientId='+clientId);
console.log(eventSource)
eventSource.onmessage = (event) =>{
console.log("onmessage:"+clientId+": "+event.data)
};
eventSource.onopen = (event) =>{
console.log("onopen:"+clientId+": "+event)
};
eventSource.onerror = (event) =>{
console.log("onerror :"+clientId+": "+event)
};
eventSource.close = (event) =>{
console.log("close :"+clientId+": "+event)
};
}else{
console.log("你的浏览器不支持SSE~")
}
console.log(" 测试 打印")
},
四、后端示例代码
SseController
package com.joker.cloud.linserver.controller;
import com.joker.cloud.linserver.conf.sse.sseUtils;
import com.joker.common.message.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
/**
* SseController
*
**/
@RestController
@Slf4j
@CrossOrigin
@RequestMapping("/sse")
public class SseController {
@Autowired
private sseUtils sseUtils;
@GetMapping(value = "/createSseConnect", produces="text/event-stream;charset=UTF-8")
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) Long clientId) {
return sseUtils.connect(clientId);
}
@PostMapping("/sendMessage")
public void sendMessage(@RequestParam("clientId") Long clientId, @RequestParam("message") String message){
sseUtils.sendMessage(clientId, "123456789", message);
}
@GetMapping(value = "/listSseConnect")
public Result<Map<Long, SseEmitter>> listSseConnect(){
Map<Long, SseEmitter> sseEmitterMap = sseUtils.listSseConnect();
return Result.success(sseEmitterMap);
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
**/
@GetMapping("/closeSseConnect")
public Result closeSseConnect(Long clientId) {
sseUtils.deleteUser(clientId);
return Result.success();
}
}
sseUtils工具类
package com.joker.cloud.linserver.conf.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* sseUtils
**/
@Slf4j
@Component
public class sseUtils {
private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建连接
*/
public SseEmitter connect(Long userId) {
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter =sseEmitterMap.get(userId);
sseEmitterMap.remove(userId);
sseEmitter.complete();
}
try {
UUID uuid = UUID.randomUUID();
String str = uuid.toString();
String temp = str.substring(0, 8) + str.substring(9, 13) + str.substring(14, 18) + str.substring(19, 23) + str.substring(24);
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(30*1000L);
sseEmitter.send(SseEmitter.event().id(temp).data(""));
// reconnectTime(10*1000L)
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
// sseEmitter.completeWithError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
log.info("创建sse连接完成,当前用户:{}", userId);
return sseEmitter;
} catch (Exception e) {
log.info("创建sse连接异常,当前用户:{}", userId);
}
return null;
}
/**
* 给指定用户发送消息
*
*/
public boolean sendMessage(Long userId,String messageId, String message) {
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter = sseEmitterMap.get(userId);
try {
sseEmitter.send(SseEmitter.event().id(messageId).data(message));
// reconnectTime(10*1000L)
log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
return true;
}catch (Exception e) {
sseEmitterMap.remove(userId);
log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
sseEmitter.complete();
return false;
}
}else {
log.info("用户{}未上线", userId);
}
return false;
}
/**
* 删除连接
* @param userId
*/
public void deleteUser(Long userId){
removeUser(userId);
}
private static Runnable completionCallBack(Long userId) {
return () -> {
log.info("结束sse用户连接:{}", userId);
removeUser(userId);
};
}
private static Throwable errorCallBack(Long userId) {
log.info("sse用户连接异常:{}", userId);
removeUser(userId);
return new Throwable();
}
private static Runnable timeoutCallBack(Long userId) {
return () -> {
log.info("连接sse用户超时:{}", userId);
removeUser(userId);
};
}
/**
* 断开
* @param userId
*/
public static void removeUser(Long userId){
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter = sseEmitterMap.get(userId);
sseEmitterMap.remove(userId);
sseEmitter.complete();
}else {
log.info("用户{} 连接已关闭",userId);
}
}
public Map<Long, SseEmitter> listSseConnect(){
return sseEmitterMap;
}
}
五、模拟测试
浏览器建立的连接中会看到服务器推送到客户端的消息内容及ID等基础信息
控制台也可以监听到事件的变化并输出