SpringBoot SseEmitter,服务器单项消息推送

news2024/9/22 19:38:07

防止推送消息乱码

import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.nio.charset.StandardCharsets;

/**
 * @Description 防止中文乱码
 * @Author WangKun
 * @Date 2024/7/30 11:22
 * @Version
 */
public class SseUTF8 extends SseEmitter {

    public SseUTF8(Long timeout) {
        super(timeout);
    }

    @Override
    protected void extendResponse(@NotNull ServerHttpResponse outputMessage) {
        super.extendResponse(outputMessage);
        HttpHeaders headers = outputMessage.getHeaders();
        headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
    }

}
SseEmitter工具类
import com.harmonywisdom.enums.ResultCode;
import com.harmonywisdom.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.io.IOException;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Description SSE消息推送工具
 * @Author WangKun
 * @Date 2024/7/29 14:55
 * @Version
 */
@Component
@Slf4j
public class SseUtils {

    /**
     * 容器
     */
    private static final ConcurrentHashMap<String, SseUTF8> SSE_MAP_CACHE = new ConcurrentHashMap<>(0);

    /**
     * 默认时长不过期(默认30s)
     */
    private static final long DEFAULT_TIMEOUT = 0L;

    /**
     * @param userId
     * @Description 创建连接
     * @Throws
     * @Return SseUTF8
     * @Date 2024-07-29 15:01:58
     * @Author WangKun
     **/
    public static SseUTF8 createConnect(String userId) {

        SseUTF8 sseEmitter = new SseUTF8(DEFAULT_TIMEOUT);
        // 需要给客户端推送ID
        if (SSE_MAP_CACHE.containsKey(userId)) {
            remove(userId);
        }
        // 长链接完成后回调接口(关闭连接时调用)
        sseEmitter.onCompletion(() -> {
            log.info("SSE连接结束:{}", userId);
            remove(userId);
        });
        // 连接超时回调
        sseEmitter.onTimeout(() -> {
            log.error("SSE连接超时:{}", userId);
            remove(userId);
        });
        // 连接异常时,回调方法
        sseEmitter.onError(
                throwable -> {
                    try {
                        log.info("SSE{}连接异常,{}", userId, throwable.toString());
                        sseEmitter.send(SseUTF8.event()
                                .id(userId)
                                .name("发生异常!")
                                .data("发生异常重试!")
                                .reconnectTime(3000));
                        SSE_MAP_CACHE.put(userId, sseEmitter);
                    } catch (IOException e) {
                        log.error("用户--->{} SSE连接失败重试,异常信息--->{}", userId, e.getMessage());
                        e.printStackTrace();
                    }
                }
        );
        SSE_MAP_CACHE.put(userId, sseEmitter);
        try {
            // 注册成功返回用户信息
            sseEmitter.send(SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(userId, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            log.error("用户--->{} SSE连接失败,异常信息--->{}", userId, e.getMessage());
        }
        return sseEmitter;
    }

    /**
     * @param userId
     * @Description 移除用户连接
     * @Throws
     * @Return void
     * @Date 2024-07-29 15:07:03
     * @Author WangKun
     **/
    private static void remove(String userId) {
        SSE_MAP_CACHE.remove(userId);
        log.info("SSE移除用户连接--->{} ", userId);
    }

    /**
     * @param userId
     * @Description 关闭连接
     * @Throws
     * @Return void
     * @Date 2024-07-29 15:38:16
     * @Author WangKun
     **/
    public static void closeConnect(String userId) {
        SseUTF8 sseEmitter = SSE_MAP_CACHE.get(userId);
        if (sseEmitter != null) {
            sseEmitter.complete();
            log.info("SSE关闭连接:{}", userId);
            remove(userId);
        }
    }

    /**
     * @param userId
     * @param message
     * @param sseEmitter
     * @Description 推送消息到客户端
     * @Throws
     * @Return void
     * @Date 2024-07-29 15:48:19
     * @Author WangKun
     **/
    private static boolean sendMsgToClient(String userId, String message, SseUTF8 sseEmitter) {
        // 推送之前检测心态是否存在
        boolean isAlive = checkSseConnectAlive(sseEmitter);
        if (!isAlive) {
            // 失去连接移除
            log.error("SSE推送消息失败:客户端{}未创建长链接或者关闭,失败消息:{}", userId, message);
            SSE_MAP_CACHE.remove(userId);
            return false;
        }
        SseUTF8.SseEventBuilder sendData = SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(message, MediaType.APPLICATION_JSON);
        try {
            sseEmitter.send(sendData);
            return true;
        } catch (IOException e) {
            log.error("推送消息失败:{}", message);
        }
        return true;
    }

    /**
     * @param sseEmitter
     * @Description 检测连接心跳
     * @Throws
     * @Return boolean
     * @Date 2024-07-30 17:27:32
     * @Author WangKun
     **/
    public static boolean checkSseConnectAlive(SseUTF8 sseEmitter) {
        if (sseEmitter == null) {
            return false;
        }
        // 返回true代表还连接, 返回false代表失去连接
        return !(Boolean) getField(sseEmitter, sseEmitter.getClass(), "sendFailed") &&
                !(Boolean) getField(sseEmitter, sseEmitter.getClass(), "complete");
    }

    /**
     * @param obj
     * @param clazz
     * @param fieldName
     * @Description 反射获取 sendFailed complete
     * @Throws
     * @Return java.lang.Object
     * @Date 2024-07-30 17:27:49
     * @Author WangKun
     **/
    public static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception ignored) {
            }
        }

