防止推送消息乱码
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.nio.charset.StandardCharsets;
/**
* @Description 防止中文乱码
* @Author WangKun
* @Date 2024/7/30 11:22
* @Version
*/
public class SseUTF8 extends SseEmitter {
public SseUTF8(Long timeout) {
super(timeout);
}
@Override
protected void extendResponse(@NotNull ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
}
}
SseEmitter工具类
import com.harmonywisdom.enums.ResultCode;
import com.harmonywisdom.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Description SSE消息推送工具
* @Author WangKun
* @Date 2024/7/29 14:55
* @Version
*/
@Component
@Slf4j
public class SseUtils {
/**
* 容器
*/
private static final ConcurrentHashMap<String, SseUTF8> SSE_MAP_CACHE = new ConcurrentHashMap<>(0);
/**
* 默认时长不过期(默认30s)
*/
private static final long DEFAULT_TIMEOUT = 0L;
/**
* @param userId
* @Description 创建连接
* @Throws
* @Return SseUTF8
* @Date 2024-07-29 15:01:58
* @Author WangKun
**/
public static SseUTF8 createConnect(String userId) {
SseUTF8 sseEmitter = new SseUTF8(DEFAULT_TIMEOUT);
// 需要给客户端推送ID
if (SSE_MAP_CACHE.containsKey(userId)) {
remove(userId);
}
// 长链接完成后回调接口(关闭连接时调用)
sseEmitter.onCompletion(() -> {
log.info("SSE连接结束:{}", userId);
remove(userId);
});
// 连接超时回调
sseEmitter.onTimeout(() -> {
log.error("SSE连接超时:{}", userId);
remove(userId);
});
// 连接异常时,回调方法
sseEmitter.onError(
throwable -> {
try {
log.info("SSE{}连接异常,{}", userId, throwable.toString());
sseEmitter.send(SseUTF8.event()
.id(userId)
.name("发生异常!")
.data("发生异常重试!")
.reconnectTime(3000));
SSE_MAP_CACHE.put(userId, sseEmitter);
} catch (IOException e) {
log.error("用户--->{} SSE连接失败重试,异常信息--->{}", userId, e.getMessage());
e.printStackTrace();
}
}
);
SSE_MAP_CACHE.put(userId, sseEmitter);
try {
// 注册成功返回用户信息
sseEmitter.send(SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(userId, MediaType.APPLICATION_JSON));
} catch (IOException e) {
log.error("用户--->{} SSE连接失败,异常信息--->{}", userId, e.getMessage());
}
return sseEmitter;
}
/**
* @param userId
* @Description 移除用户连接
* @Throws
* @Return void
* @Date 2024-07-29 15:07:03
* @Author WangKun
**/
private static void remove(String userId) {
SSE_MAP_CACHE.remove(userId);
log.info("SSE移除用户连接--->{} ", userId);
}
/**
* @param userId
* @Description 关闭连接
* @Throws
* @Return void
* @Date 2024-07-29 15:38:16
* @Author WangKun
**/
public static void closeConnect(String userId) {
SseUTF8 sseEmitter = SSE_MAP_CACHE.get(userId);
if (sseEmitter != null) {
sseEmitter.complete();
log.info("SSE关闭连接:{}", userId);
remove(userId);
}
}
/**
* @param userId
* @param message
* @param sseEmitter
* @Description 推送消息到客户端
* @Throws
* @Return void
* @Date 2024-07-29 15:48:19
* @Author WangKun
**/
private static boolean sendMsgToClient(String userId, String message, SseUTF8 sseEmitter) {
// 推送之前检测心态是否存在
boolean isAlive = checkSseConnectAlive(sseEmitter);
if (!isAlive) {
// 失去连接移除
log.error("SSE推送消息失败:客户端{}未创建长链接或者关闭,失败消息:{}", userId, message);
SSE_MAP_CACHE.remove(userId);
return false;
}
SseUTF8.SseEventBuilder sendData = SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(message, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
return true;
} catch (IOException e) {
log.error("推送消息失败:{}", message);
}
return true;
}
/**
* @param sseEmitter
* @Description 检测连接心跳
* @Throws
* @Return boolean
* @Date 2024-07-30 17:27:32
* @Author WangKun
**/
public static boolean checkSseConnectAlive(SseUTF8 sseEmitter) {
if (sseEmitter == null) {
return false;
}
// 返回true代表还连接, 返回false代表失去连接
return !(Boolean) getField(sseEmitter, sseEmitter.getClass(), "sendFailed") &&
!(Boolean) getField(sseEmitter, sseEmitter.getClass(), "complete");
}
/**
* @param obj
* @param clazz
* @param fieldName
* @Description 反射获取 sendFailed complete
* @Throws
* @Return java.lang.Object
* @Date 2024-07-30 17:27:49
* @Author WangKun
**/
public static Object getField(Object obj, Class<?> clazz, String fieldName) {
for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
try {
Field field;
field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(obj);
} catch (Exception ignored) {
}
}
return null;
}
/**
* @param msg
* @Description 发送消息给所有客户端
* @Throws
* @Return void
* @Date 2024-07-29 15:48:40
* @Author WangKun
**/
public static boolean sendTextMessage(String msg) {
if (SSE_MAP_CACHE.isEmpty()) {
return false;
}
if (StringUtils.isEmpty(msg) || StringUtils.isBlank(msg)) {
return false;
}
boolean isSuccess = false;
for (Map.Entry<String, SseUTF8> entry : SSE_MAP_CACHE.entrySet()) {
isSuccess = sendMsgToClient(entry.getKey(), msg, entry.getValue());
if (!isSuccess) {
log.error("群发客户端{}消息推送,失败消息:{}", entry.getKey(), msg);
}
}
return isSuccess;
}
/**
* @param clientId
* @param msg
* @Description 给指定客户端发送消息
* @Throws
* @Return Boolean
* @Date 2024-07-29 15:51:30
* @Author WangKun
**/
public static boolean sendTextMessage(String clientId, String msg) {
return sendMsgToClient(clientId, msg, SSE_MAP_CACHE.get(clientId));
}
/**
* @Description 检测客户端心跳(连接状态,给客户端发送信息,如果sendFailed,complete返回false 移除客户端,说明客户端关闭)
* @param
* @Throws
* @Return void
* @Date 2024-07-31 16:22:25
* @Author WangKun
**/
@Async("threadPoolExecutor")
@Scheduled(cron = "0 0/15 * * * ?")
public void checkSseAlive() {
log.info("检测客户端连接状态");
sendTextMessage("LIVE");
}
}
使用方法
/**
* @Description 测试sse
* @Author WangKun
* @Date 2024/7/29 15:56
* @Version
*/
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class TestSSEController {
@Log("测试SSE消息连接")
@AnonymousPostMapping(value = "/connect")
public SseUTF8 connect(@RequestParam String userId) {
return SseUtils.createConnect(userId);
}
@Log("测试SSE消息推送")
@AnonymousPostMapping(value = "/send")
public ResponseResult<Boolean> send(@RequestParam String userId, @RequestParam String param) {
boolean flag = SseUtils.sendTextMessage(userId, param);
if (!flag) {
return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());
}
return ResponseResult.success(true, ResultCode.OK.getCode());
}
@Log("测试SSE所有客户端消息推送")
@AnonymousPostMapping(value = "/sendAll")
public ResponseResult<Boolean> sendAll(@RequestParam String param) {
boolean flag = SseUtils.sendTextMessage(param);
if (!flag) {
return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());
}
return ResponseResult.success(true, ResultCode.OK.getCode());
}
@Log("测试SSE消息关闭")
@AnonymousPostMapping(value = "/close")
public ResponseResult<String> close(@RequestParam String userId) {
SseUtils.closeConnect(userId);
return ResponseResult.success(ResultCode.OK.getCode());
}
}
简要说明
1:
推送消息之前,检测推送得到客户端是否存在,不存在,直接移除,避免浪费。
前端关闭浏览器或者关闭界面要调用关闭接口,将其关闭。
结果:
连接
推送:
给admin的客户端推送
给全部客户端推送