SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

news2025/1/6 18:00:34

上篇博客 SSE(Server Sent Event)实战(2)- Spring MVC 实现,我们用 Spring MVC 实现了简单的消息推送,并且留下了两个问题,这篇博客,我们用 Spring Web Flux 实现,并且看看这两个问题怎么解决。

一、服务端实现

/*
 * XingPan.com
 * Copyright (C) 2021-2024 All Rights Reserved.
 */
package com.sse.demo2.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author liuyuan
 * @version SseController.java, v 0.1 2024-07-15 14:24
 */
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {

    private static final HttpClient HTTP_CLIENT = HttpClient.create().responseTimeout(Duration.ofSeconds(5));

    private final Map<String, FluxSink<String>> USER_CONNECTIONS = new ConcurrentHashMap<>();

    /**
     * 用来存储用户和本机地址,实际生成请用 redis
     */
    private final Map<String, String> USER_CLIENT = new ConcurrentHashMap<>();

    /**
     * 创建连接
     */
    @GetMapping(value = "/create-connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> createConnect(@RequestParam("userId") String userId) {

        // 获取本机地址
        String hostAddress = this.getHostAddress();

        Flux<String> businessData = Flux.create(sink -> {

            USER_CONNECTIONS.put(userId, sink);
            USER_CLIENT.put(userId, hostAddress);
            log.info("创建了用户[{}]的SSE连接", userId);

            sink.onDispose(() -> {
                USER_CONNECTIONS.remove(userId);
                USER_CLIENT.remove(userId);
                log.info("移除用户[{}]的SSE连接", userId);
            });
        });

        // 创建心跳
        Flux<String> heartbeat = Flux.interval(Duration.ofMinutes(1)).map(tick -> "data: heartbeat\n\n");

        return Flux.merge(businessData, heartbeat);
    }

    /**
     * 发送消息 gateway
     */
    @GetMapping("/send-message-gateway")
    public Mono<RpcResult<Boolean>> sendMessageGateway(@RequestParam("userId") String userId, @RequestParam("message") String message) {

        String userHostAddress = USER_CLIENT.get(userId);
        if (userHostAddress == null) {
            log.info("用户[{}]的SSE连接不存在,无法发送消息", userId);
            return Mono.just(RpcResult.error("10001", "SSE连接不存在,无法发送消息"));
        }

        // 获取本机地址和用户连接地址比较,如果相同,直接使用localhost发消息
        String hostAddress = this.getHostAddress();
        userHostAddress = userHostAddress.equals(hostAddress) ? "localhost" : userHostAddress;
        String baseUrl = "http://" + userHostAddress + ":8080";
        log.info("发送消息 > baseUrl = {}", baseUrl);

        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HTTP_CLIENT))
                .baseUrl(baseUrl)
                .build();

        RpcResult<Boolean> errorResult = RpcResult.error("10002", "消息发送失败");
        return webClient.get()
                .uri("/sse/send-message?userId={userId}&message={message}", userId, message)
                .exchangeToMono(clientResponse -> {
                    if (clientResponse.statusCode().is2xxSuccessful()) {
                        log.info("消息发送成功 > 用户 = {},消息内容 = {}", userId, message);
                        return Mono.just(RpcResult.success(true));
                    } else {
                        log.error("消息发送失败 > 状态码 = {},用户 = {},消息内容 = {}", clientResponse.statusCode().value(), userId, message);
                        return Mono.just(errorResult);
                    }
                }).onErrorResume(error -> {
                    log.error("消息发送失败 > 用户 = {}, 消息内容 = {}, e = ", userId, message, error);
                    return Mono.just(errorResult);
                });
    }

    /**
     * 发送消息
     */
    @GetMapping("/send-message")
    public Mono<Void> sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {

        FluxSink<String> sink = USER_CONNECTIONS.get(userId);
        if (sink != null) {
            try {
                sink.next(message);
                log.info("给用户[{}]发送消息成功: {}", userId, message);
            } catch (Exception e) {
                log.error("向用户[{}]发送消息失败,sink可能已关闭或无效", userId, e);
                USER_CONNECTIONS.remove(userId);
                USER_CLIENT.remove(userId);
            }
        } else {
            log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);
        }

        return Mono.empty();
    }

    private String getHostAddress() {

        String hostAddress = "localhost";
        try {
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress inetAddress = inetAddresses.nextElement();
                    if (!inetAddress.isLoopbackAddress() && !inetAddress.getHostAddress().contains(":") && inetAddress.getHostAddress().startsWith("10.")) {
                        hostAddress = inetAddress.getHostAddress();
                    }
                }
            }
        } catch (SocketException e) {
            log.error("获取主机地址失败", e);
        }

        log.info("获取主机地址 > hostAddress = {}", hostAddress);

        return hostAddress;
    }
}
  1. 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?