        return null;
    }

    /**
     * @param msg
     * @Description 发送消息给所有客户端
     * @Throws
     * @Return void
     * @Date 2024-07-29 15:48:40
     * @Author WangKun
     **/
    public static boolean sendTextMessage(String msg) {
        if (SSE_MAP_CACHE.isEmpty()) {
            return false;
        }
        if (StringUtils.isEmpty(msg) || StringUtils.isBlank(msg)) {
            return false;
        }
        boolean isSuccess = false;
        for (Map.Entry<String, SseUTF8> entry : SSE_MAP_CACHE.entrySet()) {
            isSuccess = sendMsgToClient(entry.getKey(), msg, entry.getValue());
            if (!isSuccess) {
                log.error("群发客户端{}消息推送,失败消息:{}", entry.getKey(), msg);
            }
        }
        return isSuccess;
    }

    /**
     * @param clientId
     * @param msg
     * @Description 给指定客户端发送消息
     * @Throws
     * @Return Boolean
     * @Date 2024-07-29 15:51:30
     * @Author WangKun
     **/
    public static boolean sendTextMessage(String clientId, String msg) {
        return sendMsgToClient(clientId, msg, SSE_MAP_CACHE.get(clientId));
    }

    /**
     * @Description 检测客户端心跳(连接状态,给客户端发送信息,如果sendFailed,complete返回false 移除客户端,说明客户端关闭)
      * @param
     * @Throws
     * @Return void
     * @Date 2024-07-31 16:22:25
     * @Author WangKun
     **/
    @Async("threadPoolExecutor")
    @Scheduled(cron = "0 0/15 * * * ?")
    public void checkSseAlive() {
        log.info("检测客户端连接状态");
        sendTextMessage("LIVE");
    }

}

使用方法

/**
 * @Description 测试sse
 * @Author WangKun
 * @Date 2024/7/29 15:56
 * @Version
 */
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class TestSSEController {

    @Log("测试SSE消息连接")
    @AnonymousPostMapping(value = "/connect")
    public SseUTF8 connect(@RequestParam String userId) {
        return SseUtils.createConnect(userId);
    }

    @Log("测试SSE消息推送")
    @AnonymousPostMapping(value = "/send")
    public ResponseResult<Boolean> send(@RequestParam String userId, @RequestParam String param) {
        boolean flag = SseUtils.sendTextMessage(userId, param);
        if (!flag) {
            return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());
        }
        return ResponseResult.success(true, ResultCode.OK.getCode());
    }

    @Log("测试SSE所有客户端消息推送")
    @AnonymousPostMapping(value = "/sendAll")
    public ResponseResult<Boolean> sendAll(@RequestParam String param) {
        boolean flag = SseUtils.sendTextMessage(param);
        if (!flag) {
            return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());
        }
        return ResponseResult.success(true, ResultCode.OK.getCode());
    }

    @Log("测试SSE消息关闭")
    @AnonymousPostMapping(value = "/close")
    public ResponseResult<String> close(@RequestParam String userId) {
        SseUtils.closeConnect(userId);
        return ResponseResult.success(ResultCode.OK.getCode());
    }
}

