Springboot项目使用原生Websocket

news2024/9/28 3:28:11

目录

  • 1.启用Websocket功能
  • 2.封装操作websocket session的工具
  • 3.保存websocket session的接口
  • 4.保存websocket session的类
  • 5.定义websocket 端点
  • 6.创建定时任务 ping websocket 客户端

1.启用Websocket功能

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

}

2.封装操作websocket session的工具

package com.xxx.robot.websocket.util;

import java.util.Map;

import javax.websocket.Session;

import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;

import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;

public final class WebSocketSessionUtils {

    private WebSocketSessionUtils() {}

	public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    /**
     * websocket block 发送超时 毫秒
     */
    public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;

	/**
	 * 从 websocket session 中找到登录用户
	 * 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User
	 * LoginUser、User 从业务层自定义的类
	 * 项目中使用了spring security框架
	 */
    public static User findUser (Session session) {
        UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();
        MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();
        LoginUser loginUser = (LoginUser) userDetails.getUserData();
        return (User) loginUser.getAdditionalInfo();
    }
    
    /**
     * 给 websocket session 设置参数
     */
    public static void setProperties(Session session) {
    	//设置websocket文本消息的长度为8M,默认为8k
        session.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
        //设置websocket二进制消息的长度为8M,默认为8k
        session.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
        Map<String, Object> userProperties = session.getUserProperties();
        //设置websocket发送消息的超时时长为10秒,默认为20秒
        userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);
    }
}

3.保存websocket session的接口

package com.xxx.robot.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import javax.websocket.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface WebSocketSessionManager {
    Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);
    
    String PING = "ping";
    String PONG = "pong";
    
    Session get (String key);
    
    List<String> keys();

    void add (String key, Session session);
    
    Session remove (String key);
    
    /**
     * ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法
     */
    default void pingBatch () {
        List<String> keyList = keys();
        log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e1) {
                        }
                    } catch (Exception e) {
                        log.error("WebSocket-ping异常", e);
                    }
                }
            }
        }
    }
    
    /**
     * 消除所有websocket客户端
     */
    default void clearAllSession () {
        List<String> keyList = keys();
        int i = 0;
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        remove(key);
                        i++;
                        session.close();
                    } catch (IOException e1) {
                        log.error("WebSocket-移除并关闭session异常", e1);
                    }
                    if (i % 10 == 0) {
                        try {
                            Thread.sleep(0);
                        } catch (InterruptedException e1) {
                        }
                    }
                }
            }
        }
        log.info("WebSocket-移除并关闭session数量为:{}", i);
    }
}

4.保存websocket session的类

package com.xxx.robot.websocket.robot.manager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import javax.websocket.Session;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.xxx.robot.websocket.WebSocketSessionManager;

/**
 * 机器人模块WebSocket Session管理器
 */
@Component
public class RobotSessionManager implements WebSocketSessionManager {
    
    /**
     * key = userId + '-' + managerId
     * userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端
     * 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap
     */
    private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();
    
    public static final String joinKey (String userId, String managerId) {
        return userId + '-' + managerId;
    }

    public static final String joinKey (Long userId, String managerId) {
        return userId.toString() + '-' + managerId;
    }
    
    public static final String[] splitKey (String key) {
        return StringUtils.split(key, '-');
    }

    @Override
    public Session get(String key) {
        return SESSION_POOL.get(key);
    }
    
    /**
     * 根据用户ID查询所有websocket session的key
     * @param userId
     * @param excludeManagerId 排除的key, 可为空
     * @return
     */
    public List<String> keysByUserId(String userId, String excludeManagerId) {
    	//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图
        ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');
        NavigableSet<String> keySet = subMap.navigableKeySet();
        List<String> list = new ArrayList<>();
        if (StringUtils.isBlank(excludeManagerId)) {
            for (String key : keySet) {
                if (key != null) {
                    list.add(key);
                }
            }
        } else {
            for (String key : keySet) {
                if (key != null && !key.equals(excludeManagerId)) {
                    list.add(key);
                }
            }
        }
        return list;
    }

