Springboot集成SSE消息推送

news2024/11/24 2:49:49

SSE介绍
        SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。

        了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息

SSE
        1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
        2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。

        3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新

Websocket
        1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
        2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
        3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。

使用方法:

Maven依赖

springboot中封装了sse代码,不需要额外的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.21</version>
</dependency>
 

SseEmitterUtil工具类

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * SSE长链接工具类
 */
@Slf4j
public class SseEmitterUtil {

    /**
     * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
     */
    private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(Long userId) {
        // 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);

        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);

        log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());
        return sseEmitter;
    }

    /**
     * 给制定用户发送消息
     *
     * @param userId 指定用户名
     * @param sseMessage 消息体
     */
    public static void sendMessage(Long userId, String sseMessage) {
        if (sseEmitterMap.containsKey(userId)) {
            try {
                sseEmitterMap.get(userId).send(sseMessage);
                log.info("用户 {} 推送消息 {}", userId, sseMessage);
            } catch (IOException e) {
                log.error("用户 {} 推送消息异常", userId, e);
                removeUser(userId);
            }
        } else {
            log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());
        }
    }

    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, List<Long> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 群发所有人
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用户 {} 推送异常", k, e);
                removeUser(k);
            }
        });
    }

    /**
     * 移除用户连接
     *
     * @param userId 用户 ID
     */
    public static void removeUser(Long userId) {
        if (sseEmitterMap.containsKey(userId)) {
            sseEmitterMap.get(userId).complete();
            sseEmitterMap.remove(userId);
            log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());
        } else {
            log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());
        }
    }

    /**
     * 获取当前连接信息
     *
     * @return 所有的连接用户
     */
    public static List<Long> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前的连接数量
     *
     * @return 当前的连接数量
     */
    public static int getUserCount() {
        return sseEmitterMap.size();
    }

    private static Runnable completionCallBack(Long userId) {
        return () -> {
            log.info("用户 {} 结束连接", userId);
        };
    }

    private static Runnable timeoutCallBack(Long userId) {
        return () -> {
            log.error("用户 {} 连接超时", userId);
            removeUser(userId);
        };
    }

    private static Consumer<Throwable> errorCallBack(Long userId) {
        return throwable -> {
            log.error("用户 {} 连接异常", userId);
            removeUser(userId);
        };
    }
}
Controller层
import cn.hutool.json.JSONUtil;
import com.geb.common.utils.SseEmitterUtil;
import com.geb.domain.SseMessage;
import com.geb.domain.WdAgent;
import io.grpc.internal.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {

    /**
     * 用于创建连接
     */
    @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable Long userId) {
        return SseEmitterUtil.connect(userId);
    }

    /**
     * 关闭连接
     */
    @GetMapping("/close/{userid}")
    public void close(@PathVariable("userid") Long userid) {
        SseEmitterUtil.removeUser(userid);
    }

    @GetMapping("/sse")
    public void sse(){
        // 构建推送消息体
        SseMessage sseMessage = new SseMessage();
        sseMessage.setId(1L);
        sseMessage.setMsg("SSE测试发送消息");
        sseMessage.setAgentName("测试智能体推送消息");
        SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));
    }
}
测试

这里使用postman测试,输入链接点击运行后,会自动处于链接状态

我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求

消息发送完毕,在postman就可以看到刚刚的消息

可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。

这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。

文章参考:Springboot集成SSE消息推送_springboot 集成sse推送-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1835976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

257、二叉树的所有路径

给定一个二叉树&#xff0c;返回所有从根节点到叶子节点的路径。 说明: 叶子节点是指没有子节点的节点。 代码如下&#xff1a; class Solution { public:void traversal(TreeNode* cur, vector<int>& path, vector<string> &result){path.push_back(cur…

南阳理工学院(期末)算法分析练习题

一、算法阅读分析题&#xff1a; 1.分析如下算法&#xff0c;回答问题&#xff08;10分&#xff09;。 该算法的作用是什么(2分)&#xff1f;分析该算法的时间复杂度(5分)?设计算法的一个输入&#xff0c;并给出对应的算法输出结果(3分) &#xff08;1&#xff09;该算法的作…

虹软ArcSoft—真正离线免费的人脸识别SDK

虹软ArcSoft—真正离线免费的人脸识别SDK 高级功能收费 还是很好滴 人证核验功能是C/C的SDK&#xff0c;需要封装为C#&#xff0c;然后暴露为Restful API使用

2024年阿里巴巴全球数学竞赛首次向人工智能(AI)开放

大家好&#xff0c;我是微学AI&#xff0c;最近大家突然开始关注阿里巴巴全球数学竞赛了&#xff0c;在这个人工智能爆发的时代&#xff0c;2024年阿里巴巴全球数学竞赛首次向人工智能&#xff08;AI&#xff09;开放&#xff0c;要求参赛的AI模型在比赛前提交源代码&#xff0…

【向量检索】之向量数据库Milvus,Faiss详解及应用案例

Reference https://www.modb.pro/db/509268 笔记︱几款多模态向量检索引擎&#xff1a;Faiss 、milvus、Proxima、vearch、Jina等 - 知乎 (zhihu.com) 向量数据库入坑指南&#xff1a;聊聊来自元宇宙大厂 Meta 的相似度检索技术 Faiss - 苏洋的文章 - 知乎 常用的三种索引方…

ssh-add id_rsa_gitlab1 Error connecting to agent: No such file or directory

ssh-add id_rsa_gitlab1 Error connecting to agent: No such file or directory 目录 ssh-add id_rsa_gitlab1 Error connecting to agent: No such file or directory1. 启动 SSH 代理2. 添加 SSH 密钥3. 使用 Git Bash 或其他终端4. 使用 Pageant&#xff08;适用于 PuTTY 用…

