持久化SSE对象

news2025/1/18 6:50:29

SpringBoot整合SSE,实现后端主动推送DEMO

前些日子写了整合SSE得demo。但是SSE对象是存储在ConcurrentHashMap<String, SseEmitter>中。在正式环境明显就不行了,服务重启一下的话都没有了。

那么要持久化,第一选择放redis

1、写了一个redis操作组件

SseEmitterStore

/**
 * 不考虑redis 连接异常问题
 * @author cmy
 * @date 2024/8/21 10:55
 */
@Component
public class SseEmitterStore {

    private ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    @Resource
    private RedisTemplate<String, Object> redisTemplate;


    public void addEmitter(String key, SseEmitter emitter) {
        emitters.put(key, emitter);
        redisTemplate.opsForHash().put("sse-emitters", key, emitter);
    }

    public void removeEmitter(String key) {
        emitters.remove(key);
        redisTemplate.opsForHash().delete("sse-emitters", key);
    }

    @PostConstruct
    private void init() {
        Map<Object, Object> temp = redisTemplate.opsForHash().entries("sse-emitters");
        temp.forEach((key, value) -> {
            if (value instanceof SseEmitter) {
                emitters.put(key.toString(), (SseEmitter) value);
            }
        });
    }

    public ConcurrentHashMap<String, SseEmitter> getEmitters() {
        return emitters;
    }
}

Controller修改

public class SseController {


    @Resource
    SseEmitterStore sseEmitterStore;

    @GetMapping("/subscribe/{id}")
    @CrossOrigin(origins = "*")
    public SseEmitter subscribe(@PathVariable String id) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        sseEmitterStore.addEmitter(id,emitter);
        emitter.onCompletion(() -> sseEmitterStore.removeEmitter(id));
        emitter.onError(e -> sseEmitterStore.removeEmitter(id));
        return emitter;
    }

    @GetMapping("/unbind/{id}")
    @CrossOrigin(origins = "*")
    public ServerResponse deleteItem(@PathVariable String id) {
        this.sseEmitterStore.removeEmitter(id);
        return ServerResponse.success(true);
    }
}

异步发送消息service

    @Async
    public void broadcastMessage(String message) {
        List<String> keysToDelete = new ArrayList<>();

        this.sseEmitterStore.getEmitters().forEach((k, v) -> {
            try {
                v.send(message);
            } catch (Throwable t) {
                keysToDelete.add(k);
            }
        });
        keysToDelete.forEach(this.sseEmitterStore::removeEmitter);
    }

2、无法序列化的问题

跑起来之后,结果报错

DefaultSerializer requires a Serializable payload but received an object of type [org.springframework.web.servlet.mvc.method.annotation.SseEmitter]

错误信息已经很明显了

因为 SseEmitter 并不是一个实现了 Serializable 接口的类,因此不能被默认的序列化器正确处理。

问了AI

        

3、解决无法序列化问题

3.1自定义redis自定义序列化器

public class CustomJackson2JsonRedisSerializer<T> implements RedisSerializer<T> {

    private static final long serialVersionUID = -7649863253433761554L;
    private final ObjectMapper objectMapper;

    public CustomJackson2JsonRedisSerializer() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
        this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }

    @Override
    public byte[] serialize(T t) throws SerializationException {
        if (t == null) {
            return new byte[0];
        }
        try {
            return objectMapper.writeValueAsBytes(t);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Could not write JSON: " + e.getMessage(), e);
        }
    }

    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            return (T) objectMapper.readValue(bytes, SseEmitter.class);
        } catch (IOException e) {
            throw new SerializationException("Could not read JSON: " + e.getMessage(), e);
        }
    }
}

3.2redis配置,使序列化器生效

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        CustomJackson2JsonRedisSerializer<Object> jacksonSerializer = new CustomJackson2JsonRedisSerializer<>();

        // 根据实际情况,自行修改
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(jacksonSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(jacksonSerializer);

        template.afterPropertiesSet();
        return template;
    }
}

再次启动服务,即生效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2067540.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

When Do We Not Need Larger Vision Models?

