(二十)springboot实战——springboot使用redis的订阅发布机制结合SSE实现站内信的功能

news2024/12/27 0:24:37

前言

在前面的章节内容中,我们介绍了如何使用springboot项目实现基于redis订阅发布机制实现消息的收发,同时也介绍了基于SSE机制的单通道消息推送案例,本节内容结合redis和sse实现一个常用的实战案例——站内信。实现系统消息的实时推送。

正文

①引入项目的pom依赖,并在application.yml中配置redis连接

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
</dependency>

 ②创建一个SSE服务器,用于连接用户和收发消息

package com.yundi.atp.server;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class SseServer {
    /**
     * 存储用户的连接
     */
    public static Map<String, SseEmitterUTF8> sseMap = new HashMap<>();

    /**
     * 建立连接
     *
     * @param username
     * @throws IOException
     */
    public static SseEmitterUTF8 connect(String username) throws IOException {
        if (!sseMap.containsKey(username)) {
            //设置超时时间(和token有效期一致,超时后不再推送消息),0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
            SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
            sseEmitter.send(String.format("%s号用户,连接成功!", username));
            sseEmitter.onCompletion(() -> sseMap.remove(username));
            sseEmitter.onTimeout(() -> sseMap.remove(username));
            sseEmitter.onError(throwable -> sseMap.remove(username));
            sseMap.put(username, sseEmitter);
            return sseEmitter;
        } else {
            SseEmitterUTF8 sseEmitterUTF8 = sseMap.get(username);
            sseEmitterUTF8.send(String.format("%s,用户连接成功!", username));
            return sseEmitterUTF8;
        }
    }

    /**
     * 发送消息
     *
     * @param message
     */
    public static synchronized void sendMessage(String message) {
        List<String> removeList = new ArrayList<>();
        for (Map.Entry<String, SseEmitterUTF8> entry : sseMap.entrySet()) {
            String username = entry.getKey();
            try {
                SseEmitterUTF8 sseEmitterUTF8 = entry.getValue();
                sseEmitterUTF8.onCompletion(() -> sseMap.remove(username));
                sseEmitterUTF8.onTimeout(() -> sseMap.remove(username));
                sseEmitterUTF8.onError(throwable -> sseMap.remove(username));
                sseEmitterUTF8.send(message);
            } catch (IOException e) {
                //发送不成功,将该用户加入移除列表
                removeList.add(username);
            }
        }
        //移除连接异常的用户
        removeList.forEach(item -> sseMap.remove(item));
    }
}

 ③创建一个redis消息的监听器,将监听到的消息通过sse服务推送给连接的用户

package com.yundi.atp.listen;

import com.yundi.atp.constant.ChannelConstant;
import com.yundi.atp.server.SseServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;


@Slf4j
@Component
public class RedisMessageSubscriber implements MessageListener {
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;

    /**
     * 订阅消息:将订阅者添加到指定的频道
     */
    @PostConstruct
    public void subscribeToChannel() {
        //广播消息
        redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(ChannelConstant.CHANNEL_GLOBAL_NAME));
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received message: " + messageBody + " from channel: " + channel);
        SseServer.sendMessage(messageBody);
    }
}

 ④创建SseEmitterUTF8并继承SseEmitter,重写extendResponse方法,解决中文消息发送乱码问题

package com.yundi.atp.server;

import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.nio.charset.StandardCharsets;


public class SseEmitterUTF8 extends SseEmitter {

    public SseEmitterUTF8(Long timeout) {
        super(timeout);
    }

    @Override
    protected void extendResponse(ServerHttpResponse outputMessage) {
        super.extendResponse(outputMessage);
        HttpHeaders headers = outputMessage.getHeaders();
        headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
    }
}

⑤ 创建redis的配置类,用于初始化redis的容器监听器和工具类

package com.yundi.atp.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


@Configuration
public class RedisConfig {
    /**
     * 初始化一个Redis消息监听容器
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 添加其他配置,如线程池大小等
        return container;
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

⑦ 创建用于站内信发送的频道Channel

package com.yundi.atp.constant;


public class ChannelConstant {
    /**
     * 广播通道
     */
    public static final String CHANNEL_GLOBAL_NAME = "channel-global";

    /**
     * 单播通道
     */
    public static final String CHANNEL_SINGLE_NAME = "channel-single";
}

 ⑧创建一个消息发布接口和一个sse用户消息推送连接接口

package com.yundi.atp.controller;

import com.yundi.atp.constant.ChannelConstant;
import com.yundi.atp.server.SseEmitterUTF8;
import com.yundi.atp.server.SseServer;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;


