Spring 使用SSE(Server-Sent Events)学习

news2024/11/24 12:31:38

什么是SSE

SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。

SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。

SSE 的主要优点包括:

实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。

然而,SSE 也有一些局限性:

单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。

与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。

SSE WebSocket 对比

SSE 的优点:

  • 简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
  • 单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
  • 低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
  • 兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
  • 轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
  • 自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。

SSE 的缺点:

  • 单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
  • 数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
  • 连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。

** WebSocket 的优点:**

  • 全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
  • 低延迟:建立连接后,数据可以实时传输,延迟较低。
  • 二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
  • 较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。

WebSocket 的缺点:

  • 协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
  • 兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
  • 安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
  • 服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。

SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑

效果演示

话不多说。直接上代码

Controller

package cn.ideamake.feishu.web.controller.sse;

import cn.hutool.core.thread.ThreadUtil;
import cn.ideamake.common.response.Result;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.validation.Valid;

/**
 * @author Barcke
 * @version 1.0
 * @projectName feishu-application
 * @className SseController
 * @date 2024/6/5 10:14
 * @slogan: 源于生活 高于生活
 * @description:
 **/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
@Validated
public class SseController {

    private final SseService sseService;

    /**
     * 创建sse链接
     * @param clientId
     * @return
     */
    @GetMapping("/createConnect")
    public SseEmitter createConnect(String clientId) {
        return sseService.createConnect(clientId);
    }

    /**
     * 给所有客户端发送消息
     * @param msg
     * @return
     */
    @PostMapping("/broadcast")
    public Result<Boolean> sendMessageToAllClient(@RequestBody String msg) {
        ThreadUtil.execute(() -> {
            sseService.sendMessageToAllClient(msg);
        });
        return Result.ok(true);
    }

    /**
     * 给指定端发送消息
     * @param sseMessageDTO
     * @return
     */
    @PostMapping("/sendMessage")
    public Result<Boolean> sendMessageToOneClient(@RequestBody @Valid SseMessageDTO sseMessageDTO) {
        ThreadUtil.execute(() -> {
            sseService.sendMessageToOneClient(sseMessageDTO.getClientId(), sseMessageDTO.getData());
        });
        return Result.ok(true);
    }

    /**
     * 关闭链接
     * @param clientId
     * @return
     */
    @GetMapping("/closeConnect")
    public Result<Boolean> closeConnect(@RequestParam("clientId") String clientId) {
        ThreadUtil.execute(() -> {
            sseService.closeConnect(clientId);
        });
        return Result.ok(true);
    }

}

Service

package cn.ideamake.feishu.service.sse;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * @author Barcke
 * @version 1.0
 * @projectName feishu-application
 * @className SseService
 * @date 2024/6/5 10:18
 * @slogan: 源于生活 高于生活
 * @description:
 **/
public interface SseService {

    /**
     * 创建连接
     *
     * @param clientId 客户端ID
     */
    SseEmitter createConnect(String clientId);

    /**
     * 根据客户端id获取SseEmitter对象
     *
     * @param clientId 客户端ID
     */
    SseEmitter getSseEmitterByClientId(String clientId);

    /**
     * 发送消息给所有客户端
     *
     * @param msg 消息内容
     */
    void sendMessageToAllClient(String msg);

    /**
     * 给指定客户端发送消息
     *
     * @param clientId 客户端ID
     * @param msg      消息内容
     */
    void sendMessageToOneClient(String clientId, String msg);

    /**
     * 关闭连接
     *
     * @param clientId 客户端ID
     */
    void closeConnect(String clientId);

}

ServiceImpl

package cn.ideamake.feishu.service.sse.impl;

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;

/**
 * @author Barcke
 * @version 1.0
 * @projectName feishu-application
 * @className SseServiceImpl
 * @date 2024/6/5 10:18
 * @slogan: 源于生活 高于生活
 * @description:
 **/
@Slf4j
@RequiredArgsConstructor
@Service
public class SseServiceImpl implements SseService {

    /**
     * 容器,保存连接,用于输出返回 ;可使用其他方法实现
     */
    private static final Map<String, SseEmitter> SSE_CACHE = MapUtil.newConcurrentHashMap();

    /**
     * 重试次数
     */
    private final Integer RESET_COUNT = 3;

    /**
     * 重试等待事件 单位 ms
     */
    private final Integer RESET_TIME = 5000;