    @Override
    public List<String> keys() {
        NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();
        List<String> list = new ArrayList<>();
        for (String key : keySet) {
            if (key != null) {
                list.add(key);
            }
        }
        return list;
    }

    @Override
    public synchronized void add(String key, Session session) {
        removeAndClose(key);
        SESSION_POOL.put(key, session);
    }

    @Override
    public synchronized Session remove(String key) {
        return SESSION_POOL.remove(key);
    }
    
    /**
     * 必须key和value都匹配才能删除
     */
    public synchronized void remove(String key, Session session) {
        SESSION_POOL.remove(key, session);
    }
    
    private void removeAndClose (String key) {
        Session session = remove(key);
        if (session != null) {
            try {
                session.close();
            } catch (IOException e) {
            }
        }
    }

}

5.定义websocket 端点

package com.xxx.robot.websocket.robot.endpoint;

import java.util.Map;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * 机器人模块WebSocket接口
 * 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的
 * 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean
 */
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {
    
    private volatile User user;
    
    private volatile String id;
    
    private volatile Session session;
    
    private volatile Map<String, RobotCoreService> robotCoreServiceMap;

    /**
     * 所有初始化操作都写在@OnOpen注释的方法中
     * 连接成功
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) {
        WebSocketSessionUtils.setProperties(session);
        this.user = WebSocketSessionUtils.findUser(session);
        this.id = id;
        this.session = session;
        log.info("连接成功:{}, {}", id, this.user.getUserCode());
        //使用BeanUtils代替@Autowired获取bean, 
        //RobotCoreService为业务类,不必关心
        robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //保存websocket session
        robotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);
    }

    /**
     * 连接关闭
     * @param session
     */
    @OnClose
    public void onClose() {
        log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //连接关闭时,使用两个参数的remove方法,多线程下安全删除
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }
    
    @OnError
    public void onError(Throwable error) {
        log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //websocket异常时,使用两个参数的remove方法,多线程下安全删除
        //比如ping客户端超时,触发此方法,删除该客户端
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }

    /**
     * 接收到消息
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);
        if (WebSocketSessionManager.PING.equals(message)) {
        	//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口
            this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);
            return;
        }
        //用 try...catch 包裹防止抛出异常导致websocket关闭
        try {
        	//业务层,使用jackson反序列化json,不必关心具体的业务
            JsonNode root = BaseJsonUtils.readTree(message);
            String apiType = root.at("/apiType").asText();
            //业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭
            robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);
        } catch (Exception e) {
            log.error("处理消息错误", e);
        }
    }
    
}

在这里插入图片描述

6.创建定时任务 ping websocket 客户端

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 启用定时任务功能
 * 因为websocket session是有状态的,只能保存在各自的服务端,
 * 所以只能使用单机式的定时任务,而不能使用分布式定时任务,
 * 因此 springboot自带的定时任务功能成为了首选
 * springboot定时任务线程池
 */
@Configuration
@EnableScheduling
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("scheduler-executor-");
        return executor;
    }

}
package com.xxx.robot.websocket;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Sunzhihua
 */
@Slf4j
@Component
public class WebSocketSchedulerTask {
    
    /**
     * 注入所有的 websocket session 管理器
     */
    @Autowired
    private List<WebSocketSessionManager> webSocketSessionManagers;

