SSE(Server-Send-Event)服务端推送数据技术

news2024/9/25 21:23:37

SSE(Server-Send-Event)服务端推送数据技术

大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。

  1. 客户端轮询更新数据。
  2. 服务端与客户端建立 Socket 连接双向通信
  3. 服务端与客户建立 SSE 连接单向通信

几种方案的比较:

  1. 轮询:

    客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。

  2. WebSocket连接:

    服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

  3. SSE推送:

    SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。

下面是SpringBoot使用SSE的步骤和示例代码

  1. 配置依赖

    	    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-validation</artifactId>
            </dependency>
    

    SSE已经集成到spring-web中,所以可以直接使用。

  2. 后端代码

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {
    
        @Autowired
        private SseService service;
    
    
        @GetMapping("/testSse")
        public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            final SseEmitter emitter = service.getConn(clientId);
            CompletableFuture.runAsync(() -> {
                try {
                    service.send(clientId);
                    log.info("建立连接成功!clientId = {}", clientId);
                } catch (Exception e) {
                    log.error("推送数据异常");
                }
            });
            return emitter;
        }
    
    
        @GetMapping("/sseConection")
        public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            return service.getConn(clientId);
        }
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam("clientId") String clientId) {
            try {
                // 异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.send(clientId);
                    } catch (Exception e) {
                        log.error("推送数据异常");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @GetMapping("/sendMsgToAll")
        public void sendMsgToAll() {
            try {
                //异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.sendToAll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        @GetMapping("closeConn/{clientId}")
        public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            service.closeConn(clientId);
            return "连接已关闭";
        }
    
    
    }
    
    
    package com.wry.wry_test.service;
    
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    
    public interface SseService {
    
    
        /**
         * 获取连接
         * @param clientId 客户端id
         * @return
         */
        SseEmitter getConn(String clientId);
    
        /**
         *  发送消息到指定客户端
         * @param clientId 客户端id
         * @throws Exception
         */
        void send(String clientId);
    
        /**
         * 发送消息到所有SSE客户端
         * @throws Exception
         */
        void sendToAll() throws Exception;
    
        /**
         * 关闭指定客户端的连接
         * @param clientId 客户端id
         */
        void closeConn(String clientId);
    }
    
    
    package com.wry.wry_test.service.impl;
    
    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    @Service
    @Slf4j
    public class SseServiceImpl implements SseService {
    
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
                final SseEmitter emitter = new SseEmitter(600_000L);
                // 注册超时回调,超时后触发
                emitter.onTimeout(() -> {
                    log.info("连接已超时,正准备关闭,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                });
                // 注册完成回调,调用 emitter.complete() 触发
                emitter.onCompletion(() -> {
                    log.info("连接已关闭,正准备释放,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                    log.info("连接已释放,clientId = {}", clientId);
                });
                // 注册异常回调,调用 emitter.completeWithError() 触发
                emitter.onError(throwable -> {
                    log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
                log.info("建立连接成功!clientId = {}", clientId);
                return emitter;
            }
        }
    
        /**
         * 模拟类似于 chatGPT 的流式推送回答
         *
         * @param clientId 客户端 id
         * @throws IOException 异常
         */
        @Override
        public void send(@NotBlank String clientId) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
            if (emitter == null) return;
    
            // 开始推送数据
            // todo 模拟推送数据
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                try {
                    this.sseSend(emitter, msg, clientId);
                    Thread.sleep(1000);
                } catch (Exception e) {
                    log.error("推送数据异常", e);
                    break;
                }
            }
    
            log.info("推送数据结束,clientId = {}", clientId);
            // 结束推流
            emitter.complete();
        }
    
        /**
         * 发送数据给所有连接
         */
        public void sendToAll() {
            List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                this.sseSend(emitters, msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter  sse长连接
         * @param data     发送数据
         * @param clientId 客户端id
         */
        private void sseSend(SseEmitter emitter, Object data, String clientId) {
            try {
                emitter.send(data);
                log.info("推送数据成功,clientId = {}", clientId);
            } catch (Exception e) {
                log.error("推送数据异常", e);
                throw new RuntimeException("推送数据异常");
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter sse长连接
         * @param data    发送数据
         */
        private void sseSend(List<SseEmitter> emitter, Object data) {
    
            emitter.forEach(e -> {
                try {
                    e.send(data);
                } catch (IOException ioException) {
                    log.error("推送数据异常", ioException);
                }
            });
            log.info("推送数据成功");
        }
    
    }
    

    实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。

    image-20240710180401231

适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:

  • ChatGPT这种实时加载会话数据
  • 文件下载,通过SSE异步下载文件
  • 服务端实时数据推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920366.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实变函数精解【3】

文章目录 点集求导集 闭集参考文献 点集 求导集 例1 E { 1 / n 1 / m : n , m ∈ N } 1. lim ⁡ n → ∞ ( 1 / n 1 / m ) 1 / m 2. lim ⁡ n , m → ∞ ( 1 / n 1 / m ) 0 3. E ′ { 0 , 1 , 1 / 2 , 1 / 3 , . . . . } E\{1/n1/m:n,m \in N\} \\1.\lim_{n \rightar…

Spark SQL 概述

Spark SQL 概述 Spark SQL 是 Apache Spark 的一个模块&#xff0c;专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能&#xff0c;使得处理大数据变得更加高效和简便。通过 Spark SQL&#xff0c;用户可以直接在 Spark 中使用 SQL 查询&#xff0c;或者使用 …

i18n、L10n、G11N 和 T9N 的含义

注&#xff1a;机翻&#xff0c;未校对。 Looking into localization for the first time can be terrifying, if only due to all of the abbreviations. But the meaning of i18n, L10n, G11N, and T9N, are all very easy to understand. 第一次研究本地化可能会很可怕&…

Git 删除包含敏感数据的历史记录及敏感文件

环境 Windows 10 Git 2.41.0 首先备份你需要删除的文件&#xff08;如果还需要的话&#xff09;&#xff0c;因为命令会将本地也删除将项目中修改的内容撤回或直接提交到仓库中&#xff08;有修改内容无法提交&#xff09; 会提示Cannot rewrite branches: You have unstaged …

给后台写了一个优雅的自定义风格的数据日志上报页面

highlight: atelier-cave-dark 查看后台数据日志是非常常见的场景,经常看到后台的小伙伴从服务器日志复制一段json数据字符串,然后找一个JSON工具网页打开,在线JSON格式化校验。有的时候,一些业务需要展示mqtt或者socket的实时信息展示,如果不做任何修改直接展示一串字符…

音频语言学习领域数据集现状、分类及评估

Audio Language Learning (Audio-Text Learning) 是一个新兴的研究领域&#xff0c;专注于处理、理解和描述声音。它的发展动力是机器学习技术的进步以及越来越多地将声音与其相应的文本描述相结合的数据集的可用性。 Audio Language Models (ALMs) 是这个领域的关键技术&#…

零基础学python(二)

1. 字典 字典的创建 最常见的创建方式&#xff1a; d {"k1": "v1", "k2": "v2"} 再向字典中添加一对键值&#xff1a; d["k3"] "v3" 字典还可以用dict()创建&#xff0c;这种方式中&#xff0c;键是不加引…

嵌入式工程师从0开始,到底该学什么,怎么学?

作为嵌入式工程师&#xff0c;从零开始学习需要掌握以下几个关键方面。我收集归类了一份嵌入式学习包&#xff0c;对于新手而言简直不要太棒&#xff0c;里面包括了新手各个时期的学习方向编程教学、问题视频讲解、毕设800套和语言类教学&#xff0c;敲个22就可以免费获得。 基…

WPS打开PDF文件的目录

WPS打开PDF文件的目录 其实WPS中PDF文件并没有像Word那样标准的目录&#xff0c;但是倒是有书签&#xff0c;和目录一个效果 点击左上角书签选项&#xff0c;或者使用Alt Shift 1快捷键即可

java解决实例问题--拿硬币堆

题目&#x1f38a; 编程梦想家&#xff08;大学生版&#xff09;-CSDN博客 桌上有 n 堆力扣币&#xff0c;每堆的数量保存在数组 coins 中。我们每次可以选择任意一堆&#xff0c;拿走其中的一枚或者两枚&#xff0c;求拿完所有力扣币的最少次数。 ❤ 这个问题实际上是一个贪…

【简历】西安某211大学研究生:Java简历面试通过率低

注&#xff1a;为保证用户信息安全&#xff0c;姓名和学校等信息已经进行同层次变更&#xff0c;内容部分细节也进行了部分隐藏 简历说明 这个同学是211研究生的一份Java简历,这个简历版面没有问题,但是因为主项目重复度过大,所以导致这个简历的简历通过率会大大降低,面试通过…

《Windows API每日一练》9.2.1 菜单

■和菜单有关的概念 窗口的菜单栏紧挨着标题栏下面显示。这个菜单栏有时叫作程序的“主菜单”或“顶级菜单“&#xff08;top-level menu&#xff09;。顶级菜单中的菜单项通常会激活下拉菜单&#xff08;drop-downmenu&#xff09;&#xff0c;也 叫“弹出菜单”&#xff08;…

头歌资源库(25)地图着色

一、 问题描述 任何平面区域图都可以用四种颜色着色&#xff0c;使相邻区域颜色互异。这就是四色定理。要求给定区域图&#xff0c;排出全部可能的着色方案。例如&#xff0c;区域图如下图所示&#xff1a; 要求用四种颜色着色。 则输入&#xff1a; 10 4 &#xff08;分别表示…

什么是敏捷本地化

快速、敏捷的多语言产品和服务交付正逐渐成为众多行业的常态。在这种情况下&#xff0c;重点从传统的期望&#xff08;即在合理的时间框架内翻译大量内容&#xff09;转变为翻译工作量非常大的小片段&#xff0c;通常在2-3到12-24小时之间&#xff0c;通常在周末或假期。 Logr…

如何做好漏洞扫描工作提高网络安全

在数字化浪潮席卷全球的今天&#xff0c;企业数字化转型已成为提升竞争力、实现可持续发展的关键路径。然而&#xff0c;这一转型过程并非坦途&#xff0c;其中网络安全问题如同暗礁般潜伏&#xff0c;稍有不慎便可能引发数据泄露、服务中断乃至品牌信誉受损等严重后果。因此&a…

usbserver工程师手记(三)手工开通 OTP功能

1、设定密钥&#xff0c;用户自行选择一个密钥&#xff0c;以下以密钥为 EAZAYOKNGETBOPC5 为例说明 2、usb server 配置otp 密钥&#xff0c;目前还没有UI 界面开通&#xff0c;后续版本会支持从管理界面开通 curl -X POST -H Content-Type: application/json -H Accept: app…

mysql高可用解决方案:MHA原理及实现

MHA&#xff1a;Master High Availability。对主节点进行监控&#xff0c;可实现自动故障转移至其它从节点&#xff1b;通过提升某一从节点为新的主节点&#xff0c;基于主从复制实现&#xff0c;还需要客户端配合实现&#xff0c;目前MHA主要支持一主多从的架构&#xff0c;要…

应力平衡方程的推导

应力平衡方程的推导 对于一点&#xff0c;已知其应力状态有&#xff1a; σ x , τ x y , τ x z \sigma_x,\tau_{xy},\tau_{xz} σx​,τxy​,τxz​ 则其附近点的应力状态为&#xff1a; σ x ∂ σ x ∂ x d x , τ x y ∂ τ x y ∂ x d x , τ x z ∂ τ x z ∂ x d …

【JavaScript 报错】未捕获的范围错误:Uncaught RangeError

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 一、错误原因分析1. 递归调用次数过多2. 数组长度超出限制3. 数值超出允许范围 二、解决方案1. 限制递归深度2. 控制数组长度3. 检查数值范围 三、实例讲解四、总结 Uncaught RangeError 是JavaScript中常见的一种错误&…

2024年06月CCF-GESP编程能力等级认证C++编程三级真题解析

本文收录于专栏《C等级认证CCF-GESP真题解析》&#xff0c;专栏总目录&#xff1a;点这里。订阅后可阅读专栏内所有文章。 一、单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09; 第 1 题 小杨父母带他到某培训机构给他报名参加CCF组织的GESP认证考试的第1级&…