什么是SSE
SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。
SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。
SSE 的主要优点包括:
实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。
然而,SSE 也有一些局限性:
单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。
与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。
SSE WebSocket 对比
SSE 的优点:
- 简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
- 单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
- 低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
- 兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
- 轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
- 自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。
SSE 的缺点:
- 单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
- 数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
- 连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。
** WebSocket 的优点:**
- 全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
- 低延迟:建立连接后,数据可以实时传输,延迟较低。
- 二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
- 较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。
WebSocket 的缺点:
- 协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
- 兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
- 安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
- 服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。
SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑
效果演示
话不多说。直接上代码
Controller
package cn.ideamake.feishu.web.controller.sse;
import cn.hutool.core.thread.ThreadUtil;
import cn.ideamake.common.response.Result;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.Valid;
/**
* @author Barcke
* @version 1.0
* @projectName feishu-application
* @className SseController
* @date 2024/6/5 10:14
* @slogan: 源于生活 高于生活
* @description:
**/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
@Validated
public class SseController {
private final SseService sseService;
/**
* 创建sse链接
* @param clientId
* @return
*/
@GetMapping("/createConnect")
public SseEmitter createConnect(String clientId) {
return sseService.createConnect(clientId);
}
/**
* 给所有客户端发送消息
* @param msg
* @return
*/
@PostMapping("/broadcast")
public Result<Boolean> sendMessageToAllClient(@RequestBody String msg) {
ThreadUtil.execute(() -> {
sseService.sendMessageToAllClient(msg);
});
return Result.ok(true);
}
/**
* 给指定端发送消息
* @param sseMessageDTO
* @return
*/
@PostMapping("/sendMessage")
public Result<Boolean> sendMessageToOneClient(@RequestBody @Valid SseMessageDTO sseMessageDTO) {
ThreadUtil.execute(() -> {
sseService.sendMessageToOneClient(sseMessageDTO.getClientId(), sseMessageDTO.getData());
});
return Result.ok(true);
}
/**
* 关闭链接
* @param clientId
* @return
*/
@GetMapping("/closeConnect")
public Result<Boolean> closeConnect(@RequestParam("clientId") String clientId) {
ThreadUtil.execute(() -> {
sseService.closeConnect(clientId);
});
return Result.ok(true);
}
}
Service
package cn.ideamake.feishu.service.sse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* @author Barcke
* @version 1.0
* @projectName feishu-application
* @className SseService
* @date 2024/6/5 10:18
* @slogan: 源于生活 高于生活
* @description:
**/
public interface SseService {
/**
* 创建连接
*
* @param clientId 客户端ID
*/
SseEmitter createConnect(String clientId);
/**
* 根据客户端id获取SseEmitter对象
*
* @param clientId 客户端ID
*/
SseEmitter getSseEmitterByClientId(String clientId);
/**
* 发送消息给所有客户端
*
* @param msg 消息内容
*/
void sendMessageToAllClient(String msg);
/**
* 给指定客户端发送消息
*
* @param clientId 客户端ID
* @param msg 消息内容
*/
void sendMessageToOneClient(String clientId, String msg);
/**
* 关闭连接
*
* @param clientId 客户端ID
*/
void closeConnect(String clientId);
}
ServiceImpl
package cn.ideamake.feishu.service.sse.impl;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
/**
* @author Barcke
* @version 1.0
* @projectName feishu-application
* @className SseServiceImpl
* @date 2024/6/5 10:18
* @slogan: 源于生活 高于生活
* @description:
**/
@Slf4j
@RequiredArgsConstructor
@Service
public class SseServiceImpl implements SseService {
/**
* 容器,保存连接,用于输出返回 ;可使用其他方法实现
*/
private static final Map<String, SseEmitter> SSE_CACHE = MapUtil.newConcurrentHashMap();
/**
* 重试次数
*/
private final Integer RESET_COUNT = 3;
/**
* 重试等待事件 单位 ms
*/
private final Integer RESET_TIME = 5000;
/**
* 根据客户端id获取SseEmitter对象
*
* @param clientId 客户端ID
*/
@Override
public SseEmitter getSseEmitterByClientId(String clientId) {
return SSE_CACHE.get(clientId);
}
/**
* 创建连接
*
* @param clientId 客户端ID
*/
@Override
public SseEmitter createConnect(String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 是否需要给客户端推送ID
if (StrUtil.isBlank(clientId)) {
clientId = IdUtil.simpleUUID();
}
// 注册回调
// 长链接完成后回调接口(即关闭连接时调用)
sseEmitter.onCompletion(completionCallBack(clientId));
// 连接超时回调
sseEmitter.onTimeout(timeoutCallBack(clientId));
// 推送消息异常时,回调方法
sseEmitter.onError(errorCallBack(clientId));
SSE_CACHE.put(clientId, sseEmitter);
log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, SSE_CACHE.size());
try {
// 注册成功返回用户信息
sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));
} catch (IOException e) {
log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());
}
return sseEmitter;
}
/**
* 发送消息给所有客户端
*
* @param msg 消息内容
*/
@Override
public void sendMessageToAllClient(String msg) {
if (MapUtil.isEmpty(SSE_CACHE) || StringUtils.isBlank(msg)) {
return;
}
// 判断发送的消息是否为空
for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
SseMessageDTO sseMessageDTO = new SseMessageDTO();
sseMessageDTO.setClientId(entry.getKey());
sseMessageDTO.setData(msg);
sendMsgToClientByClientId(entry.getKey(), sseMessageDTO, entry.getValue());
}
}
/**
* 给指定客户端发送消息
*
* @param clientId 客户端ID
* @param msg 消息内容
*/
@Override
public void sendMessageToOneClient(String clientId, String msg) {
SseMessageDTO sseMessageDTO = new SseMessageDTO(clientId, msg);
sendMsgToClientByClientId(clientId, sseMessageDTO, SSE_CACHE.get(clientId));
}
/**
* 关闭连接
*
* @param clientId 客户端ID
*/
@Override
public void closeConnect(String clientId) {
SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
removeUser(clientId);
}
}
/**
* 推送消息到客户端
* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
*
* @param clientId 客户端ID
* @param sseMessageDTO 推送信息,此处结合具体业务,定义自己的返回值即可
**/
private void sendMsgToClientByClientId(String clientId, SseMessageDTO sseMessageDTO, SseEmitter sseEmitter) {
if (sseEmitter == null) {
log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, sseMessageDTO);
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK))
.data(sseMessageDTO, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
log.error("推送消息失败:{},尝试进行重推", sseMessageDTO);
boolean isSuccess = true;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < RESET_COUNT; i++) {
try {
Thread.sleep(RESET_TIME);
sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter == null) {
log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, sseMessageDTO);
return;
}
}
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
**/
private Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
/**
* 连接超时时调用
*
* @param clientId 客户端ID
**/
private Runnable timeoutCallBack(String clientId) {
return () -> {
log.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 推送消息异常时,回调方法
*
* @param clientId 客户端ID
**/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < RESET_COUNT; i++) {
try {
Thread.sleep(RESET_TIME);
SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
log.error("sse推送消息异常", e);
}
}
};
}
/**
* 移除用户连接
*
* @param clientId 客户端ID
**/
private void removeUser(String clientId) {
SSE_CACHE.remove(clientId);
log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
}
}
DTO
package cn.ideamake.feishu.pojo.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
/**
* @author Barcke
* @version 1.0
* @projectName feishu-application
* @className SseMessageDTO
* @date 2024/6/5 10:19
* @slogan: 源于生活 高于生活
* @description:
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain = true)
public class SseMessageDTO {
/**
* 客户端id
*/
@NotNull(message = "客户端id 不能为空")
private String clientId;
/**
* 传输数据体(json)
*/
private String data;
}