	/**
	 * initialDelay 表示 延迟60秒初始化
	 * fixedDelay 表示 上一次任务结束后,再延迟30秒执行
	 */
    @Scheduled(initialDelay = 60000, fixedDelay = 30000)
    public void clearInvalidSession() {
        try {
            log.info("pingBatch 开始。。。");
            for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {
                webSocketSessionManager.pingBatch();
            }
            log.info("pingBatch 完成。。。");
        } catch (Exception e) {
            log.error("pingBatch异常", e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/671382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习实践(1.2)XGBoost回归任务

前言 XGBoost属于Boosting集成学习模型&#xff0c;由华盛顿大学陈天齐博士提出&#xff0c;因在机器学习挑战赛中大放异彩而被业界所熟知。相比越来越流行的深度神经网络&#xff0c;XGBoost能更好的处理表格数据&#xff0c;并具有更强的可解释性&#xff0c;还具有易于调参…

Axure教程—树

本文将教大家如何用AXURE中的动态面板制作树 一、效果 预览地址&#xff1a;https://1rmtjd.axshare.com 二、功能 1、点击“”&#xff0c;展开子节点 2、点击“-”子节点折叠 三、制作 1、父节点制作 拖入一个动态面板&#xff0c;进入&#xff0c;如图&#xff1a; 拖入一…

【LeetCode】HOT 100(18)

题单介绍&#xff1a; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数据结构的新手和想要在短时间内高效提升的人&#xff0c;熟练掌握这 100 道题&#xff0c;你就已经具备了在代码世界通行的基本能力。 目录 题单介绍&#…

【玩转Docker小鲸鱼叭】Docker容器常用命令大全

在 Docker 核心概念理解 一文中&#xff0c;我们知道 Docker容器 其实就是一个轻量级的沙盒&#xff0c;应用运行在不同的容器中从而实现隔离效果。容器的创建和运行是以镜像为基础的&#xff0c;容器可以被创建、销毁、启动和停止等。本文将介绍下容器的这些常用操作命令。 1、…

max^2 - min^2

2001^2- 2000^2 ???? max^2 - min^2min * (max - min) min * (max - min) (max - min)* (max - min) min * (max - min) * 2 (max - min)* (max - min) (max min)(max - min)

管理类联考——逻辑——技巧篇——形式逻辑——秒杀思路

第一章&#xff1a;说明 形式逻辑出现频次 8-10 道 形式逻辑细分思路 直言命题三段论与文氏图AEIO 与模态命题形式逻辑复合命题固定秒杀思路 说明1&#xff1a; AEIO 全称肯定命题&#xff1a;所有 S 都是 P&#xff0c;记作 SAP。 简称为“A” 全称否定命题&#xff1a;所…

TCL、海信、小米密集推新,Mini LED电视熬出头了

作者 | 辰纹 来源 | 洞见新研社 OLED和Mini LED之间的对决来到了赛点。 进入2023年&#xff0c;一线电视厂商发布的新品中&#xff0c;Mini LED成为主流。 3月2日&#xff0c;索尼全球发布了其2023年BRAVIA XR 电视新品阵容。其中&#xff0c;索尼Mini LED电视X95L采用Mini …

MFC学习之修改设置控件字体显示和颜色参数

前言 最近一直配合研发部门写一些调试类的小软件&#xff0c;记得之前电脑显示器和显卡配置都不高&#xff0c;显示分辨率比较低&#xff0c;软件界面上的文字还能看到清楚&#xff08;不显小&#xff09;。 现在公司新配置的电脑都比较好了&#xff0c;界面字体&#xff0c;尤…

【高性能计算】监督学习之支持向量机分类实验

【高性能计算】监督学习之支持向量机分类实验 实验目的实验内容实验步骤1、支持向量机算法1.1 支持向量机算法的基本思想1.2 支持向量机算法的分类过程1.3 支持向量机算法的模型构建过程 2、使用Python语言编写支持向量机的源程序代码并分析其分类原理2.1 支持向量机SVM模型代码…

四、卷积神经网络整体基础结构

一、计算机发展应用 神经网络主要用于特征提取 卷积神经网络主要应用在图像领域&#xff0c;解决传统神经网络出现的过拟合、权重太多等风险 1&#xff0c;CV领域的发展 Computer vision计算机视觉的发展在2012年出现的AlexNet开始之后得到了挽救 之前都是一些传统的机器学习…

SQL高级语句2

SQL高级语句2 SQL高级语句17.---- 连接查询 ----18.自我连接&#xff0c;算排名&#xff1a;19.---- CREATE VIEW ----视图&#xff0c;可以被当作是虚拟表或存储查询。20.---- UNION ----联集&#xff0c;将两个SQL语句的结果合并起来&#xff0c;两个SQL语句所产生的字段需要…

重新定义DevOps:容器化的变革力量

在快速发展的数字时代&#xff0c;DevOps 已成为重塑软件开发格局的重要范例。DevOps 是一个源自“开发”和“运营”的术语&#xff0c;它将这两个历史上孤立的功能集成到一个统一的方法中&#xff0c;专注于缩短软件开发生命周期。因此&#xff0c;DevOps 实践促进了更快、更可…

【产品应用】一体化电机在拉伸吹瓶机的应用

随着塑料瓶的广泛应用&#xff0c;拉伸吹瓶机作为生产塑料瓶的关键设备之一&#xff0c;受到了越来越多企业的关注。而在拉伸吹瓶机中&#xff0c;一体化电机的应用正逐渐成为行业的新趋势。 01.设备简介 吹瓶机是一种用于制造塑料瓶的机械设备。它通过将预先加热的塑料颗粒或…

当 MQTT 遇上 ChatGPT:探索可自然交互的物联网智能应用

前言 随着物联网技术的迅猛发展&#xff0c;人与设备、设备与设备之间的互动已变得不再困难&#xff0c;而如何更加自然、高效、智能地实现交互则成为物联网领域新的挑战。 近期&#xff0c;由 OpenAI 发布的 ChatGPT、GPT-3.5 和 GPT-4 等先进大语言模型&#xff08;LLM&…

python 基础知识:使用jieba库对文本进行分词

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 一、jieba库是什么&#xff1f; Python的jieba库是一个中文分词工具&#xff0c;它可以将一段中文文本分割成一个一个的词语&#xff0c;方便后续的自然语言处理任务&#xff0c;如文本分类、情感分析等。 jieba库使用…

Riddztecia 作品集 |Beast Wear 出品

Beast & Wear&#xff1a;一个以彩色部落和可训练野兽庆祝多样性的收藏品。通过 NFT 野兽和装备提升你的 Riddzee 人物化身&#xff0c;增强视觉效果&#xff0c;提升你在迷人的 Riddztecia 世界中的游戏体验。去游戏、去成长、去探索。 Riddztecia NFT 空投包括训练怪兽和…

leetcode216. 组合总和 III(回溯算法-java)

组合总和 III leetcode216. 组合总和 III题目描述解题思路代码演示 回溯算法专题 leetcode216. 组合总和 III 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;https://leetcode.cn/problems/combination-sum-iii 题目描述 找出所有相加之和为 n 的 k 个…

JMeter 批量接口测试

一、背景 最近在进行某中台的接口测试准备&#xff0c;发现接口数量非常多&#xff0c;有6、70个&#xff0c;而且每个接口都有大量的参数并且需要进行各种参数验证来测试接口是否能够正确返回响应值。想了几种方案后&#xff0c;决定尝试使用JMeter的csv读取来实现批量的接口…

Linux修改权限chown和chmod指令

一、 chmod指令修改文件权限 -rw-rw-r-- 1 zcb zcb 1135 Jun 9 14:56 test.c drwx------ 2 root root 4096 Apr 7 16:50 testemmc/ 上面两个文件&#xff0c;第一个字符-和d&#xff0c;分别表示二进制文件&#xff0c;目录&#xff0c;后面9个分别表示 u&#xff1a;拥有者…

【CV 向】了解 OpenCV 中的算术与位运算

文章目录 引言1. 利用 NumPy 创建图像2. 算术运算2.1 加法与减法2.2 乘法与除法 3. 位运算3.1 与运算3.2 或运算3.3 异或运算3.4 非运算 结论 引言 Python OpenCV 是一个功能强大的计算机视觉库&#xff0c;用于图像处理和计算机视觉任务。在 OpenCV 中&#xff0c;我们可以使…