总结 传统观点挑战&#xff1a;传统上&#xff0c;扩大视觉模型的大小一直被认为是提升视觉表示能力和下游任务性能的关键途径。然而&#xff0c;本文重新审视了这一观点&#xff0c;提出了通过在不同图像尺度上运行较小的预训练视觉模型&#xff08;如ViT-B或ViT-L&#xff0…

Linux入门——11 线程

线程的概念&#xff0c;线程的控制&#xff0c;线程的同步和互斥&#xff0c;队列结构&#xff0c;线程池&#xff0c;锁 1.预备知识 1.1可重入函数 1.1.1链表的头插 main函数调用insert函数向一个链表head中插入节点node1,插入操作分为两步,刚做完第一步的时候,因为硬件中断…

续——网络通信编程

一、网络通信 1、编程 &#xff08;1&#xff09;基于UDP c/s通信模型 -------server——服务端——被动角色------- socket 全双工的&#xff08;可读可写&#xff09;。同上篇。 bind int bind(int sockfd , struct sockaddr *my_addr&#xff08;所绑定的地址信息&…

Linux的进程详解(进程创建函数fork和vfork的区别,资源回收函数wait,进程的状态(孤儿进程,僵尸进程),加载进程函数popen)

目录 什么是进程 Linux下操作进程的相关命令 进程的状态&#xff08;生老病死&#xff09; 创建进程系统api介绍&#xff1a; fork() 父进程和子进程的区别 vfork() 进程的状态补充&#xff1a; 孤儿进程 僵尸进程 回收进程资源api介绍&#xff1a; wait() waitpid…

编译运行 llama.cpp (vulkan, Intel GPU SYCL)

llama.cpp 是一个运行 AI (神经网络) 语言大模型的推理程序, 支持多种 后端 (backend), 也就是不同的具体的运行方式, 比如 CPU 运行, GPU 运行等. 但是编译运行 llama.cpp 并不是那么容易的, 特别是对于 SYCL 后端 (用于 Intel GPU), 坑那是一大堆. 只有特定版本的 llama.cpp…

【代码随想录训练营第42期 Day38打卡 - 动态规划Part6 - LeetCode 322. 零钱兑换 279.完全平方数 139.单词拆分

目录 一、做题心得 二、题目与题解 题目一&#xff1a;322. 零钱兑换 题目链接 题解&#xff1a;动态规划--完全背包 题目二&#xff1a; 279.完全平方数 题目链接 题解&#xff1a;动态规划--完全背包 题目三&#xff1a;139.单词拆分 题目链接 题解&#xff1a;动…

blender骨骼绑定(让物体动起来)

园哥摸索了两天了&#xff0c;骨骼做好就是不能带动物体&#xff0c;点击时候要选中那个骨骼点圆圈&#xff0c;点中间骨骼没用。终于动起来了。虽然有点奇怪。 点击图二那个点&#xff0c;貌似我的骨骼生长反了。做游戏是真麻烦。本来想搞个简单的2d游戏&#xff0c;结果那个瓦…

一起学Java(4)-[起步篇]教你掌握本协作项目中的Gralde相关配置文件(上)

将思绪拉回java-all-in-one项目&#xff0c;如果你fork并下载了代码&#xff0c;你会看到在项目中除了HelloWorldMain代码外&#xff0c;还存在很多文件。如果你并不了解他们的作用并有足够的好奇心&#xff0c;那你应该想要知道他们的作用。带着好奇&#xff0c;今天我也来研究…

网络抓包测试

利用fgets遇到\n停止的特性&#xff0c;给流数据直接加间隔&#xff0c;fgets读的时候会把soket缓冲区里面的数据全部放到fgets的缓冲区内&#xff0c;再读的时候就不能从套接字fd描述符读而是从fgets的fq里面读 行为1. 读取行为&#xff1a;•fgets 读取字符直到遇到换行符 \n…

下载ncurses操作步骤

https://invisible-island.net/ncurses/announce.htmlncurses-6.5.官网下载链接 选择下载版本

信刻离线文件单向导入系统