简要说明

1:

推送消息之前,检测推送得到客户端是否存在,不存在,直接移除,避免浪费。

前端关闭浏览器或者关闭界面要调用关闭接口,将其关闭。

结果:

连接

推送:

给admin的客户端推送

给全部客户端推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1966960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

阿里云实时计算Flink在多行业的应用和实践

摘要&#xff1a;本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践&#xff0c;对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术&#xff0c;并且提供一些在技术架构上的参考建议。内容分为以下四个部分&#xff1a; 业…

Magic-PDF:端到端PDF文档解析神器 构建高质量RAG必备!

项目结构 流程解析 预处理的作用是判断文档内容是否需要进行OCR识别&#xff0c;如果是普通可编辑的PDF文档&#xff0c;则使用PyMuPDF库提取元信息。 模型层除了常规的OCR、版面结构分析外&#xff0c;还有公式检测模型&#xff0c;可提取公式内容&#xff0c;用于后续把公式…

Ubuntu系统在两个屏幕上都显示任务栏

Ubuntu系统在两个屏幕上都显示任务栏 目标 希望在两个屏幕&#xff08;主屏和扩展屏&#xff09;上都显示下图的状态栏 解决方法 打开设置&#xff0c;找到>外观>Dock 2. 将显示于改成所有显示

明清进士人数数据

明清进士人数数据 指标&#xff1a;省份名称、城市名称、区县名称、明清各省进士人数、明清各城市进士人数、明清各县区进士人数 指标说明&#xff1a; Province[省份名称]-统计数据所属省份 City[城市名称]-统计数据所属地级市 Region[区县名称]-统计数据所属区县 MQpro…

ZooKeeper日志自动清理实用脚本

ZooKeeper日志自动清理:保持系统整洁的实用脚本 在管理ZooKeeper集群时,定期清理日志文件是一项重要但常被忽视的任务。本文将介绍一个简单而有效的bash脚本,用于自动清理ZooKeeper的日志和快照文件,并讨论如何使用cron来定期执行此脚本。 磁盘告警&#xff0c;所以写了一个脚…

如何用代码在数据库新建一个表格/HTML的跨行合并和跨列合并

1.用navicat新建一表格 数据库使用链接 2.前端代码 &#xff08;1&#xff09;跨行合并&#xff1a;rowspan“合并单元格的个数” 跨列合并&#xff1a;colspan“合并单元格的个数” <body> <table border"1”align"center” width"100%cellpaddin…

微信公众号,配置自定义菜单,跟回调授权网址配置入口,图讲解

微信公众号&#xff0c;配置自定义菜单&#xff0c;跟回调授权网址配置入口&#xff0c;图讲解

【图像识别】十大数据集合集!

本文将为您介绍10个经典、热门的数据集&#xff0c;希望对您在选择适合的数据集时有所帮助。 1 DanishFungi2020 发布方&#xff1a; Google 发布时间&#xff1a; 2021 简介&#xff1a; 补充材料&#xff1a;丹麦真菌 2020 - 不仅仅是另一个图像识别数据集为了支持细粒度植…

django网络爬虫系统- 计算机毕业设计源码81040

摘要 本论文主要论述了如何开发一个网络爬虫系统&#xff0c;对旅游景点信息进行爬取&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述网络爬虫系统的当前背景以及系统开发的目的&#x…

【计算机网络】TCP/IP四层模型

文章目录 TCP/IP模型应用层&#xff08;Application Layer&#xff09;传输层&#xff08;Transport Layer&#xff09;网络层&#xff08;Internet Layer&#xff09;网络接口层&#xff08;Network Interface Layer&#xff09; TCP/IP模型 经典的TCP/IP参考模型从上至下分为…

如何手动修复DLL丢失?directx修复工具收费吗?

