Java Springboot SSE如何判断客户端能否正常接收消息

news2025/1/24 6:31:22

目录

  • 背景
  • 解决方案
    • 思路
    • 代码
    • 代码解释
  • Java反射知识点补充

背景

当新建一个 emitter 对象的时候, 它的默认超时时间是 30s.

SseEmitter emitter = new SseEmitter(); 

但是很多情况下, 默认30s的时间太短, 需要把 emitter 对象的超时时间设置成不超时, 也就是永久有效.

private static long DEFAULT_TIMEOUT = 0L;

......

SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT); 

这样也会带来一个问题, 就是永久有效的 emitter 对象如果没有调用关闭连接的接口的话 (比如用户直接关闭浏览器了) , 这个 emitter 对象就会一直存在.

解决方案

思路

sseEmitter 有下面的几个属性:

在这里插入图片描述

注意一下 sendFailed 这个属性, 我们可以利用这个属性来判断客户端能否正常接到消息.

当客户端无法接受消息时,SseEmitter对象在send一次之后sendFailed状态会变为True,这时候就可以剔除。同时在订阅时用此判断可以减少重复创建的机会

还有一个 complete 属性, 这个属性是与 sendFailed 有关的, 也就是消息发送成功的时候 complete 为 true, 失败的时候 complete 为 false. 我们可以用这个属性当做一个辅助.

请添加图片描述

拿到客户端是否能够正常接收消息这个状态以后, 我们就可以建立一个定时器,固定时间发送消息用来检测客户端是否离线.

代码

package com.example.demo.utils;

import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class SSEUtils {
    public static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();

    /***
     * 添加订阅
     * @param id 客户id
     * @return
     */
    public static SseEmitter addSubscribe(String id) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            sseEmitter = new SseEmitter(0L); // 永久有效
            sseEmitter.onTimeout(() -> {
                subscribeMap.remove(id);
            });
            sseEmitter.onError(throwable -> {
                subscribeMap.remove(id);
            });
            SseEmitter finalSseEmitter = sseEmitter;
            sseEmitter.onCompletion(() -> {
                subscribeMap.put(id, finalSseEmitter);
            });
        }
        return sseEmitter;
    }

    /***
     * 给单个用户发消息
     * @param id
     * @param msg
     * @return
     */
    public static boolean sendSingleClientMsg(String id,Object msg) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            return false;
        }
        try {
            sseEmitter.send(msg, MediaType.APPLICATION_JSON);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }


    /***
     * 关闭订阅
     * @param id
     * @return
     */
    public static boolean closeSubscribe(String id) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            return true;
        }
        try {
            sseEmitter.complete();
            subscribeMap.remove(id);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /***
     * 检测客户端连接状态
     * @param sseEmitter
     * @return true代表还连接, false代表失去连接
     */
    public static boolean checkSseConnectAlive(SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            return false;
        }
        // 返回true代表还连接, 返回false代表失去连接
        return !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "sendFailed") &&
                !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "complete");
    }

    public static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception e) {
            }
        }

        return null;
    }

    /***
     * 给所有客户端发消息
     * @param msg
     */
    public void sendAllClientMsg(Object msg) {
        if (subscribeMap != null && !subscribeMap.isEmpty()) {
            for (String key : subscribeMap.keySet()) {
                // 发送检测消息
                sendSingleClientMsg(key,msg);
                // 判断客户端是否能接收到消息
                boolean isAlive = checkSseConnectAlive(subscribeMap.get(key));
                if (!isAlive) {
                    // 断开连接的业务代码
                }
            }
        }
    }

    /***
     * 定时判断所有客户端状态
     */
    @Async("threadPoolTaskExecutor")
    @Scheduled(fixedDelay = 1000*60*10) // 10min
    public void checkAlive() {
        sendAllClientMsg("CHECK_ALIVE");
    }
}

使用 @Scheduled 定时器, 不要忘记在启动类上面加这两个注解:

@SpringBootApplication
@EnableAsync
@EnableScheduling
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

代码解释

重点部分是下面这段代码:

    /***
     * 检测客户端连接状态
     * @param sseEmitter
     * @return
     */
    public static boolean checkSseConnectAlive(SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            return false;
        }
        // 返回true代表还连接, 返回false代表失去连接
        return !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "sendFailed") &&
                !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "complete");
    }

    public static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception e) {
            }
        }

        return null;
    }

1. 循环找 SseEmitter 和它的父类中是否存在 sendFailed 这个属性, 直到找到.

这是因为 sendFailed 这个属性是私有的, 不供外部访问, 这属性还正好在父类里, 所以要循环父类.

在这里插入图片描述

在这里插入图片描述

2. 通过 getDeclaredField() 方法拿到传入的 fieldName 的属性 (也就是 "sendFailed""complete" ), 接着使用 setAccessible(true) 把这个值设置为可访问的.

3. 最后通过 field.get(obj) 拿到这个属性的值, 也就是"sendFailed""complete" 的值是 true/false