在创建连接时/create-connect,增加心跳,只要心跳频率小于超时时间,基本就可以解决这个问题,但是前端要注意隐藏心跳内容。

  1. 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。

a. 将用户id 和用户请求的实例 ip 绑定,我这里用的是Map(USER_CLIENT)存储,生产请换成分布式缓存;
b. 服务端发送消息使用/send-message-gateway接口,这个接口只做消息分发,不真实发送消息。从USER_CLIENT中获取用户所在的实例,然后将请求分发到具体实例;
c. /send-message-gateway将请求打到/send-message,然后给用户推送消息;

二、客户端实现


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Demo</title>
    <script>        document.addEventListener('DOMContentLoaded', function () {
        var userId = "1";

        // 创建一个新的EventSource对象
        var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);

        // 当连接打开时触发
        source.onopen = function (event) {
            console.log('SSE连接已打开');
        };

        // 当从服务器接收到消息时触发
        source.onmessage = function (event) {
            // event.data 包含服务器发送的文本数据
            console.log('接收到消息:', event.data);
            // 在页面上显示消息
            var messagesDiv = document.getElementById('messages');
            if (messagesDiv) {
                messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data
            } else {
                console.error('未找到消息容器元素');
            }
        };

        // 当发生错误时触发
        source.onerror = function (event) {
            console.error('SSE连接错误:', event);
        };
    });
    </script>
</head>
<body>
<div id="messages">
    <!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>

三、启动项目

  1. 运行 Spring 项目
  2. 浏览器打开 index.html文件
  3. 调用发送消息接口
    curl http://localhost:8080/sse/send-message-gateway?userId=1&message=test0001
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1933130.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL条件查询(DQL)

在此之前先给大家看一下我的表里面的数据&#xff0c;以方便接下来的讲解 还需要大家先熟悉这里面的条件 1.语法 SELECT 字段列表 FROM 表名 WHERE 条件列表 例如 1.查询年龄等于20的员工 select * from emp where age 20; 2.查询年龄小于等于20的员工信息 select * fr…

PPOCRLabel安装及使用

一、环境准备 1、 使用anaconda创建一个Python3.7.x的环境 # 在命令行输入以下命令&#xff0c;创建名为paddle_env的环境 conda create --name paddle_env python3.7 --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/输入以下命令可以查看我们所创建的所…

【作业】 贪心算法1

Tips:三题尚未完成。 #include <iostream> #include <algorithm> using namespace std; int a[110]; int main(){int n,r,sum0;cin>>n>>r;for(int i0;i<n;i){cin>>a[i];}sort(a0,an);for(int i0;i<n;i){if(i>r){a[i]a[i-r]a[i];}suma[…

借助Aspose.Note笔记工具,在Java 中更改 OneNote 中的样式

Microsoft OneNote是一款功能强大的笔记工具。学习如何使用 Java 更改样式&#xff0c;将您的 OneNote 体验提升到一个新的水平&#xff01;在本指南中&#xff0c;我们将向您展示如何使用 Java 更改 OneNote 文档中的样式以增强可读性。本指南将为您提供分步说明&#xff0c;以…

k8s集群 安装配置 Prometheus+grafana

k8s集群 安装配置 Prometheusgrafana k8s环境如下&#xff1a;机器规划&#xff1a; node-exporter组件安装和配置安装node-exporter通过node-exporter采集数据显示192.168.40.180主机cpu的使用情况显示192.168.40.180主机负载使用情况 Prometheus server安装和配置创建sa账号&…

【算法/天梯赛训练】天梯赛模拟题集

L1-009 N个数求和 #include <iostream> #include <algorithm>using namespace std;typedef long long ll; const int N 105;typedef struct node {ll x, y; }node; node a[N];ll gcd(ll a, ll b) {return b ? gcd(b, a % b) : a; }int main() {int n;cin >>…

三伏天,艾灸这2处,既能祛寒湿、还能补阳气,效果是平时的好几倍~

这周开始&#xff0c;我们就进入了最炎热的三伏天&#xff01;今年的三伏整整40天&#xff0c;高温闷热&#xff0c;汗流浃背&#xff0c;想想都崩溃~ 最热最难熬的这段时间&#xff0c;也是天赐的良机&#xff01;此时阳气达到一年中的鼎盛&#xff0c;人体阳气也最为充沛&…

金航标kinghelm萨科微slkor总经理宋仕强(Huaqiangbei Songshiqiang )为大家介绍连接器时说

金航标kinghelm萨科微slkor总经理宋仕强&#xff08;Huaqiangbei Songshiqiang &#xff09;为大家介绍连接器时说&#xff0c;连接器通常可以分为以下几大类&#xff1a;1. 矩形连接器&#xff0c;这种连接器的外形通常呈矩形&#xff0c;具有较多的接触点&#xff0c;可传输多…

