SpringBoot SSE服务端主动推送事件详解

news2024/10/1 17:40:23

一、SSE概述

1、SSE简介

SSE(Server Sent Event),直译为服务器发送事件,也就是服务器主动发送事件,客户端可以获取到服务器发送的事件。

我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕。但是在SSE的使用场景下,客户端发起请求,然后建立SEE连接一直保持,服务端就可以返回数据给客户端。

SSE简单来说就是服务器主动向前端推送数据的一种技术,它是单向的。SSE适用于消息推送,监控等只需要服务器推送数据的场景中。比如:文件下载时,后端可以推送下载进度条信息。

2、特点

SSE (Server Send Event)服务端主动推送:

  • html5新标准,用来从服务端实时推送数据到浏览器端,
  • 直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议

简单的服务器数据推送的场景,使用服务器推送事件还是很方便的。

3、SSE和WebScoket的区别

SSE 是单通道,只能服务端向客户端发消息;而WebScoket 是双通道。

SSEWebScoket
http 协议独立的 websocket 协议
轻量,使用简单相对复杂
默认支持断线重连需要自己实现断线重连
文本传输二进制传输
支持自定义发送的消息类型-

4、SSE规范

在 html5 的定义中,服务端SSE,一般需要遵循以下要求:

1)请求头
开启长连接 + 流方式传递:

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

2)数据格式
服务端发送的消息,由 message 组成,其格式如下:

field:value

其中 field 有五种可能:

  • 空:即以:开头,表示注释,可以理解为服务端向客户端发送的心跳,确保连接不中断
  • data:数据
  • event:事件,默认值
  • id:数据标识符用 id 字段表示,相当于每一条数据的编号
  • retry:重连时间

二、SSE实战

使用 SpringBoot简单实现一个SSE服务端主动推送数据为前端,前端页面接受后展示进度条。

1、SseEmitter类简介

SpringBoot 利用 SseEmitter 来支持SSE,并对SSE规范做了一些封装,使用起来非常简单。我们操作SseEmitter对象,关注消息文本即可。

SseEmitter类的几个方法:

  • send():发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中。
  • complete():表示执行完毕,会断开连接。
  • onTimeout():连接超时时回调触发。
  • onCompletion():结束之后的回调触发。
  • onError():报错时的回调触发。

2、示例实战

创建 SpringBoot项目,引入依赖:

        <!-- 里面包含了 spring-webmvc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.12</version>
        </dependency>

2.1 创建 SseServer

我们创建一个 SseServer来简单封装一下业务操作SSE的方法。

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * SseServer业务封装类来操作SEE
 */
@Slf4j
public class SseServer {

    /**
     * 当前连接总数
     */
    private static AtomicInteger currentConnectTotal = new AtomicInteger(0);

    /**
     * messageId的 SseEmitter对象映射集
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建sse连接
     *
     * @param messageId - 消息id(唯一)
     * @return
     */
    public static SseEmitter createConnect(String messageId) {
        /**
         * 设置连接超时时间。0表示不过期,默认是30秒,超过时间未完成会抛出异常
         */
        SseEmitter sseEmitter = new SseEmitter(0L);
        /*
        // 超时时间设置为3s,设置前端的重试时间为1s。重连时,注意总数的统计
        SseEmitter sseEmitter = new SseEmitter(3_000L);
        try {
            sseEmitter.send(
                    SseEmitter.event()
                    .reconnectTime(1000L)
                    //.data("前端重连成功") // 重连成功的提示信息
            );
        } catch (IOException e) {
            log.error("前端重连异常 ==> messageId={}, 异常信息:", messageId, e.getMessage());
            e.printStackTrace();
        }*/


        // 注册回调
        sseEmitter.onCompletion(completionCallBack(messageId));
        sseEmitter.onTimeout(timeOutCallBack(messageId));
        sseEmitter.onError(errorCallBack(messageId));
        sseEmitterMap.put(messageId, sseEmitter);

        //记录一下连接总数。数量+1
        int count = currentConnectTotal.incrementAndGet();
        log.info("创建sse连接成功 ==> 当前连接总数={}, messageId={}", count, messageId);
        return sseEmitter;
    }