思路和代码参考: Java Springboot SSE 解决永久存活 判断客户端离线问题. 关于 SSE utils的一些工具类的方法在这个博客里面也有.

Java反射知识点补充

Java 反射是指在运行时动态地获取一个类的信息,并且可以操作它的属性、方法和构造方法等。Java 反射机制提供了一种在运行时检查、创建和操作对象的能力,这使得 Java 程序可以实现动态性和灵活性。

Java 反射机制主要包括以下三个类:

  • java.lang.Class 类:代表一个类,在运行时动态获取一个类的信息。
  • java.lang.reflect.Method 类:代表类的方法,在运行时可以使用 Method.invoke() 方法调用一个方法。
  • java.lang.reflect.Field 类:代表类的属性,在运行时可以使用 Field.get() 和 Field.set() 方法获取或设置一个属性的值。

以下是一个简单的 Java 反射示例,演示如何使用反射获取一个类的信息:

import java.lang.reflect.*;

public class MyClass {
    private String name;
    private int age;

    public MyClass(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public void sayHello() {
        System.out.println("Hello, " + name + "!");
    }

    public static void main(String[] args) throws Exception {
        // 获取 MyClass 类的 Class 对象
        Class<?> myClass = MyClass.class;

        // 创建一个 MyClass 对象
        MyClass obj = new MyClass("Bob", 20);

        // 获取 MyClass 类的构造方法,并使用它创建一个新的 MyClass 对象
        Constructor<?> constructor = myClass.getConstructor(String.class, int.class);
        MyClass newObj = (MyClass) constructor.newInstance("Alice", 30);

        // 获取 MyClass 类的属性,并使用它获取 obj 对象的 name 属性值
        Field field = myClass.getDeclaredField("name");
        field.setAccessible(true);
        String name = (String) field.get(obj);

        // 获取 MyClass 类的方法,并使用它调用 obj 对象的 sayHello 方法
        Method method = myClass.getMethod("sayHello");
        method.invoke(obj);

        System.out.println(name);         // 输出:Bob
        System.out.println(newObj.name);  // 输出:Alice
    }
}

在上述示例中,我们首先获取了 MyClass 类的 Class 对象。然后,我们创建了一个 MyClass 对象,并使用 getConstructor() 方法获取了 MyClass 类的构造方法,并使用 newInstance() 方法创建了一个新的 MyClass 对象。

接着,我们使用 getDeclaredField() 方法获取了 MyClass 类的 name 属性,并使用 setAccessible() 方法设置该属性可访问性为 true,然后使用 get() 方法获取了 obj 对象中 name 属性的值。

最后,我们使用 getMethod() 方法获取了 MyClass 类的 sayHello() 方法,并使用 invoke() 方法调用了 obj 对象的 sayHello() 方法。

需要注意的是,在使用反射机制时,应该尽量避免使用硬编码的字符串来表示类名、方法名和属性名等信息,这样会使代码更加灵活和可维护。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1394967.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RHEL8 Samba服务器详细配置用户模式

任务&#xff1a; 配置server01为samba服务器&#xff0c;samba服务器的/companydata/sales为共享目录&#xff0c;共享名为sales&#xff0c;里面创建测试文件test_share.tar&#xff0c;创建用户组sales&#xff0c;创建组内用户sale1&#xff0c;要求配置用户模式访问&#…

py爬虫入门笔记(request.get的使用)

文章目录 Day11. 了解浏览器开发者工具2. Get请求http://baidu.com3. Post请求https://fanyi.baidu.com/sug4. 肯德基小作业 Day21. 正则表达式2. 使用re模块3. 爬取豆瓣电影Top250的第一页4. 爬取豆瓣电影Top250所有的250部电影信息 Day31. xpath的使用2. 认识下载照片线程池的…

【翻译】在Qt Designer中创建主窗口(Main Windows)

原文地址&#xff1a;https://doc.qt.io/qt-6/designer-creating-mainwindows.html Qt Designer 可用于为不同用途创建用户界面&#xff0c;并为每个用户界面提供不同类型的模板。主窗口模板用于创建具有菜单栏、工具栏和停靠窗口部件的应用程序窗口。 通过打开文件菜单并选择…

工程档案数字化的意义

工程档案数字化可以提高档案管理效率、节约资源成本、保护档案安全、提高档案可持续性、提升检索与利用的便捷性&#xff0c;促进信息共享与合作&#xff0c;具有重要的意义和价值。 1. 提高档案管理效率&#xff1a;数字化档案可以通过电子方式进行存储、检索和共享&#xff0…

企业网盘的价值:为什么企业需要它?

企业网盘因其主打的文件管理协作功能&#xff0c;正好符合信息时代高速发展下企业的需要&#xff0c;能够帮助企业集中管理文件数据&#xff0c;提供便捷的文件协作服务&#xff0c;一跃成为近两年企业服务类产品榜单中的一匹黑马。 企业网盘真的这么好用吗&#xff1f;企业真…

ant-desgin的table的上移、下移

文章目录 html部分函数部分 html部分 <a-table :columns"columns" :data-source"dataList" :loading"listLoading" :pagination"false"><template #bodyCell"{ column, record, index }"><template v-if&qu…

class_10:this关键字

this关键字是指向调用对象的指针 #include <iostream> #include <iostream> using namespace std;class Car{ public://成员数据string brand; //品牌int year; //年限//构造函数名与类名相同Car(string brand,int year){cout<<"构造函数中&#…

字面量(java)

字面量类型&#xff1a; 整数类型&#xff1a;不带小数的数字&#xff0c;如666、-88 小数类型&#xff1a;带小数点的数字&#xff0c;如13.14、-5.21 字符串类型&#xff1a;用双引号引起来的内容&#xff0c;如"HelloWorld"&#xff0c;""," &q…

HCIA交换技术

VLAN的作用&#xff08;只记录MAC&#xff09;&#xff1a; 路由器和交换机协同工作&#xff0c;为了解决广播域带来的问题&#xff0c;人们引入了VLAN&#xff08;virtual local area network&#xff09;&#xff0c;即虚拟局域网技术&#xff1a;通过在交换机上部署VLAN&…

【软件测试】学习笔记-精准测试

软件测试行业从最开始的手工测试到自动化测试&#xff0c;从黑盒测试到白盒测试&#xff0c;测试理念和技术都发生了日新月异的变化。现如今&#xff0c;几乎所有的软件公司都有一套强大且复杂的自动化测试用例&#xff0c;用来夜以继日地保证产品的正确性和稳定性。 然而&…

Nginx的access_log 状态码499的问题排查

前提&#xff1a;公司的项目网站&#xff0c;运行环境是lnmp环境下 一、起因 如下图&#xff0c;网站请求超过60s(如&#xff1a;导出半年的报表数据到excel)时&#xff0c;报如下错误&#xff0c;且浏览器上没有返回值 二、发展 查找nginx和php-fpm都没有报错日志。于是先把…

万字讲解新一代分布式任务调度框架Power-job

1、简介 Power-Job 的设计目标是成为企业级的分布式任务调度平台&#xff0c;整个公司统一部署调度中心 power-job-server&#xff0c;旗下所有业务线应用只需要依赖 power-job-worker 即可接入调度中心获取任务调度与分布式计算能力。 Power-job官方网址&#xff1a;http:/…

ELK 分离式日志

目录 一.ELK组件 ElasticSearch&#xff1a; Kiabana&#xff1a; Logstash&#xff1a; 可以添加的其它组件&#xff1a; ELK 的工作原理&#xff1a; 二.部署ELK 节点都设置Java环境: 每台都可以部署 Elasticsearch 软件&#xff1a; 修改elasticsearch主配置文件&…

QT-QML2048小游戏

QT-QML2048小游戏 一、演示效果二、关键程序三、下载链接 一、演示效果 二、关键程序 import QtQuick 2.2 import QtQuick.Controls 1.1 import QtQuick.Controls.Styles 1.1 import QtQuick.Dialogs 1.1 import QtQuick.Window 2.1 import "2048.js" as MyScriptAp…

使用pyechart创建折线图

import json from pyecharts.charts import Line from pyecharts import options# 首先使用文件打开数据 f_us open(Desktop/python/Project/数据可视化/美国.txt,r,encoding"UTF-8") f_rb open(Desktop/python/Project/数据可视化/日本.txt,r,encoding"UTF-8…

基于springboot+vue的蜗牛兼职网的设计与实现系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

Pypputeer自动化

Pyppeteer简介 pyppeteer 是 Python 语言的一个库&#xff0c;它是对 Puppeteer 的一个非官方端口&#xff0c;Puppeteer 是一个 Node 库&#xff0c;Puppeteer是Google基于Node.js开发的一个工具&#xff0c;它提供了一种高层次的 API 来通过 DevTools 协议控制 Chrome 或 Ch…

【XTuner 大模型单卡低成本微调实战】学习笔记

参考学习教程【XTuner 大模型单卡低成本微调实战】 理论 Finetune简介 大语言模型 微调模式 增量预训练 指令跟随微调 LoRA和QLoRA Xtuner介绍 实战 自定义微调 用 Medication QA 数据集进行微调 将数据转为 XTuner 的数据格式 目标格式&#xff1a;(.jsonL) 写提示词请C…

清晰光谱空间:全自动可调波长系统的高光谱成像优势

高光谱成像技术 高光谱成像技术是一种捕获和分析宽波长信息的技术&#xff0c;能够对材料和特征进行详细的光谱分析和识别。高光谱成像技术的实现通过高光谱相机&#xff0c;其工作原理是使用多个光学传感器或光学滤波器分离不同波长的光&#xff0c;并捕获每个波段的图像&…

CSS笔记II

CSS第二天笔记 复合选择器后代选择器子选择器并集选择器交集选择器伪类选择器 三大特性继承性层叠性优先级优先级-叠加计算规则 Emmet写法 背景属性背景图平铺方式位置缩放固定复合属性 显示模式转换显示模式 复合选择器 定义&#xff1a;由两个或多个基础选择器&#xff0c;通…