springboot框架集成websocket依赖实现物联网设备、前端网页实时通信!

news2025/4/15 23:21:29

需求:
最近在对接一个物联网里设备,他的通信方式是 websocket 。所以我需要在 springboot框架中集成websocket 依赖,从而实现与设备实时通信!
框架:springboot2.7
java版本:java8
好了,还是直接上代码
第一步:引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

第二步写配置:

package com.agentai.base.config;


import com.agentai.base.yumou.webSocket.YuMouDeviceWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

/**
 * WebSocket配置类
 * 负责配置WebSocket服务器和注册WebSocket处理器
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 注册WebSocket处理器,
        // 允许所有来源的跨域请求
        registry.addHandler(deviceWebSocketHandler(), "/linker-dev")
                .setAllowedOrigins("*");
    }

    @Bean
    public YuMouDeviceWebSocketHandler deviceWebSocketHandler() {
        return new YuMouDeviceWebSocketHandler();
    }

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        // 设置消息缓冲区大小
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        // 设置会话超时时间(毫秒)
        container.setMaxSessionIdleTimeout(60000L);
        return container;
    }
}

第三方:WebSocket会话管理器

package com.agentai.base.yumou.webSocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * WebSocket会话管理器
 * 负责管理所有WebSocket会话,包括会话状态跟踪、心跳检测和清理过期会话
 */
@Slf4j
public class WebSocketSessionManager {

    // 心跳超时限制(毫秒)
    private static final long HEARTBEAT_TIMEOUT = 30000;
    // 心跳检查间隔(毫秒)
    private static final long HEARTBEAT_CHECK_INTERVAL = 10000;
    // 心跳消息内容
    private static final String HEARTBEAT_MESSAGE = "{\"type\":\"ping\"}";

    // 会话信息,包含WebSocket会话和最后活动时间
    private static class SessionInfo {
        WebSocketSession session;
        long lastActiveTime;

        SessionInfo(WebSocketSession session) {
            this.session = session;
            this.lastActiveTime = Instant.now().toEpochMilli();
        }

        void updateLastActiveTime() {
            this.lastActiveTime = Instant.now().toEpochMilli();
        }
    }

    // 保存所有会话信息
    private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public WebSocketSessionManager() {
        // 启动心跳检查任务
        scheduler.scheduleAtFixedRate(this::checkHeartbeats,
                HEARTBEAT_CHECK_INTERVAL, HEARTBEAT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * 添加新的会话
     * @param session 新的WebSocket会话
     */
    public void addSession(WebSocketSession session) {
        sessions.put(session.getId(), new SessionInfo(session));
        log.info("新会话已添加: {}", session.getId());
    }

    /**
     * 移除会话
     * @param sessionId 会话ID
     */
    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
        log.info("会话已移除: {}", sessionId);
    }

    /**
     * 更新会话最后活动时间
     * @param sessionId 会话ID
     */
    public void updateSessionActivity(String sessionId) {
        SessionInfo info = sessions.get(sessionId);
        if (info != null) {
            info.updateLastActiveTime();
        }
    }

    /**
     * 发送消息到指定会话
     * @param sessionId 会话ID
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessage(String sessionId, String message) {
        SessionInfo info = sessions.get(sessionId);
        if (info != null && info.session.isOpen()) {
            try {
                info.session.sendMessage(new TextMessage(message));
                return true;
            } catch (IOException e) {
                log.error("发送消息到会话[{}]失败: {}", sessionId, e.getMessage());
            }
        }
        return false;
    }

    /**
     * 广播消息到所有会话
     * @param message 消息内容
     */
    public void broadcastMessage(String message) {
        sessions.forEach((sessionId, info) -> {
            if (info.session.isOpen()) {
                try {
                    info.session.sendMessage(new TextMessage(message));
                } catch (IOException e) {
                    log.error("广播消息到会话[{}]失败: {}", sessionId, e.getMessage());
                }
            }
        });
    }

    /**
     * 检查心跳并清理过期会话
     */
    private void checkHeartbeats() {
        long now = Instant.now().toEpochMilli();
        sessions.forEach((sessionId, info) -> {
            if (now - info.lastActiveTime > HEARTBEAT_TIMEOUT) {
                try {
                    // 发送心跳消息
                    info.session.sendMessage(new TextMessage(HEARTBEAT_MESSAGE));
                    log.debug("发送心跳到会话: {}", sessionId);
                } catch (IOException e) {
                    // 如果发送失败,关闭并移除会话
                    log.warn("会话[{}]心跳检测失败,关闭会话: {}", sessionId, e.getMessage());
                    try {
                        info.session.close();
                    } catch (IOException ex) {
                        log.error("关闭会话[{}]失败: {}", sessionId, ex.getMessage());
                    }
                    removeSession(sessionId);
                }
            }
        });
    }

    /**
     * 关闭会话管理器
     */
    public void shutdown() {
        scheduler.shutdown();
        sessions.forEach((sessionId, info) -> {
            try {
                info.session.close();
            } catch (IOException e) {
                log.error("关闭会话[{}]失败: {}", sessionId, e.getMessage());
            }
        });
        sessions.clear();
    }
}

第四步:设备WebSocket处理器

package com.agentai.base.yumou.webSocket;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/**
 * 设备WebSocket处理器
 * 负责处理设备的WebSocket连接、消息接收和断开连接
 */
@Slf4j
public class YuMouDeviceWebSocketHandler extends TextWebSocketHandler {