    /**
     * 根据客户端id获取SseEmitter对象
     *
     * @param clientId 客户端ID
     */
    @Override
    public SseEmitter getSseEmitterByClientId(String clientId) {
        return SSE_CACHE.get(clientId);
    }

    /**
     * 创建连接
     *
     * @param clientId 客户端ID
     */
    @Override
    public SseEmitter createConnect(String clientId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 是否需要给客户端推送ID
        if (StrUtil.isBlank(clientId)) {
            clientId = IdUtil.simpleUUID();
        }
        // 注册回调
        // 长链接完成后回调接口(即关闭连接时调用)
        sseEmitter.onCompletion(completionCallBack(clientId));
        // 连接超时回调
        sseEmitter.onTimeout(timeoutCallBack(clientId));
        // 推送消息异常时,回调方法
        sseEmitter.onError(errorCallBack(clientId));
        SSE_CACHE.put(clientId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}    累计用户:{}", clientId, SSE_CACHE.size());
        try {
            // 注册成功返回用户信息
            sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            log.error("创建长链接异常,客户端ID:{}   异常信息:{}", clientId, e.getMessage());
        }
        return sseEmitter;
    }

    /**
     * 发送消息给所有客户端
     *
     * @param msg 消息内容
     */
    @Override
    public void sendMessageToAllClient(String msg) {
        if (MapUtil.isEmpty(SSE_CACHE) || StringUtils.isBlank(msg)) {
            return;
        }
        // 判断发送的消息是否为空
        for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
            SseMessageDTO sseMessageDTO = new SseMessageDTO();
            sseMessageDTO.setClientId(entry.getKey());
            sseMessageDTO.setData(msg);
            sendMsgToClientByClientId(entry.getKey(), sseMessageDTO, entry.getValue());
        }

    }

    /**
     * 给指定客户端发送消息
     *
     * @param clientId 客户端ID
     * @param msg      消息内容
     */
    @Override
    public void sendMessageToOneClient(String clientId, String msg) {
        SseMessageDTO sseMessageDTO = new SseMessageDTO(clientId, msg);
        sendMsgToClientByClientId(clientId, sseMessageDTO, SSE_CACHE.get(clientId));
    }

    /**
     * 关闭连接
     *
     * @param clientId 客户端ID
     */
    @Override
    public void closeConnect(String clientId) {
        SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        if (sseEmitter != null) {
            sseEmitter.complete();
            removeUser(clientId);
        }
    }

    /**
     * 推送消息到客户端
     * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
     *
     * @param clientId  客户端ID
     * @param sseMessageDTO 推送信息,此处结合具体业务,定义自己的返回值即可
     **/
    private void sendMsgToClientByClientId(String clientId, SseMessageDTO sseMessageDTO, SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, sseMessageDTO);
            return;
        }
        SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK))
                .data(sseMessageDTO, MediaType.APPLICATION_JSON);
        try {
            sseEmitter.send(sendData);
        } catch (IOException e) {
            // 推送消息失败,记录错误日志,进行重推
            log.error("推送消息失败:{},尝试进行重推", sseMessageDTO);
            boolean isSuccess = true;
            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < RESET_COUNT; i++) {
                try {
                    Thread.sleep(RESET_TIME);
                    sseEmitter = SSE_CACHE.get(clientId);
                    if (sseEmitter == null) {
                        log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                        continue;
                    }
                    sseEmitter.send(sendData);
                } catch (Exception ex) {
                    log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);
                    continue;
                }
                log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, sseMessageDTO);
                return;
            }
        }
    }


    /**
     * 长链接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId 客户端ID
     **/
    private Runnable completionCallBack(String clientId) {
        return () -> {
            log.info("结束连接:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 连接超时时调用
     *
     * @param clientId 客户端ID
     **/
    private Runnable timeoutCallBack(String clientId) {
        return () -> {
            log.info("连接超时:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 推送消息异常时,回调方法
     *
     * @param clientId 客户端ID
     **/
    private Consumer<Throwable> errorCallBack(String clientId) {
        return throwable -> {
            log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);

            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < RESET_COUNT; i++) {
                try {
                    Thread.sleep(RESET_TIME);
                    SseEmitter sseEmitter = SSE_CACHE.get(clientId);
                    if (sseEmitter == null) {
                        log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
                        continue;
                    }
                    sseEmitter.send("失败后重新推送");
                } catch (Exception e) {
                    log.error("sse推送消息异常", e);
                }
            }
        };
    }

    /**
     * 移除用户连接
     *
     * @param clientId 客户端ID
     **/
    private void removeUser(String clientId) {
        SSE_CACHE.remove(clientId);
        log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
    }
}

DTO

package cn.ideamake.feishu.pojo.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import javax.validation.constraints.NotNull;

/**
 * @author Barcke
 * @version 1.0
 * @projectName feishu-application
 * @className SseMessageDTO
 * @date 2024/6/5 10:19
 * @slogan: 源于生活 高于生活
 * @description:
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain = true)
public class SseMessageDTO {

    /**
     * 客户端id
     */
    @NotNull(message = "客户端id 不能为空")
    private String clientId;
    /**
     * 传输数据体(json)
     */
    private String data;

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1804046.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux中dd命令以及如何测试读写速度

dd命令详解 dd命令是一个在Unix和类Unix系统中非常常用的命令行工具&#xff0c;它主要用于复制文件和转换文件数据。下面我会详细介绍一些dd命令的常见用法和功能&#xff1a; 基本语法 dd命令的基本语法如下&#xff1a; bash Copy Code dd [option]...主要选项和参数 if…

JS-Fetch

Fetch 是一种用于进行网络请求的现代 JavaScript API。它提供了一种简单、灵活且功能强大的方式&#xff0c;用于从服务器获取资源并处理响应。 Fetch API 在浏览器中原生支持&#xff0c;并且以 Promise 为基础&#xff0c;使得异步请求更加直观和易用。使用 Fetch API&#…

大学搜题软件网课?推荐五个搜题软件和学习工具 #其他#经验分享#知识分享

大学生活中&#xff0c;选择适合自己的学习工具能够提高学习效率&#xff0c;让学习更加轻松愉快。 1.彩虹搜题 这个是公众号 提供了各大教材以及网课平台的练习题答案&#xff0c;强大的平台支持&#xff0c;无论是智慧树还是MOOC&#xff0c;只有老师们用不到&#xff0c;…

【SQLAlChemy】如何定义ORM模型,如何映射到数据库?

定义ORM模型并映射到数据库 创建 ORM 基类 使用 declarative_base 根据 engine 来创建一个 ORM 基类。 from SqlAIchemy.LinkDB.main import engineBase declarative_base()创建自定义类 用上边定义的 Base 类来实现自己的 ORM 类。 __tablename__ 类属性&#xff0c;可以…

Vue 2 + Element UI 选择一个el-select清空另一个el-select选中的值

需求&#xff1a;表单中有两个下拉选择器&#xff0c;先选中第一个&#xff0c;清空第二个选中的值 尝试过this.$refs[form].resetFields(field name);全都失效&#xff01; 效果图如下&#xff1a; 先选择商品分类&#xff0c;再去选择商品列表中的某一件商品 <el-form-…

GDPU JavaWeb Ajax请求

异步请求可以提升用户体验并优化页面性能。 ajax登录 实现ajax异步登录。 注意&#xff0c;ajax用到了jQuery库&#xff0c;先下载好相应的js库&#xff0c;然后复制导入到工程的web目录下&#xff0c;最好与你的前端页面同一层级。然后编写时路径一定要找准&#xff0c;“pag…

天才程序员周弈帆 | Stable Diffusion 解读(一):回顾早期工作

本文来源公众号“天才程序员周弈帆”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;Stable Diffusion 解读&#xff08;一&#xff09;&#xff1a;回顾早期工作 在2022年的这波AI绘画浪潮中&#xff0c;Stable Diffusion无疑是最…

视频去水印电脑版,视频去水印软件

视频去水印怎么去&#xff0c;一直是视频编辑者们的热门话题。那么&#xff0c;如何去除频水印呢&#xff1f;接下来&#xff0c;我们将为您详细介绍视频去水印方法。 第一种方法&#xff1a; 首先通过浏览器打开 “ 51视频处理官网” 的网站。打开网站后&#xff0c;我们上传…

C语言---枚举位运算

枚举的定义&#xff1a; 一枚枚的列举 //一个个的列举 枚举的语法&#xff1a; enum 枚举名 { 枚举常量//名字 }&#xff1b; 例子&#xff1a; enum weekay { MON&#xff0c; TUES; WED; THURS; FRI; SAT; SUN;//枚举常量 }; 注意&#xff1a; 1、C语言中…

数据库(28)——联合查询

对于union查询&#xff0c;就是把多次查询的结果合并起来&#xff0c;形成一个新的查询结果集。 语法 SELECT 字段列表 FROM 表A... UNION [ALL] SELECT 字段列表 FROM 表B...; 演示 select * from user where age > 22 union all select * from user where age < 50; u…

消息队列笔记

异步技术 企业级应用中广泛使用的三种异步消息传递技术 原文链接&#xff1a;https://blog.csdn.net/qq_55917018/article/details/122122218 三种异步消息传递技术 JMS (java message service) 一个Java规范&#xff0c;等同于JDBC规范&#xff0c;提供了与消息服务相关的…

tcp aimd 窗口的推导

旧事重提&#xff0c;今天用微分方程的数值解观测 tcp aimd 窗口值。 设系统 AI&#xff0c;MD 参数分别为 a 1&#xff0c;b 0.5&#xff0c;丢包率由 buffer 大小&#xff0c;red 配置以及线路误码率共同决定&#xff0c;设为 p&#xff0c;窗口为 W&#xff0c;则有&…

docker部署skywalking

skywalking版本下载 1&#xff1a;拉取skywalking的oap镜像(可以选择自己的版本&#xff0c;最好与ui&#xff0c;agent版本一致) docker pull apache/skywalking-oap-server:9.5.02&#xff1a;启动oap docker run -d -p 11800:11800 -p 12800:12800 --name sw_oap apache/…

【介绍下R-tree,什么是R-tree?】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

前端渲染大量数据思路【虚拟列表】【异步机制】

当浏览器遇到性能瓶颈导致页面卡顿时&#xff0c;你会怎么处理&#xff1f;如何查找问题的原因&#xff1f; 浏览器本身自带性能检测工具&#xff0c;通常我们分析由脚本导致的页面卡顿会选择 性能&#xff08;performance&#xff09; 选项卡&#xff0c;在其中我们可以找到导…

嵌入式linux系统中利用I2C控制器应用开发详解

大家好,今天主要给大家分享一下,在linux系统上如何使用I2C进行应用开发详解。 l2C (Inter一Integrated Circuit BUS)是I2C BUS简称.中文为集成电路总线.是目前应用最广泛的总线之一。和IMX6ULL有些相关的是.刚好该总线是NXP前身的PHLIPS 设计。 第一:I2C协议概述 …

C++基础编程100题-007 OpenJudge-1.3-05 计算分数的浮点数值

更多资源请关注纽扣编程微信公众号 http://noi.openjudge.cn/ch0103/05/ 描述 两个整数a和b分别作为分子和分母&#xff0c;既分数 a/b &#xff0c;求它的浮点数值&#xff08;双精度浮点数&#xff0c;保留小数点后9位&#xff09; 输入 输入仅一行&#xff0c;包括两个…

Postman 连接数据库 利用node+xmysql

1、准备nodejs环境 如果没有安装&#xff0c;在网上找教程&#xff0c;安装好后&#xff0c;在控制台输入命令查看版本&#xff0c;如下就成功了 2、安装xmysql 在控制台输入 npm install -g xmysql 3、连接目标数据库 帮助如下&#xff1a; 示例&#xff1a; 目标数据库…

新品发布 | 飞凌嵌入式RK3562J核心板,智能工业时代的国产智慧引擎

飞凌嵌入式推出FET3562J-C全国产核心板&#xff0c;专为工业自动化及消费类电子设备设计&#xff0c;打造智能工业时代的国产智慧新引擎。 FET3562J-C核心板基于Rockchip RK3562J处理器开发设计&#xff0c;该处理器采用22nm先进制程工艺&#xff0c;集成了4个ARM Cortex-A53高…

在C++中用3种方法访问一个字符串

1&#xff0e;用字符数组存放一个字符串 编写程序&#xff1a; str是字符数组名&#xff0c;它代表字符数组的首元素的地址&#xff0c;输出时从str指向的字符开始&#xff0c;逐个输出字符&#xff0c;直至遇到\0为止。 2&#xff0e;用字符串变量存放字符串 编写程序&…