vue3中引入、封装和使用svg矢量图的实现示例

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;我是码喽的自我修养&#xff01;今天给大家分享vue3中引入和封装svg矢量图的实现示例&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到带大家&#xff0c;欢迎…

[python]pycharm设置清华源

国内镜像源有以下几个&#xff0c;因为都是国内的&#xff0c;基本速度差不了太多。 清华&#xff1a;https://pypi.tuna.tsinghua.edu.cn/simple 阿里云&#xff1a;http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/ 豆瓣&…

【单片机毕业设计选题24071】-基于STM32的超声波驱鸟器设计

系统功能: 系统操作说明&#xff1a; 上电后OLED显示 “欢迎使用智能驱鸟系统请稍后”&#xff0c;两秒后进入正常显示界面。 如果红外避障模块检测到物体后OLED显示“Detected”&#xff0c;蜂鸣器报警&#xff0c;继电器吸合 超声波换能器发出超声波。如果红外避障模块未…

synchronized的实现原理和锁升级 面试重点

1.synchronized的实现原理 synchronized是Java 中的一个很重要的关键字&#xff0c;主要用来加锁&#xff0c;synchronized所添加的锁有以下几个特点。synchronized的使用方法比较简单&#xff0c;主要可以用来修饰方法和代码块。根据其锁定的对象不同&#xff0c;可以用来定义…

前端基础之JavaScript学习——运算符、分支/循环语句、数组

大家好我是来自CSDN的前端寄术区博主PleaSure乐事&#xff0c;今天我们继续有关JavaScript的学习&#xff0c;使用的编译器为vscode&#xff0c;浏览器为谷歌浏览器。大家使用webstorm等编译器或其他浏览器效果效果问题不大&#xff0c;都可以使用。 目录 运算符 赋值运算符 …

微服务设计原则——高性能:锁

文章目录 1.锁的问题2.无锁2.1 串行无锁2.2 无锁数据结构 3.减少锁竞争参考文献 1.锁的问题 高性能系统中使用锁&#xff0c;往往带来的坏处要大于好处。 并发编程中&#xff0c;锁带解决了安全问题&#xff0c;同时也带来了性能问题&#xff0c;因为锁让并发处理变成了串行操…

【node-RED 4.0.2】连接操作 Oracle 数据库实现 增 删 改 查【新版,使用新插件:@hylink/node-red-oracle】

总览 上节课&#xff0c;我们说到&#xff0c;在 node-red 上链接 oracle 数据库 我们使用的插件是 node-red-contrib-agur-connector。 其实后来我发现&#xff0c;有一个插件更简便&#xff0c;并且也更好用&#xff1a;hylink/node-red-oracle &#xff01;&#xff01;&am…

001、Mac系统上Stable Diffusion WebUI环境搭建

一、目标 如标题所述&#xff0c;在苹果电脑&#xff08;Mac&#xff09;上搭建一套Stable Diffusion本地服务&#xff0c;以实现本地AI生图目的。 二、安装步骤 1、准备源码【等价于准备软件】 # 安装一系列工具库&#xff0c;包括cmake,protobuf,rust,python3.10,git,wge…

第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025)

#先投稿&#xff0c;先送审# 第四届网络安全、人工智能与数字经济国际学术会议&#xff08;CSAIDE 2025&#xff09; 2025 4th International Conference on Cyber Security, Artificial Intelligence and Digital Economy 重要信息 会议官网&#xff1a;www.csaide.net 会…

企业用蚓链数字化营销思维做新赛道设计

​在当今数字化时代&#xff0c;企业面临着日益激烈的竞争和不断变化的市场环境。运用数字化营销思维进行新赛道设计已成为企业寻求突破和创新的关键策略。 数字化营销思维为企业提供了更精准的市场洞察能力。通过大数据分析和用户行为追踪&#xff0c;企业能够深入了解消费者的…

运动用什么骨传导耳机好?推荐这五款运动骨传导耳机!

在运动生涯&#xff0c;我见证了自我挑战与超越的每一个瞬间&#xff0c;而这一切都离不开那如影随形的运动骨传导耳机。一款出色的运动耳机&#xff0c;其重要性不言而喻——它不仅是提升运动效率的得力助手&#xff0c;更是开启多元化运动体验的金钥匙。近年来&#xff0c;运…

集群技术,一主一从的部署和原理方式

集群概述 所谓集群&#xff0c;就是将多台服务器集中在一起&#xff0c;同时处理用户对服务器的请求 比如&#xff0c;我们现在开启的这一台mysql服务器&#xff0c;可以同时处理1000个用户的请求&#xff0c;那么我们开启两个这样的服务器&#xff0c;就可以同时处理2000 数…