@RequestMapping(value = "base")
@RestController
public class BaseController {
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 发布广播消息
     *
     * @param msg
     */
    @GetMapping(value = "/publish/{msg}")
    public void sendMsg(@PathVariable(value = "msg") String msg) {
        redisTemplate.convertAndSend(ChannelConstant.CHANNEL_GLOBAL_NAME, msg);
    }


    /**
     * 接收消息
     *
     * @return
     * @throws IOException
     */
    @GetMapping(path = "/connect/{username}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitterUTF8 connect(@PathVariable(value = "username") String username) throws IOException {
        SseEmitterUTF8 connect = SseServer.connect(username);
        return connect;
    }
}

 ⑨启动服务,验证站内信功能是否可以正常使用

结语

关于springboot使用redis的订阅发布机制结合SSE实现站内信的功能到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1460456.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】 类与对象——流操作符重载,const成员函数

类与对象 流操作符重载1 << 重载2 >> 重载 const 修饰Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读&#xff01;&#xff01;&#xff01;下一篇文章见&#xff01;&#xff01;&#xff01; 流操作符重载 流操作符功能<<输出操作符>>输…

2024 AI 大模型全栈开发知识体系【LLM 技术栈】

2023 年最火的一件事莫过于以 ChatGPT 为代表的 AI 大模型的兴起与应用探索&#xff0c;这一年大模型领域可以说是百花齐放&#xff0c;很多人都惊叹其在各行各业带来的影响。 有很多人说&#xff0c;AI 相对于当年的区块链、元宇宙有过之而无不及。 甚至于 2024 年 sora 的推出…

Python学习-列表1

十二、列表1 1、创建列表及基本运算 1&#xff09;使用中括号&#xff0c;将所有准备放入列表中的元素&#xff0c;包裹起来&#xff0c;不同元素之间使用逗号分隔。 举例&#xff1a; [1,2,3,4,5]2&#xff09;列表可以容纳不同类型的数据。 举例&#xff1a; [1,2,3,4,5,…

ETL快速拉取物流信息

我国作为世界第一的物流大国&#xff0c;但是在目前的物流信息系统还存在着几大的痛点。主要包括以下几个方面&#xff1a; 数据孤岛&#xff1a;有些物流企业各个部门之间的数据标准不一致&#xff0c;难以实现数据共享和协同&#xff0c;容易导致信息孤岛。 操作繁琐&#x…

【论文阅读】【yolo系列】YOLO-Pose的论文阅读

Abstract 我们介绍YOLO-pose&#xff0c;一种无热图联合检测的新方法&#xff0c;基于流行的YOLO目标检测框架的图像二维多人姿态估计。 【现有方法的问题】现有的基于热图的两阶段方法是次优的&#xff0c;因为它们不是端到端可训练的&#xff0c;训练依赖于surrogate L1 loss…

常用的消息中间件RabbitMQ

目录 一、消息中间件 1、简介 2、作用 3、两种模式 1、P2P模式 2、Pub/Sub模式 4、常用中间件介绍与对比 1、Kafka 2、RabbitMQ 3、RocketMQ RabbitMQ和Kafka的区别 二、RabbiMQ集群 RabbiMQ特点 RabbitMQ模式⼤概分为以下三种: 集群中的基本概念&#xff1a; 集…

unity学习(19)——客户端与服务器合力完成注册功能(1)入门准备

逆向服务器用了三天的时间&#xff0c;但此时觉得一切都值&#xff0c;又可以继续学习了。 服务器中登录请求和注册请求由command变量进行区分&#xff0c;上一层的type变量都是login。 public void process(Session session, SocketModel model) {switch (model.Command){ca…

Linux常见基本指令

本文将详细的介绍Linux中各常见指令的用法&#xff0c;并且在每个指令都有使用样例。一共有以下指令&#xff1a; 1. man指令 2.目录基础指令&#xff1a;2.1 pwd指令、2.2 ls指令、2.3 cd指令 3.文件创建与删除&#xff1a;3.1 touch指令、3.2 mkdir指令、3.3 rmdir 指令 &…

FL Studio21.2.3更新哪些新功能?中文汉化版如何下载

FL Studio 21.2.3的更新会带来一些变化&#xff0c;这些变化主要集中在以下几个方面&#xff1a; 功能增强和改进&#xff1a;随着版本的更新&#xff0c;FL Studio可能会引入一些新的功能或改进现有功能。这些新功能可能包括新的音频处理工具、效果器、虚拟乐器或混音选项&am…

智慧公厕是什么?智慧公厕对智慧城市的意义