信刻针对不同数据单向导入的需求&#xff0c;按需推出的离线文件单向导入系统采用软硬件相结合的技术&#xff0c;支持信息导入申请、身份认证、光盘读取、病毒查杀、光盘复刻、光盘数据信息导入、审查审批、用户管理、日志管理、三权管理、数据加密、数据检查、校验、安全审计…

pd虚拟机 Parallels Desktop 19 for Mac安装教程【支持Intel和M芯片】

pd虚拟机 Parallels Desktop 19 for Mac安装教程【支持Intel和M芯片】 一、准备工作 二、开始安装 安装包内有三个软件 Parallels Desktop是一款广受好评的Mac虚拟机软件&#xff0c;本文来讲述一下Parallels Desktop是如何下载、安装、激活与换机的。 Parallels Desktop 下…

外排序之文件归并排序实现

外排序介绍 外排序是指能够处理极大量数据的排序算法。通常来说&#xff0c;外排序处理的数据不能一次装入内存&#xff0c;只能放在读写较慢的外存储器(通常是硬盘)上。外排序通常采用的是⼀种“排序-归并”的策略。在排序阶段&#xff0c;先读入能放在内存中的数据量&#x…

【Kafka源码走读】消息生产者与服务端的连接过程

说明&#xff1a;以下描述的源码都是基于最新版&#xff0c;老版本可能会有所不同。 一. 查找源码入口 kafka-console-producer.sh是消息生产者的脚本&#xff0c;我们从这里入手&#xff0c;可以看到源码的入口&#xff1a; if [ "x$KAFKA_HEAP_OPTS" "x&qu…

Vue处理表格长字段显示问题

背景 有些单元个中会有比较长的内容&#xff0c;如果使用默认格式&#xff0c;会导致单元格的高度比较怪异&#xff0c;需要将超长的内容进行省略。 当前效果&#xff1a; 优化效果&#xff1a; 优化代码&#xff1a; 在内容多的单元格增加下面代码 <el-table-columnprop…

SAP成本核算-事前控制(标准成本核算)

一、BOM清单 1、BOM清单抬头 BOM用途:决定成本核算控制的依据 物料清单状态:决定成本核算控制的依据 基本数量:用于计算标准的用量 有效期:决定生产工单开单的日期范围,以及成本核算的日期范围 物料清单状态默认值后台配置:事务代码OS21 2、BOM清单行项目 有效期:决…

Java框架Shiro、漏洞工具利用、复现以及流量特征分析

Shiro流量分析 前言 博客主页&#xff1a; 靶场&#xff1a;Vulfocus 漏洞威胁分析平台 Shiro&#xff08;Apache Shiro&#xff09;是一个强大且灵活的开源安全框架&#xff0c;专为Java应用程序提供安全性解决方案。它由Apache基金会开发和维护&#xff0c;广泛应用于企业级…

Anolis os系统进入单用户模式重置密码

Anolis os系统进入单用户模式重置密码 1、重启计算机。 2、在启动时&#xff0c;当GRUB菜单出现时&#xff0c;按下任意键停止自动倒计时。 3、选择要启动的内核版本&#xff0c;然后按e键编辑启动参数。 4、找到以linux或linux16开头的行&#xff0c;通常这行包含了启动内核…

keepalived与lvs

1 lvs Linux服务器集群系统(一) -- LVS项目介绍 LVS&#xff08;Linux Virtual Server&#xff09;即Linux虚拟服务器,是一个基于Linux操作系统的虚拟服务器技术&#xff0c;用于实现负载均衡和高可用性。章文嵩&#xff0c;是中国国内最早出现的自由软件项目之一。 2 lvs发展…

Circuitjs 快捷键完全列表

对于常见组件, 反复通过菜单去选择也是比较繁琐的, 系统考虑到这一点, 也为那些常用组件添加了快捷键. 通过 菜单--选项--快捷键... 可以查看所有快捷键, 分配新的快捷键或调整现有的快捷键. 点开菜单时, 位于菜单右侧的那些字母即是对应的快捷键, 如下图所示: 注: 旧版本有, …