    private final WebSocketSessionManager sessionManager;

    // 构造函数,初始化会话管理器
    public YuMouDeviceWebSocketHandler() {
        this.sessionManager = new WebSocketSessionManager();
    }

    /**
     * WebSocket连接建立后的处理
     * @param session WebSocket会话
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 将新会话添加到会话管理器
        String sessionId = session.getId();
        sessionManager.addSession(session);
        log.info("WebSocket连接已建立: {}", sessionId);
    }


    @Autowired
    YuMouService yuMouService;

    /**
     * 处理接收到的文本消息
     * @param session 当前会话
     * @param message 接收到的文本消息
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        String payload = message.getPayload();
        String sessionId = session.getId();

        try {
            // 更新会话的活动时间
            sessionManager.updateSessionActivity(sessionId);


            log.info("接收到设备[{}]的文本消息: {}", sessionId, payload);

            JSONObject jsonObject = JSONObject.parseObject(payload);
           log.info("数据:", jsonObject );

            // 处理其他业务消息
            // TODO: 添加具体的业务消息处理逻辑

        } catch (Exception e) {
            log.error("处理设备[{}]消息时发生错误: {}", sessionId, e.getMessage());
        }
    }


    /**
     * 处理接收到的二进制消息
     * @param session 当前会话
     * @param message 接收到的二进制消息
     */
    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
        byte[] payload = message.getPayload().array();
        String sessionId = session.getId();

        log.info("接收到设备[{}]的二进制消息,长度: {} 字节", sessionId, payload.length);


        // 目前只打印消息长度,可以根据需求处理二进制数据
        // TODO: 添加二进制消息处理逻辑
    }

    /**
     * 处理传输错误
     * @param session 当前会话
     * @param exception 错误异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        String sessionId = session.getId();
        log.error("设备[{}]连接传输错误: {}", sessionId, exception.getMessage());
    }

    /**
     * WebSocket连接关闭后的处理
     * @param session 当前会话
     * @param status 关闭状态
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        String sessionId = session.getId();
        sessionManager.removeSession(sessionId);
        log.info("设备[{}]WebSocket连接已关闭,状态码: {}", sessionId, status.getCode());
    }

    /**
     * 发送消息到指定会话
     * @param sessionId 会话ID
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessage(String sessionId, String message) {
        return sessionManager.sendMessage(sessionId, message);
    }

    /**
     * 广播消息到所有连接的会话
     * @param message 消息内容
     */
    public void broadcastMessage(String message) {
        sessionManager.broadcastMessage(message);
    }

