什么是sse?
1、SSE 是Server-Sent Events(服务器发送事件)
2、SSE是一种允许服务器主动向客户端推送实时更新的技术。
3、它基于HTTP协议,并使用了其长连接特性,在客户端与服务器之间建立一条持久化的连接。 通过这条连接,服务器可以实时地向客户端发送事件流,而客户端可以监听这些事件并作出相应的处理。
4、SSE是单向通信机制,即只能由服务器向客户端推送数据,客户端不能通过SSE向服务器发送数据。
5、SSE在现代浏览器和移动设备上得到了广泛的支持,是实现实时Web应用的一种有效方式。
使用流程(经测试,此方式不会丢失消息,靠谱能用!)
1、引入springboot的web基本依赖,这里不细说
2、controller中
/**
* 订阅sse消息
*
* @return
*/
@CrossOrigin
@RequestMapping(path = "/subscribe/{userId}")
public SseEmitter subscribe(@PathVariable String userId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
return SSEServer.connect(userId);
}
3、SSEServer类
package com.orison.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* @ClassName SSEServer
* @Description TODO
* @Author xiaoli
* @Date 2022-10-26 18:00
* @Version 1.0
**/
@Slf4j
public class SSEServer {
/**
* 当前连接数
*/
private static AtomicInteger count = new AtomicInteger(0);
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId){
//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常
SseEmitter sseEmitter = new SseEmitter(0L);
//注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeOutCallBack(userId));
sseEmitterMap.put(userId,sseEmitter);
//数量+1
count.getAndIncrement();
log.info("create new sse connect ,current user:{}",userId);
return sseEmitter;
}
/**
* 给指定用户发消息
*/
public static void sendMessage(String userId, String message){
if(sseEmitterMap.containsKey(userId)){
try{
sseEmitterMap.get(userId).send(message);
}catch (IOException e){
log.error("user id:{}, send message error:{}",userId,e.getMessage());
log.error("Exception:",e);
}
}
}
/**
* 想多人发送消息,组播
*/
public static void groupSendMessage(String groupId, String message){
if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
sseEmitterMap.forEach((k,v) -> {
try{
if(k.startsWith(groupId)){
v.send(message, MediaType.APPLICATION_JSON);
}
}catch (IOException e){
log.error("user id:{}, send message error:{}",groupId,message);
removeUser(k);
}
});
}
}
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k,v)->{
try{
v.send(message,MediaType.APPLICATION_JSON);
}catch (IOException e){
log.error("user id:{}, send message error:{}",k,e.getMessage());
removeUser(k);
}
});
}
/**
* 群发消息
*/
public static void batchSendMessage(String message, Set<String> userIds){
userIds.forEach(userId->sendMessage(userId,message));
}
public static void removeUser(String userId){
sseEmitterMap.remove(userId);
//数量-1
count.getAndDecrement();
log.info("remove user id:{}",userId);
}
public static List<String> getIds(){
return new ArrayList<>(sseEmitterMap.keySet());
}
public static int getUserCount(){
return count.intValue();
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("结束连接,{}",userId);
removeUser(userId);
};
}
private static Runnable timeOutCallBack(String userId){
return ()->{
log.info("连接超时,{}",userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(String userId){
return throwable -> {
log.error("连接异常,{}",userId);
removeUser(userId);
};
}
}
4、前端
<script>
function createEventSource() {
const eventSource = new EventSource('http://localhost:13330/device/cameraDevice/subscribe/'+getRandomString(5));
eventSource.onmessage = function(event) {
console.log("sse连接中");
if (event.data){
console.log(event);
//这里就是请求streamEvents接口返回的值,此时就可以通过Ajax展示出来了
}
};
eventSource.onerror = function(event) {
console.error("sse连接失败,每5秒尝试重新连接");
// 关闭当前 EventSource 实例
eventSource.close();
// 尝试在 5 秒后重新连接(可以根据需要调整重连间隔)
setTimeout(createEventSource, 5000);
};
return eventSource;
}
// 初始化 EventSource 连接
createEventSource();
function getRandomString(len) {
const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';
let min = 0, max = _charStr.length - 1, _str = '';
//判断是否指定长度,否则默认长度为15
len = len || 15;
//循环生成字符串
for (var i = 0, index; i < len; i++) {
index = RandomIndex(min, max, i);
_str += _charStr[index];
}
return _str;
}
/**
* 随机生成索引
* @param min 最小值
* @param max 最大值
* @param i 当前获取位置
*/
function RandomIndex(min, max, i) {
const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';
let index = Math.floor(Math.random() * (max - min + 1) + min),
numStart = _charStr.length - 10;
//如果字符串第一位是数字,则递归重新获取
if (i == 0 && index >= numStart) {
index = RandomIndex(min, max, i);
}
//返回最终索引值
return index;
}
</script>