大模型技术工程师:抓住时代机遇,成为行业精英_

伴随AI大模型的火热&#xff0c;中国科技大厂们正在掀起一场「跑步AI化」的风暴。从顶层战略到业务线重构&#xff0c;AI无疑已成为大厂们押注未来的新故事。 大模型时代已经到来 大模型已成为全球竞争热点&#xff0c;一个大模型时代已经到来。 大模型具备三个特点&#xf…

Vue3-国足18强赛抽签

Vue3国足18强赛抽签 国足遇到这个对阵&#xff0c;能顺利出现吗&#xff1f; 1、系统演示 Vue3模拟国足18强赛抽签 2、关键代码 开始抽签 <script setup> import FenDang from "/components/chouqian/FenDang.vue"; import {ref} from "vue";le…

CHATGPT说这个运算放大器是比较器,我说这是运放典型的同相比例放大器,一个光控电路分析

纠正 图1 光控电路 该电路来自一个问题&#xff0c;链接见文末。 因GPT的分析有误&#xff0c;特此纠正。 引用图片和答案用于分析&#xff0c;如侵权请联系本人。 电路分析&#xff1a; 该电路为光控灯电路&#xff0c;灯光为LED发光二极管 D。 光敏电阻RG的阻值和光线强度关…

重学java 72.正则表达式

人长大之后就在频繁地离别&#xff0c;相聚反而时日无多 —— 24.6.17 一、正则表达式的概念及演示 1.概述 正则表达式是一个具有特殊规则的字符串 2.作用&#xff1a;校验 3.String中有一个校验正则的方法&#xff1a; boolean matches(String regex) —— 校验字符串是否…

从11个视角看全球Rust程序员4/4:深度解读JetBrains最新报告

讲动人的故事,写懂人的代码 8 Rust程序员最喜欢用什么工具调试程序? 用println!或dbg!宏来调试一下 2022年:55%2023年:55%在IDE里玩玩UI调试 2022年:27%2023年:29%在控制台里调试调试 2022年:11%2023年:10%不调试,任性 2022年:5%2023年:6%有其他奇思妙想 2022年:1%…

开源的数字孪生平台

欧洲对工业4.0的追求体现在三个方面&#xff1a; 数字孪生、智能制造和万物互联。 资助2440万欧元的开源数字孪生平台 源代码&#xff1a; http://www.gitpp.com/ccdan/dpqq-digital-twins 这套数字孪生是工业4.0整体规划中的中的一项技术&#xff0c;实现了一种称为“数字…

Python武器库开发-武器库篇之链接提取器(六十)

Python武器库开发-武器库篇之链接提取器&#xff08;六十&#xff09; 链接提取器介绍 链接提取器&#xff08;Link Extractor&#xff09;是一种用于从网页中提取链接的工具。它可以从网页的源代码中识别出所有的链接&#xff0c;并将这些链接提取出来。链接提取器可以用于各…

Java技术驱动的智能ERP系统:打造企业高效管理与创新发展的数字化引擎

随着数字化浪潮的席卷&#xff0c;现代企业对于高效、稳定、易于扩展的管理系统需求愈发迫切。为了满足这一需求&#xff0c;我们倾力打造了一款基于Java技术的企业级资源规划&#xff08;ERP&#xff09;管理系统。该系统以Spring Cloud Alibaba、Spring Boot、MybatisPlus、R…

MySQL操作数据库语句

mysql关键字不区分大小写 1. 创建数据库 CREATE DATABASE [IF NOT EXISTS] westos (带[ ]表示该语句可有可无) 2. 删除数据库 DROP DATABASE [IF EXISTS] westos 3.使用数据库 -- tab键的上面&#xff0c;如果你的表名或者字段名是一个特殊的字符&#xff0c;就需要带 …

基于Redis实现共享session登录

搭配食用&#xff1a;Redis&#xff08;基础篇&#xff09;-CSDN博客 项目实现前的 Mysql中的表&#xff1a; 表说明tb_user用户表tb_user_info用户详情表tb_shop商户信息表tb_shop_type商户类型表tb_blog用户日记表&#xff08;达人探店日记)tb_follow用户关注表tb_voucher优…

jquey+mybatis-plus实现简单分页功能

这篇文章介绍一下怎么通过JQuery结合mybatis-plus的分页插件实现原生HTML页面的分页效果&#xff0c;没有使用任何前端框架&#xff0c;主要是对前端知识的应用。 创建Springboot项目 Intellij IDEA中创建一个Springboot项目&#xff0c;项目名为pager。 添加必须的依赖包 修…

modbus流量计数据解析(4个字节与float的换算)

通过modbus协议从流量计中读取数据后&#xff0c;需要将获得的字节数据合成float类型。以天信流量计为例&#xff1a; 如何将字节数据合并成float类型呢&#xff1f;这里总结了三种方法。 以温度值41 A0 00 00为例 目录 1、使用char*逐字节解析2、使用memcpy转换2、使用联合体…

【每天学会一个渗透测试工具】Nessus安装及使用指南

&#x1f31d;博客主页&#xff1a;泥菩萨 &#x1f496;专栏&#xff1a;Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 其他扫描工具&#xff1a; AWVS和Xray&#xff1a;应用漏洞扫描工具 fscan&#xff1a;虽然能扫主机&#xff0c;但比较老了…

【ajax基础03】常用ajax请求方法和数据提交以及axios错误处理

一&#xff1a;请求方法 什么是请求方法&#xff1a; 浏览器对服务器资源&#xff0c;要执行的操作 常见请求方法如下 二&#xff1a;axios中应用 语法格式&#xff1a; method:为请求方法&#xff0c;默认情况下为get&#xff08;获取数据&#xff09; data&#xff1a;…