WebSocket解决方案(springboot 基于Redis发布订阅)

news2024/12/25 9:06:15

WebSocket        

        因为一般的请求都是HTTP请求(单向通信),HTTP是一个短连接(非持久化),且通信只能由客户端发起,HTTP协议做不到服务器主动向客户端推送消息。WebSocket确能很好的解决这个问题,服务端可以主动向客户端推送消息,客户端也可以主动向服务端发送消息,实现了服务端和客户端真正的平等

特点

1.全双工通信:允许服务器和客户端在同一连接上同时进行双向通信

2.持久连接:连接一旦建立,会一直保持打开状态,减少了每次连接建立和关闭的开销,使通信更加高效

3.低延迟:由于连接保持打开状态,WebSocket 通信具有较低的延迟,适用于实时性要求较高的应用

4.兼容性:代浏览器和大多数服务器支持 WebSocket

5.安全性:与其他网络通信协议一样,WebSocket 通信也需要一些安全性的考虑。可以使用加密协议(如 TLS)来保护数据在网络传输中的安全性

实战

1.添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.创建配置类

创建配置类,并将其注入到Bean容器中

@Configuration
public class WebSocketConfig {
    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
3.创建WebSocketServer类

创建WebSocketHandler类,并将其注入到Bean容器中

@ServerEndpoint("/websocket/{equipmentId}"),该注解用于配置建立WebSocket连接的路径,可以按需修改

@Component
@Slf4j
@ServerEndpoint("/websocket/{equipmentId}")
public class WebSocketHandler {
    private Session session;

    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketHandler> webSocketUtils = new CopyOnWriteArraySet<>();
    // 用来存在线连接数
    private static Map<String, Session> sessionPool = new HashMap<>();