城市的信息化发展需要催化了智慧城市&#xff0c;公共厕所作为城市的重要民生设施&#xff0c;如何实现更高阶的信息化建设&#xff0c;成为一个重要课题。那么&#xff0c;智慧公厕是什么&#xff1f;为什么它对智慧城市的建设如此重要&#xff1f;本文以智慧公厕源头厂家广州…

五种多目标优化算法(MOAHA、MOGWO、NSWOA、MOPSO、NSGA2)性能对比,包含6种评价指标,9个测试函数(提供MATLAB代码)

一、5种多目标优化算法简介 1.1MOAHA 1.2MOGWO 1.3NSWOA 1.4MOPSO 1.5NSGA2 二、5种多目标优化算法性能对比 为了测试5种算法的性能将其求解9个多目标测试函数&#xff08;zdt1、zdt2 、zdt3、 zdt4、 zdt6 、Schaffer、 Kursawe 、Viennet2、 Viennet3&#xff09;&#xff…

论软件测试工程师 重要性!

在生活中&#xff0c;我们常常会遇到以下几种窘迫时刻&#xff1a; 准备骑共享单车出行&#xff0c;却发现扫码开锁半天&#xff0c;车子都没有反应&#xff1b;手机导航打车&#xff0c;却发现地图定位偏差很大&#xff0c;司机总是跑错地方&#xff1b;买个水&#xff0c;却…

【日常聊聊】计算机专业必看的电影

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;日常聊聊 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 方向一&#xff1a;电影推荐 方向二&#xff1a;技术与主题 方向三&#xff1a;职业与人生 结语 我的其他博客 前言 计算机…

搜维尔科技:OptiTrack探索人类与技术之间关系的开创性表演

另一种蓝色通过 OptiTrack 释放创造力 总部位于荷兰的当代舞蹈团因其探索人类与技术之间关系的开创性表演而受到广泛赞誉。该公司由富有远见的编舞家大卫米登多普创立&#xff0c;不仅利用技术作为探索的主题&#xff0c;而且将其作为表达故事的动态工具。 “我一直对文化与…

Arcgis小技巧【17】——如何修改ArcGIS中影像的背景颜色

一、问题分析 在ArcGIS中&#xff0c;有时候会遇到影像有背景色&#xff0c;看上去很不美观。 尤其在多个影像叠加的时候&#xff0c;更是会造成遮挡的问题。 二、解决办法 首先&#xff0c;用【识别】工具在背景色是点击一下&#xff0c;查看弹出的窗口&#xff0c;记住背景…

VMware ESXi 8.0的安装、配置、使用

VMware ESXi 8.0的安装、配置、使用 ESXi的安装与配置下载镜像安装网络配置 Web控制台的管理操作激活开启直通网络配置修改电源模式创建虚拟机 其他ESXI秘钥克隆虚拟机 ESXi的安装与配置 下载镜像 官网&#xff1a;https://www.vmware.com/ 文档&#xff1a;https://docs.vm…

基础小白快速入门Python----数组的概念

啥是数组&#xff1f; 数组是一中基础的数据结构&#xff0c;用来存储单个或者多个数据元素 并且。数组内的元素必须为同一种类型。 元素在数组中占据特定的位置&#xff0c;通常称为索引或下标。数组的索引从0开始&#xff0c;这意味着第一个元素的位置是0&#xff0c;第二…

CTFHub技能树web之RCE(二)

第五题&#xff1a;远程包含 根据题目&#xff0c;使用远程包含进行 打开phpinfo&#xff0c;可以看到allow_url_fopen和allow_url_include都是On&#xff0c;因此可以使用php://input&#xff0c;由于代码会检查file中的内容&#xff0c;因此不能够使用php://filter包含文件&a…

芯课堂 | 一种用于振荡器的修调电路

​ 高精度时钟产生器是数模混合集成电路及数字集成电路的主要模块。晶体振荡器供与工艺、电源电压和温度无关的稳定时钟&#xff0c;但它与集成电路工艺不兼容&#xff0c;同时有相对较高的成本&#xff0c;这样它的应用就受到了一些限制。随着CMOS集成电路工艺和SOC技术的…

Flink 在蚂蚁实时特征平台的深度应用

摘要&#xff1a;本文整理自蚂蚁集团高级技术专家赵亮星云&#xff0c;在 Flink Forward Asia 2023 AI 特征工程专场的分享。本篇内容主要分为以下四部分&#xff1a; 蚂蚁特征平台特征实时计算特征 Serving特征仿真回溯 一、蚂蚁特征平台 蚂蚁特征平台是一个多计算模式融合的高…