在使用电脑时&#xff0c;我们有时会遇到DLL文件丢失或损坏的问题&#xff0c;导致软件无法正常运行。DLL&#xff08;动态链接库&#xff09;文件是Windows操作系统中重要的组成部分&#xff0c;用于共享程序代码和资源。本文将介绍如何手动修复DLL丢失问题&#xff0c;手动修…

【优秀python案例】基于Python的豆瓣电影TOP250爬虫与可视化设计与实现

摘要&#xff1a;伴随着当代社会物质水平的不断提高&#xff0c;人们越来越注重精神享受&#xff0c;看电影成为人们日常生活中重要的组成成分。本文将针对豆瓣上热门电影评论进行爬取&#xff0c;应用可视化分析更为形象地了解该电影的动态。该系统可以使得人们实时了解到有关…

WebLogic:CVE-2023-21839[JNDI注入]

原理 1、T3/IIOP协议支持 远程绑定对象 bind到服务端&#xff0c;而且可以通过lookup代码 c.lookup("xxxxxx"); 查看 2、远程对象继承自OpaqueReference并lookup查看远程对象时&#xff0c;服务端会调用远程对象 getReferent 方法 3、由于 weblogic.deployment.…

Java未来还是霸主吗?Java 在当今企业中的未来到底是什么?

Java 及其生态系统对于许多现代企业的成功至关重要。它是一种多功能语言&#xff0c;对许多用例提供强大支持&#xff0c;并具有强大的新功能来应对棘手的情况。但您可能会问自己&#xff1a;Java 的未来是什么&#xff1f; 尽管自 1999 年以来 Java 一直是软件开发领域的关键角…

【C++11】深度解析--异步操作(什么是异步?异步有那些操作?异步操作有什么用呢?)

目录 一、前言 二、什么是异步操作呢&#xff1f; &#x1f525;异步的概念&#x1f525; &#x1f525;异步的生活案例说明&#x1f525; 三、异步有那些操作呢&#xff1f; &#x1f525;std::future&#x1f525; &#x1f4a2;std::future 的概念&#x1f4a2; &a…

SmolLM-HuggingFace发布的高性能小型语言模型

SmolLM是什么&#xff1f; SmolLM是由 Huggingface 最新发布的一系列最先进的小型语言模型&#xff0c;有三种规格&#xff1a;1.35亿、3.6亿和17亿个参数。这些模型建立在 Cosmo-Corpus 上&#xff0c;Cosmo-Corpus 是一个精心策划的高质量训练数据集。Cosmo-Corpus 包括 Cos…

AT24C08系列eeprom总结

内存大小说明&#xff1a;单位&#xff08;bits(位&#xff09; AT24C02A&#xff0c;2K bits串行EEPROM&#xff1a;内部组织为256页&#xff0c;每页1字节&#xff0c; AT24C04A&#xff0c;4K bits串行EEPROM&#xff1a;4K内部组织为256页&#xff0c;每页2字节。 AT24C…

拯救丢失数据,这三款数据恢复软件你不可错过

数据丢失真的很烦人&#xff0c;无论是手滑删除了重要文件&#xff0c;还是电脑突然崩溃&#xff0c;那些珍贵的照片、文档、视频&#xff0c;一瞬间就仿佛人间蒸发了一样。但别担心&#xff0c;科技的力量总能给我们带来希望&#xff0c;数据恢复软件就是我们的救星。我用过了…

如何选择开放式耳机?2024五款热门机型推荐!

耳机在我们日常通勤和运动锻炼中扮演着重要的角色&#xff0c;它不仅帮助我们放松和振奋&#xff0c;还提供了随时可得的安慰和动力。选择一款合适的耳机非常关键&#xff0c;开放式耳机因其不挤压耳道的设计&#xff0c;在多种使用场景下都能提供良好的适应性&#xff0c;特别…

本地node搭建web服务器

首先确认自己的电脑已经安装了node.js 1.创建一个node服务文件 创建一个node-serve文件夹&#xff0c;然后在当前文件下输入初始化node项目命令。 npm init然后一直按回车即可&#xff0c;完成之后生成一个 package.json 文件。 2.在当前文件夹下新建一个 index.js 的文件&…