    private static EquipmentService equipmentService = SpringContextHolder.getBean(EquipmentService.class);

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "equipmentId") String equipmentId) {
        try {
            this.session = session;
            webSocketUtils.add(this);
            sessionPool.put(equipmentId, session);
            sendOneMessage(equipmentId, "");

            equipmentService.onlineRecord(equipmentId,0);

            log.info("【websocket消息】有新的连接,总数为:" + webSocketUtils.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose(@PathParam(value = "equipmentId") String equipmentId) {
        try {
            webSocketUtils.remove(this);

            equipmentService.onlineRecord(equipmentId,1);

            log.info("【websocket消息】连接断开,总数为:" + webSocketUtils.size());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param
     */
    @OnMessage
    public void onMessage(@PathParam(value = "equipmentId") String equipmentId, String message) {

        log.info("【websocket消息】收到客户端消息:" + message);

        sendOneMessage(equipmentId, message);
    }

    /**
     * 发送错误时的处理
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }


    /**
     * 推消息给前端
     *
     * @param equipmentId
     * @param message
     * @return
     */
    public static Runnable sendOneMessage(String equipmentId, Object message) {
        Session session = sessionPool.get(equipmentId);
        if (session != null && session.isOpen()) {
            try {
                log.info("【推给前端消息】 :" + message);

                //高并发下,防止session占用期间,被其他线程调用
                synchronized (session) {
                    session.getBasicRemote().sendText(Objects.toString(message));
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return null;
    }

}

功能点:

1.处理异常: 与任何网络通信一样,WebSocket 连接可能会面临各种异常情况,如断开连接、网络问题等。WebSocket 服务器需要能够处理这些异常情况,进行适当的清理和处理。

2.消息处理: 一旦客户端连接成功,WebSocket 服务器需要处理客户端发送过来的消息。这可以在 WebSocket 端点中的方法上定义处理逻辑。服务器可以根据不同的业务需求处理不同类型的消息

3.WebSocket 服务器负责监听客户端的连接请求,一旦有客户端连接,服务器会创建一个 WebSocket 会话(Session)来管理这个连接。服务器需要能够维护这些连接,包括打开、关闭、保持心跳等操作。

4.WebSocket 服务器需要注册一个或多个 WebSocket 端点。每个端点对应一种处理逻辑,可以处理客户端发送过来的消息,以及向客户端发送消息。这些端点通过注解或配置来定义

因业务需求,常需要对获取的消息进行处理,websocket 不能注入( @Autowired ) service,解决办法:

private static EquipmentService equipmentService = SpringContextHolder.getBean(EquipmentService.class);

@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
    private static ApplicationContext applicationContext = null;


    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        assertContextInjected();
        return applicationContext;
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        assertContextInjected();
        return (T) applicationContext.getBean(name);
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    public static <T> T getBean(Class<T> requiredType) {
        assertContextInjected();
        return applicationContext.getBean(requiredType);
    }

    /**
     * 清除SpringContextHolder中的ApplicationContext为Null.
     */
    public static void clearHolder() {
        applicationContext = null;
    }

    /**
     * 实现ApplicationContextAware接口, 注入Context到静态变量中.
     */
    @Override
    public void setApplicationContext(ApplicationContext appContext) {
        applicationContext = appContext;
    }

    /**
     * 实现DisposableBean接口, 在Context关闭时清理静态变量.
     */
    @Override
    public void destroy() throws Exception {
        SpringContextHolder.clearHolder();
    }

    /**
     * 检查ApplicationContext不为空.
     */
    private static void assertContextInjected() {
        Validate.validState(applicationContext != null, "applicaitonContext属性未注入, 请在applicationContext.xml中定义SpringContextHolder.");
    }
}
4.测试

Redis 发布/订阅

特点

        发布/订阅是一种消息通信模式,其中发送者(发布者)发布消息,多个接收者(订阅者)订阅并接收这些消息。发布者和订阅者之间没有直接联系,消息由消息中间件(如 Redis)传递。

优点

        高性能:Redis 作为内存存储,具备极高的读写性能,能够快速处理发布和订阅消息

        简单易用:Redis 的发布/订阅接口简单,易于集成和使用

        实时性强:发布的消息会立即传递给所有订阅者,具备高实时性

缺点

        消息丢失:由于 Redis 是内存存储,如果 Redis 实例宕机,未处理的消息可能会丢失
        无法持久化:Redis 的发布/订阅模式不支持消息持久化,无法存储和检索历史消息
        订阅者不可控:发布者无法控制订阅者的数量和状态,无法保证所有订阅者都能接收到消息
        无确认机制:发布者无法确认消息是否被订阅者接收和处理

        Redis 的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择MQ实现发布订阅

运用场景

        对于消息处理可靠性要求不强

        消息无需持久化

        消费能力无需通过增加消费方进行增强

        架构简单 中小型系统不希望应用过多中间件

发布订阅命令

SpringBoot整合

1.添加依赖

<dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

2.配置redis

# application.yml

spring:

        redis:

                host: localhost

                port: 6379

3.创建redis配置类
public void sendMsg(String key,Object msg){
    redisTemplate.convertAndSend(key,msg);
}

注意:

当发布消息时,订阅着输出消息,可能会出现乱码情况:

设置实例化对象

@Bean
public RedisTemplate redisTemplateInit() {
    //设置序列化Key的实例化对象
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    //设置序列化Value的实例化对象
    redisTemplate.setValueSerializer(new StringRedisSerializer());

    return redisTemplate;
}
4.创建消息监听器
@Component
public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String messageStr = new String(message.getBody(),StandardCharsets.UTF_8);

        /**
         * 根据实际情况处理消息
         */
        List<WebsocketRes> websocketRes = JSONArray.parseArray(messageStr, WebsocketRes.class);
        String equipmentId = "";
        List<WebsocketRes> websocketResList = new ArrayList<>();

        for(WebsocketRes res : websocketRes){
            equipmentId = res.getEquipmentId();
            res.setEquipmentId(null);

            websocketResList.add(res);
        }

        Gson gson = new Gson();
        String jsonString = gson.toJson(websocketResList);

        WebSocketHandler.sendOneMessage(equipmentId,jsonString);
    }
}
5.配置消息监听容器
@Configuration
public class RedisConfig {

    @Autowired
    private RedisMessageListener redisMessageListener;

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 订阅一个或多个频道
        container.addMessageListener(listenerAdapter, new PatternTopic("my-topic"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {
        return new MessageListenerAdapter(redisMessageListener);
    }
}
6.发布消息
redisUtils.sendMsg("my-topic",jsonString);

websocket与发布/订阅结合

        并发过高时,websocket连接需单独部署,减缓压力;websocket将业务信息实时推送给前端,就用到了redis 发布订阅功能。

使用

socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送

1.在websocket服务中创建消息监听器(处理消息)

2.在websocket服务中创建消息监听容器

3.在业务服务中发布消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1887569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ARM系列】GIC600AE功能安全

GIC600AE在原GIC600版本基础上增加了FuSa功能&#xff0c;所增加的FuSa特性都集成在GIC600外围&#xff0c;不会改变原GIC600的功能。 GIC600AE主要安全机制分布图&#xff1a; GIC-600AE包含以下FuSa安全机制&#xff1a; lockstep logic protection 通过添加duplication l…

数值治理学习记录

添加链接描述 数值清洗规则 数据质量问题 数据治理校验规则 数据探查分析 数据质量规则指标 转自&#xff1a;公众号《数据治理体系》

嵌入式c语言1——gcc以及linux嵌入式

GCC全名GNU Complier Collection&#xff0c;是一个开源的程序语言解释器&#xff0c;运行在linux系统中 对以程序名后缀结尾源代码文件&#xff0c;gcc可以做解释并生成可执行文件

贪心算法算法,完全零基础小白教程,不是计算机的都能学会!超详解

目录 一、基本概念 二、举几个例子&#xff0c;便于理解 1、找零问题 2、最小路径和 3、背包问题 1&#xff09;只考虑体积的贪心策略&#xff1a; 2&#xff09; 只考虑价值的贪心策略&#xff1a; 三、贪心策略的特点 四、贪心策略证明 四、如何学习贪心 五、例题…

Unity海面效果——4、法线贴图和高光

Unity引擎制作海面效果 大家好&#xff0c;我是阿赵。 继续做海面效果&#xff0c;上次做完了漫反射颜色和水波动画&#xff0c;这次来做法线和高光效果。 一、 高光的计算 之前介绍过高光的光照模型做法&#xff0c;比较常用的是Blinn-Phong 所以我这里也稍微连线实现了一下 …

问题-小技巧-Win11的常用快捷方式和有用快捷方式

文章目录 常用快捷方式1、CtrlA 全部选中2、Ctrl Z 撤销3、Ctrl X 剪切4、Ctrl C 粘贴5、Ctrl V 复制6、winshifts截图&#xff0c;Windows系统自带截图工具&#xff0c;功能太少7、ctrlshifts截图&#xff0c;edge自带截图工具&#xff0c;使用时需要打开edge8、 winv 可以查看…

实习总结 --- 内部平台使用

常用术语 CR CR–标准问题分类管理平台&#xff1a;由业务类型-角色-国家-品类-Page定义。 FAQSOP FAQ是端上用户自助的第一道关口&#xff0c;在引导用户进行自助解决上起关键作用 SOP是指标准作业程序&#xff0c;客服SOP是针对用户遇到的具体问题场景&#xff0c;给客服…

编写静态库

一、静态库 1.制作完成整体目录结构 2.首先创建mymath.c和mymath.h 3.编写Makefile 4.创建测试的main函数 test文件夹 先把lib移到test文件夹里面 4.编译链接 gcc main.c -I ./lib/include/ -L ./lib/mymathlib/ -l mymath 5.形成可执行程序a.out 要是不想执行第四步那么麻烦…

揭秘软件性能测试方法和注意事项,专业软件测试公司分享

目前&#xff0c;随着云计算和大数据的发展&#xff0c;软件的性能需求越来越高&#xff0c;用户对于软件的性能体验也有了更高的期望。因此&#xff0c;进行软件性能测试不仅是一种需求&#xff0c;更是一种责任&#xff0c;是开发过程中必不可少的一环。软件性能测试&#xf…

Python容器 之 元组

1.元组的介绍 1, 元组 tuple, 使用的是 () 2, 元组和列表非常相似, 都可以存储多个数据, 都可以存储任意类型的数据 3, 区别就是 元组中的数据不能修改,列表中可以修改 4, 因为元组中的数据不能修改,所以只能 查询方法, 如 index, count ,支持下标和切片 5, 元组, 主要用于…

ArcTs布局入门03——层叠布局(Stack)

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01; 扫描下面的二维码关注公众号。 1、概述 叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过Stack容器组件实…

【鸿蒙学习笔记】基础组件 Button

官方文档&#xff1a;按钮 (Button)添加链接描述 官方文档&#xff1a;button开发指导 目录标题 属性迭代完善不含子组件的按钮包含子组件的按钮ButtonType添加事件跳转超链接提交表单悬浮按钮 属性迭代完善 不含子组件的按钮 Column({ space: 10 }) {Row() {Button(添加子目…

全国总工会党组书记徐留平来苏调研 观摩体验苏州金龙无人驾驶巴士

6月28日&#xff0c;全国总工会党组书记、副主席、书记处第一书记徐留平率队来苏州调研工会工作&#xff0c;并在苏州市劳模工匠人才创新成果展上观摩体验苏州金龙无人驾驶巴士。 本届苏州市劳模工匠人才创新成果展共设20个展位&#xff0c;集中展示了全市“劳模工匠助企行”的…

java面试-SpringAOP

1.SpringAOP的使用 你了解Spring AOP 吗&#xff1f; 通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术。 2.SpringAOP的原理 我们可以将ASM生成的类进行缓存&#xff0c;这样能解决生成的类比较低效的问题。 ASM是可以操作字节码的框架。 真实实现类和…

数据库操作语言(DML)

数据库操作语言&#xff08;DML&#xff09; 文章目录 数据库操作语言&#xff08;DML&#xff09;一、四种操作二、数据的插入&#xff08;增&#xff09;三、数据的删除&#xff08;删&#xff09;四、数据的修改&#xff08;改&#xff09;五、数据的查询&#xff08;查&…

RK3568驱动指南|第十五篇 I2C-第179章在应用程序中使用I2C

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

WPDRRC信息安全体系架构模型

构建信息安全保障体系框架应包括技术体系、组织机构体系和管理体系等三部分&#xff0c;也就是说&#xff1a;人、管理和技术手段是信息安全架构设计的三大要素&#xff0c;而构成动态的信息与网络安全保障体系框架是实现系统的安全保障。 1.WPDRRC信息安全模型的定义 WPDRRC…

Oracle - 数据库打补丁实践

原文&#xff1a;https://www.cnblogs.com/ddzj01/p/12097467.html 一、概述 本文将介绍如何给oracle数据库打最新补丁&#xff0c;数据库版本为11.2.0.4单实例&#xff0c;操作系统为redhat6.5 二、下载相关升级包 1. 登录MOS&#xff0c;查阅(ID 2118136.2)&#xff0c;下载…

电脑录音方法:电脑怎么录音?5招轻松搞定录音!

想要从麦克风或系统音频录制电脑声音吗&#xff1f;这是一项简单的任务。本文将为您介绍5种最佳且最简单的方法&#xff0c;包括使用Windows系统自带的录音工具来录制电脑音频&#xff0c;在线音频录音软件和专业的第三方电脑录音软件。这些工具都能够很好地帮助您完成电脑怎么…

MySQL数据库中文乱码处理

出现中文乱码之后处理方式 1、执行下面语句查看一下关于编码方式 show variables like %char%结果展示&#xff1a;【你应该和我的不一样】 2、如果你的和我查询结果不一致请设置成一致语句&#xff0c;根据自己需要复制语句 如下&#xff1a;【除了最后一条记录哈】 SET G…