    /**
     * 关闭WebSocket处理器,清理资源
     */
    public void shutdown() {
        sessionManager.shutdown();
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2334725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ES6学习03-字符串扩展(unicode、for...of、字符串模板)和新方法()

一、字符串扩展 1. eg: 2.for...of eg: 3. eg: 二。字符串新增方法 1. 2. 3. 4. 5.

目前状况下,计算机和人工智能是什么关系?

目录 一、计算机和人工智能的关系 &#xff08;一&#xff09;从学科发展角度看 计算机是基础 人工智能是计算机的延伸和拓展 &#xff08;二&#xff09;从技术应用角度看 二、计算机系学生对人工智能的了解程度 &#xff08;一&#xff09;基础层面的了解 必备知识 …

Flutter 2025 Roadmap

2025 这个路线图是有抱负的。它主要代表了我们这些在谷歌工作的人收集的内容。到目前为止&#xff0c;非Google贡献者的数量超过了谷歌雇佣的贡献者&#xff0c;所以这并不是一个详尽的列表&#xff0c;列出了我们希望今年Flutter能够出现的所有令人兴奋的新事物&#xff01;在…

[数据结构]排序 --2

目录 8、快速排序 8.1、Hoare版 8.2、挖坑法 8.3、前后指针法 9、快速排序优化 9.1、三数取中法 9.2、采用插入排序 10、快速排序非递归 11、归并排序 12、归并排序非递归 13、排序类算法总结 14、计数排序 15、其他排序 15.1、基数排序 15.2、桶排序 8、快速排…

第16届蓝桥杯c++省赛c组个人题解

偷偷吐槽&#xff1a; c组没人写题解吗&#xff0c;找不到题解啊 P12162 [蓝桥杯 2025 省 C/研究生组] 数位倍数 题目背景 本站蓝桥杯 2025 省赛测试数据均为洛谷自造&#xff0c;与官方数据可能存在差异&#xff0c;仅供学习参考。 题目描述 请问在 1 至 202504&#xff…

记一次InternVL3- 2B 8B的部署测验日志

1、模型下载魔搭社区 2、运行环境&#xff1a; 1、硬件 RTX 3090*1 云主机[普通性能] 8核15G 200G 免费 32 Mbps付费68Mbps ubuntu22.04 cuda12.4 2、软件&#xff1a; flash_attn&#xff08;好像不用装 忘记了&#xff09; numpy Pillow10.3.0 Requests2.31.0 transfo…

使用SSH解决在IDEA中Push出现403的问题

错误截图&#xff1a; 控制台日志&#xff1a; 12:15:34.649: [xxx] git -c core.quotepathfalse -c log.showSignaturefalse push --progress --porcelain master refs/heads/master:master fatal: unable to access https://github.com/xxx.git/: The requested URL return…

Tauri 2.3.1+Leptos 0.7.8开发桌面应用--Sqlite数据库的写入、展示和选择删除

在前期工作的基础上&#xff08;Tauri2Leptos开发桌面应用--Sqlite数据库操作_tauri sqlite-CSDN博客&#xff09;&#xff0c;尝试制作产品化学成分录入界面&#xff0c;并展示数据库内容&#xff0c;删除选中的数据。具体效果如下&#xff1a; 一、前端Leptos程序 前端程序主…

《车辆人机工程-》实验报告

汽车驾驶操纵实验 汽车操纵装置有哪几种&#xff0c;各有什么特点 汽车操纵装置是驾驶员直接控制车辆行驶状态的关键部件&#xff0c;主要包括以下几种&#xff0c;其特点如下&#xff1a; 一、方向盘&#xff08;转向操纵装置&#xff09; 作用&#xff1a;控制车辆行驶方向…

使用多进程和 Socket 接收解析数据并推送到 Kafka 的高性能架构

使用多进程和 Socket 接收解析数据并推送到 Kafka 的高性能架构 在现代应用程序中&#xff0c;实时数据处理和高并发性能是至关重要的。本文将介绍如何使用 Python 的多进程和 Socket 技术来接收和解析数据&#xff0c;并将处理后的数据推送到 Kafka&#xff0c;从而实现高效的…

Redis 哨兵模式 搭建

1 . 哨兵模式拓扑 与 简介 本文介绍如何搭建 单主双从 多哨兵模式的搭建 哨兵有12个作用 。通过发送命令&#xff0c;让Redis服务器返回监控其运行状态&#xff0c;包括主服务器和从服务器。 当哨兵监测到master宕机&#xff0c;会自动将slave切换成master&#xff0c;然后通过…

【网络安全 | 项目开发】Web 安全响应头扫描器(提升网站安全性)

原创项目,未经许可,不得转载。 文章目录 项目简介工作流程示例输出技术栈项目代码使用说明项目简介 安全响应头是防止常见 Web 攻击(如点击劫持、跨站脚本攻击等)的有效防线,因此合理的配置这些头部信息对任何网站的安全至关重要。 Web 安全响应头扫描器(Security Head…

基于PySide6与pycatia的CATIA绘图比例智能调节工具开发全解析

引言:工程图纸自动化处理的技术革新 在机械设计领域,CATIA图纸的比例调整是高频且重复性极强的操作。传统手动调整方式效率低下且易出错。本文基于PySide6+pycatia技术栈,提出一种支持智能比例匹配、实时视图控制、异常自处理的图纸批处理方案,其核心突破体现在: ​操作效…

STM32硬件IIC+DMA驱动OLED显示——释放CPU资源,提升实时性

目录 前言 一、软件IIC与硬件IIC 1、软件IIC 2、硬件IIC 二、STM32CubeMX配置KEIL配置 三、OLED驱动示例 1、0.96寸OLED 2、OLED驱动程序 3、运用示例 4、效果展示 总结 前言 0.96寸OLED屏是一个很常见的显示模块&#xff0c;其驱动方式在用采IIC通讯时&#xff0c;常用软件IIC…

泛型的二三事

泛型&#xff08;Generics&#xff09;是Java语言的一个重要特性&#xff0c;它允许在定义类、接口和方法时使用类型参数&#xff08;Type Parameters&#xff09;&#xff0c;从而实现类型安全的代码重用。泛型在Java 5中被引入&#xff0c;极大地增强了代码的灵活性和安全性。…

编程思想——FP、OOP、FRP、AOP、IOC、DI、MVC、DTO、DAO

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

【区块链安全 | 第三十九篇】合约审计之delegatecall(一)

文章目录 外部调用函数calldelegatecallcall 与 delegatecall 的区别示例部署后初始状态调用B.testCall()函数调用B.testDelegatecall()函数区别总结漏洞代码代码审计攻击代码攻击原理解析攻击流程修复建议审计思路外部调用函数 在 Solidity 中,常见的两种底层外部函数调用方…

linux多线(进)程编程——(6)共享内存

前言 话说进程君的儿子经过父亲点播后就开始闭关&#xff0c;它想要开发出一种全新的传音神通。他想&#xff0c;如果两个人的大脑生长到了一起&#xff0c;那不是就可以直接知道对方在想什么了吗&#xff0c;这样不是可以避免通过语言传递照成的浪费吗&#xff1f; 下面就是它…

信息安全管理与评估2021年国赛正式卷答案截图以及十套国赛卷

2021年全国职业院校技能大赛高职组 “信息安全管理与评估”赛项 任务书1 赛项时间 共计X小时。 赛项信息 赛项内容 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 第一阶段 平台搭建与安全设备配置防护 任务1 网络平台搭建 任务2 网络安全设备配置与防护 第二…

高并发秒杀系统设计:关键技术解析与典型陷阱规避

电商、在线票务等众多互联网业务场景中&#xff0c;高并发秒杀活动屡见不鲜。这类活动往往在短时间内会涌入海量的用户请求&#xff0c;对系统架构的性能、稳定性和可用性提出了极高的挑战。曾经&#xff0c;高并发秒杀架构设计让许多开发者望而生畏&#xff0c;然而&#xff0…