    /**
     * 给指定 messageId发消息
     *
     * @param messageId - 消息id(唯一)
     * @param message   - 消息文本
     */
    public static void sendMessage(String messageId, String message) {
        if (sseEmitterMap.containsKey(messageId)) {
            try {
                sseEmitterMap.get(messageId).send(message);
            } catch (IOException e) {
                log.error("发送消息异常 ==> messageId={}, 异常信息:", messageId, e.getMessage());
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("连接不存在或者超时, messageId=" + messageId);
        }
    }

    /**
     * 给所有 messageId广播发送消息
     *
     * @param message
     */
    public static void batchAllSendMessage(String message) {
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                sseEmitter.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("广播发送消息异常 ==> messageId={}, 异常信息:", messageId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }

    /**
     * 给指定 messageId集合群发消息
     *
     * @param messageIds
     * @param message
     */
    public static void batchSendMessage(List<String> messageIds, String message) {
        if (CollectionUtils.isEmpty(messageIds)) {
            return;
        }
        // 去重
        messageIds = messageIds.stream().distinct().collect(Collectors.toList());
        messageIds.forEach(userId -> sendMessage(userId, message));
    }


    /**
     * 给指定组群发消息(即组播,我们让 messageId满足我们的组命名确定即可)
     *
     * @param groupId
     * @param message
     */
    public static void groupSendMessage(String groupId, String message) {
        if (MapUtils.isEmpty(sseEmitterMap)) {
            return;
        }
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                // 这里 groupId作为前缀
                if (messageId.startsWith(groupId)) {
                    sseEmitter.send(message, MediaType.APPLICATION_JSON);
                }
            } catch (IOException e) {
                log.error("组播发送消息异常 ==> groupId={}, 异常信息:", groupId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }

    /**
     * 移除 MessageId
     *
     * @param messageId
     */
    public static void removeMessageId(String messageId) {
        sseEmitterMap.remove(messageId);
        //数量-1
        currentConnectTotal.getAndDecrement();
        log.info("remove messageId={}", messageId);
    }

    /**
     * 获取所有的 MessageId集合
     *
     * @return
     */
    public static List<String> getMessageIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前连接总数
     *
     * @return
     */
    public static int getConnectTotal() {
        return currentConnectTotal.intValue();
    }

    /**
     * 断开SSE连接时的回调
     *
     * @param messageId
     * @return
     */
    private static Runnable completionCallBack(String messageId) {
        return () -> {
            log.info("结束连接 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }

    /**
     * 连接超时时回调触发
     *
     * @param messageId
     * @return
     */
    private static Runnable timeOutCallBack(String messageId) {
        return () -> {
            log.info("连接超时 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }

    /**
     * 连接报错时回调触发。
     *
     * @param messageId
     * @return
     */
    private static Consumer<Throwable> errorCallBack(String messageId) {
        return throwable -> {
            log.error("连接异常 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
}

2.2 业务controller

@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseDemoController {

    /**
     * 用户SSE连接
     * 它返回一个SseEmitter实例,这时候连接就已经创建了.
     *
     * @return
     */
    @GetMapping("/userConnect")
    public SseEmitter connect() {
        /**
         * 一般取登录用户账号作为 messageId。分组的话需要约定 messageId的格式。
         * 这里模拟创建一个用户连接
         */
        String userId = "userId-" + RandomUtils.nextInt(1, 10);
        return SseServer.createConnect(userId);
    }

    /**
     * 模拟实例:下载进度条显示。 前端访问下载接口之前,先建立用户SSE连接,然后访问下载接口,服务端推送消息。
     * http://localhost:8080/sse/downLoad/userId-1
     *
     * @throws InterruptedException
     */
    @GetMapping("/downLoad/{userId}")
    public void pushOne(@PathVariable("userId") String userId) throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("sendMessage --> 消息=" + i);
            SseServer.sendMessage(userId, String.valueOf(i));
        }
        System.out.println("下载成功");
    }


    /**
     * 广播发送。http://localhost:8080/sse/pushAllUser
     *
     * @throws InterruptedException
     */
    @GetMapping("/pushAllUser")
    public void pushAllUser() throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("batchAllSendMessage --> 消息=" + i);
            SseServer.batchAllSendMessage(String.valueOf(i));
        }
    }

}

2.3 前端html

简单写一个html来演示效果。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
    <script>
        var sseSource = new EventSource("http://localhost:8080/sse/userConnect");
        <!-- 添加一个信息回调 -->
        sseSource.onmessage = function(event){
            console.log("test=>",event)
            document.getElementById("result").innerText = event.data+'%';
            document.getElementById("my-progress").value = event.data;
        }
        // 使用vue交互事件,可以添加一些SSE的回调
        // sseSource.dispatchEvent();
        // sseSource.close();
    </script>
</head>
<body>
    <div id="result"></div>
        下载进度条:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
    </body>
</html>

3、演示效果

在这里插入图片描述

参考文章:

  • HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html

– 求知若饥,虚心若愚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/585152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Centos 7安装python 3.9.10

概述 Python是一种高级编程语言&#xff0c;它具有简单易学、可读性强、代码简洁等特点。Python由Guido van Rossum于1991年创造&#xff0c;最初被用作一种教学语言&#xff0c;但现在已经成为一种通用的编程语言。 Python支持多种编程范式&#xff0c;包括面向对象编程、函数…

如何运用R语言进行Meta分析在【文献计量分析、贝叶斯、机器学习等】多技术的融合

Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析的方法&#xff0c;最早出现于“循证医学”&#xff0c;现已广泛应用于农林生态&#xff0c;资源环境等方面。…

【MySQL】MySql的底层数据结构

文章目录 前言索引结构及查找算法不适合做MySql的数据结构及其原因 一、BTree和BTree的引出1.1 BTree数据结构2.2 BTree数据结构 二、计算m阶&#xff0c;即BTree该取多少合适总结 前言 索引结构及查找算法 一个sql语句在mysql里究竟是如何运行的呢&#xff1f;又是怎么去查找…

如何在Linux系统中使用SCP命令传输文件和文件夹?

在Linux系统中&#xff0c;SCP&#xff08;Secure Copy&#xff09;是一种用于在本地和远程主机之间安全传输文件和文件夹的命令行工具。它基于SSH协议&#xff0c;并提供了加密和身份验证机制&#xff0c;确保数据的安全性和完整性。 本文将详细介绍如何使用SCP命令在Linux系统…

如何通过pytest进行更改自动化测试用例的执行顺序?

前言 在自动化测试中&#xff0c;自动化测试用例设计原则就是执行过程时不能存在依赖顺序&#xff0c;那么如果测试用例需要按照指定顺序执行&#xff0c;这个时候应该怎么做呢&#xff1f;目前单元测试框架中unittest没有办法改变测试用例的执行顺序&#xff0c;但是另一个单…

北京发布Web3.0白皮书!币圈扬言:国际金融格局即将重塑!

如今&#xff0c;虚拟资产已成为香港数字经济与金融创新的“桥头堡”。随着加密新政生效在即&#xff0c;市场暗流涌动&#xff0c;头部交易所争相布局&#xff0c;香港或将迎来新一轮的加密竞争。 多家交易所进军香港 5月28日&#xff0c;欧易&#xff08;OKX&#xff09;完成…

浅谈兼容性测试点和注意项

一&#xff1a;兼容性测试的概念&#xff1a;就是验证开发出来的程序在特定的运行环境中与特定的软件、硬件或数据相组合是否能正常运行、有无异常的测试过程。 二&#xff1a;兼容性测试的分类&#xff1a; &#xff08;1&#xff09;浏览器兼容性测试 指的是在浏览器上检…

one-stage目标检测方法

YOLO系列算法 从区域推荐到端到端 RCNN系列的方法和核心思想在于&#xff1a;先找出可能存在物体的区域&#xff0c;再确认物体的存在 这种思路归根溯源&#xff0c;来自传统的目标检测算法。 为了解决传统方法中的滑动窗口的方式&#xff0c;来找出可能存在目标的区域&…

自学软件测试到什么程度才可以去找工作...拿到阿里offer后才知道有这些就够了!!

如果是纯自学&#xff0c;建议先自学软件测试基础、功能测试等&#xff0c;然后找一个相关的工作&#xff0c;在工作中&#xff0c;边增长项目经验&#xff0c;边自学更难一点的自动化编程、性能测试等。自动化测试、性能测试如果没有老师带&#xff0c;自学的难度比较大&#…

uniapp 使用自定义icon图标

1.下载图标文件 阿里图标库位置&#xff1a;iconfont-阿里巴巴矢量图标库 eg: 搜索 “书签” 图标&#xff0c;点击加入购物车&#xff0c;再进入购物车&#xff0c;填写加入项目&#xff0c;也可以直接下载&#xff0c;点击编辑 编辑可以设置图片颜色&#xff0c;或像素大小…

immutable深拷贝:数据多层属性-不可变数据结构

一、为何要用immutable深拷贝&#xff1f; 1.浅拷贝&#xff08;浅复制&#xff09; //引用赋值-浅复制、浅拷贝 var obj{name:"溜溜球"}var obj2obj;obj2.name"刘刘球";console.log(obj);//name:"刘刘球"console.log(obj2);//name:"刘刘…

[C++][opencv]opencv填充透明色到不规则polygon区域

大家用yolov5-seg分割都知道官方演示分割结果会把分割区域半透明填充到原图里面&#xff0c;那么C如何实现呢。今天特地研究了下。由于分割点是变动的&#xff0c;所以我们需要用变量控制分割点数。 参考文章写的很不错&#xff0c;但是有个毛病&#xff0c;他这个是5点必须是…

MySQL---JDBC基础操作、SQL注入

1. JDBC JDBC&#xff08;Java DataBase Connectivity,java数据库连接&#xff09;是一种用于执行SQL语句的Java API。 JDBC是Java访问数据库的标准规范&#xff0c;可以为不同的关系型数据库提供统一访问&#xff0c;它由一组用Java 语言编写的接口和类组成。 JDBC需要连接…

unreal 5.1 增强输入实现

在ue5.1版本增加了增强输入&#xff0c;并且废弃了之前的轴映射和操作映射。 官方文档地址&#xff1a;https://docs.unrealengine.com/5.1/zh-CN/enhanced-input-in-unreal-engine/ 输入动作&#xff08;Input Actions&#xff09; 更改后的区别我体验下来&#xff0c;它将…

探索 PlanetIX:解读区块链游戏运营的奥秘

作者: danielfootprint.network 熊市之中&#xff0c;PlanetIX 成长为最强的 Web3 游戏&#xff0c;在 Polygon 网络上独占鳌头。而其开发团队深度使用了 Footprint Analtics 的零代码数据分析平台和-GameFi 的数据 API 来提升用户的游戏体验。 近日&#xff0c;Footprint 与…

Visual Studio添加native tools command prompt

学习UEFI开发&#xff0c;环境设置种需要用到native tools command prompt&#xff0c;但是看了一下VS2017的Tools菜单下没有这个选项。网上查询&#xff0c;解决了问题&#xff1a; Tools > External Tools > Add Title:VS Command PromptCommand:C:\Windows\System32\…

操作系统原理 —— 内存管理的概念(十八)

为什么要有内存管理 为什么要对内存进行管理&#xff0c;需要解决什么问题&#xff1f; 要回答这个问题&#xff0c;首先我们需要明白&#xff1a;进程运行时&#xff0c;需放在内存才能运行。比如在执行一个程序时&#xff0c;需将该程序的相关数据与指令装入内存才能运行。…

家居购项目 (上)

文章目录 &#x1f400;Java后端经典三层架构&#x1f407;MVC模型&#x1f407;开发环境搭建&#x1f407;会员注册&#x1f333;前端验证用户注册信息&#x1f333;思路分析&#x1f349;创建表&#x1f349;创建实体类&#x1f349;DAO&#x1f34c;MemberDAOImpl &#x1f…

ISO21434 组织网络安全管理

目录 一、概述 二、目标 三、输入 3.1 先决条件 3.2 进一步支持信息 四、要求和建议 4.1 网络安全治理 4.2 网络安全文化 4.3 信息共享 4.4 管理系统 4.5 工具管理 4.6 信息安全管理 4.7 组织网络安全审计 五、输出 一、概述 为了实现网络安全工程&#xff0c;该…

Rotary Position Embedding (RoPE, 旋转式位置编码) | 原理讲解+torch代码实现

&#x1f525; RoPE为苏剑林大佬之作&#xff0c;最早应用于他自研的RoFormer (Rotary Transformer)&#xff0c;属于相对位置编码。效果优于绝对位置编码和经典式相对位置编码。出自论文&#xff1a;《RoFormer: Enhanced Transformer with Rotary Position Embedding》 &…