【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理

news2024/9/26 5:19:18

一、背景

项目中需要建立客户端与服务端之间的长链接,首先就考虑用WebSocket,再来SpringBoot原来整合WebSocket方式并不高效,因此找到了netty-websocket-spring-boot-starter 这款脚手架,它能让我们在SpringBoot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单

二、netty-websocket-spring-boot-starter 入门介绍

2.1 核心注解

2.1.1 @ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类 被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint(“/ws”) )

2.1.2 @OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders…

2.1.3 @OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

2.1.4 @OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

2.1.5 @OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

2.2 核心配置

属性属性说明
pathWebSocket的path,也可以用value来设置
hostWebSocket的host,"0.0.0.0"即是所有本地地址
portWebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
maxFramePayloadLength最大允许帧载荷长度
allIdleTimeSeconds与IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler

三、实践netty-websocket-spring-boot-starter

3.1引入POM文件

主要添加包括以下依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.yeauty</groupId>
    <artifactId>netty-websocket-spring-boot-starter</artifactId>
    <version>0.9.5</version>
</dependency>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.4.6</version>
</dependency>

3.2 在主程序类中排除数据库使用

/**
 * 主程序启动类
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class WebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }

}

3.3 开启WebSocket支持

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.4 定义WebSocketServer服务器(核心代码)

在端点类上加上@ServerEndpoint注解,并在相应的方法上加上@OnOpen、@OnMessage、@OnError、@OnClose注解, 代码如下:

@ServerEndpoint(port = "${ws.port}", path = "/demo/{version}", maxFramePayloadLength = "6553600", allIdleTimeSeconds = "300")
public class WebSocketServer {

    private static Log LOGGER = LogFactory.get();

    // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    // 接收用户ID
    protected StringBuilder userInfo = new StringBuilder();

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,
                       HttpHeaders headers,
                       @RequestParam String req,
                       @RequestParam MultiValueMap<String, Object> reqMap,
                       @PathVariable String arg,
                       @PathVariable Map<String, Object> pathMap) {
        this.session = session;
        // 加入set中
        webSocketSet.add(this);
        // 在线数加1
        addOnlineCount();
        LOGGER.debug("UserId = {}, 通道ID={}, 当前连接人数={}", userInfo.toString(), getSessionId(session), getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        JSONObject jsonData = JSONUtil.parseObj(message);
        if (!jsonData.containsKey("command")) {
            LOGGER.debug("UserId = {}, 通道ID={}, 上行内容={}, 上行请求非法,缺少command参数, 处理结束",
                    userInfo.toString(), getSessionId(session), message);
            return;
        }
        String userId = jsonData.getStr("userId");
        this.userInfo = new StringBuilder(userId);
        String command = jsonData.getStr("command");
        Class<?> service = Command.getService(command);
        if (Objects.isNull(service)) {
            errorMessage(command);
            LOGGER.error("UserId = {}, 通道ID={}, 解析指令执行出错!", userInfo.toString(), getSessionId(session));
            return;
        }
        LOGGER.info("UserId = {}, 通道ID={}, 处理类={}, 开始处理,请求内容={}",
                userInfo.toString(), getSessionId(session), service, jsonData.toString());
        BaseMessage baseMessage = getBaseMessage(service, session, command);
        if (baseMessage == null) {
            return;
        }
        try {
            jsonData.set("SessionId", getSessionId(session));
            JSON resp = baseMessage.handlerMessage(userInfo, jsonData);
            resp.putByPath("command", command);
            resp.putByPath("userId", userId);
            String value = resp.toString();
            //将结果写回客户端, 实现服务器主动推送
            ChannelFuture future = sendMessage(value);
            LOGGER.info("UserId = {}, 通道ID = {}, 返回内容 = {}, future = {}, 处理结束",
                    userInfo.toString(), getSessionId(session), value, future.toString());
        } catch (Exception e) {
            LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        // 从set中删除
        webSocketSet.remove(this);
        // 在线数减1
        subOnlineCount();
        String userId = this.userInfo.toString();
        LOGGER.warn("UserId = {}, 通道ID = {}, 有一连接关闭!当前在线人数={}", userId, getSessionId(session), getOnlineCount());
        userInfo.delete(0, userInfo.length());
        if (ObjectUtil.isNotNull(userId)) {
            String keyStr = ConstDef.ONLINE_USER_TYPE + userId;
            redisTemplate.delete(keyStr);
        }
        session.close();
    }
    /**
     * 出错方法
     */
    @OnError
    public void onError(Session session, Throwable cause) {
        if (Objects.nonNull(this.session) && Objects.nonNull(cause) && !(cause instanceof EOFException)) {
            LOGGER.error("UserId = {}, 通道ID={}, 出错信息={}", userInfo.toString(), this.session.id(), cause.toString());
        }
        if (Objects.nonNull(session) && session.isOpen()) {
            session.close();
        }
    }
    /**
     * 通过class获取Bean
     */
    private BaseMessage getBaseMessage(Class<?> service, Session session, String command) {
        BaseMessage baseMessage;
        try {
            baseMessage = (BaseMessage) SpringUtils.getBean(service);
        } catch (Exception e) {
            LOGGER.error("UserId = {}, 通道ID = {}, 未找到协议头 = {} 的处理类", userInfo.toString(), getSessionId(session), service);
            errorMessage(command);
            return null;
        }
        return baseMessage;
    }
    /**
     * 获取通道ID
     */
    private String getSessionId(Session session) {
        return session.id().asShortText();
    }
    /**
     * 协议错误
     */
    public void errorMessage(String command) {
        JSONObject retObj = new JSONObject();
        retObj.set("code", ConstDef.ERROR_CODE_10001);
        retObj.set("msg", ConstDef.ERROR_CODE_10001_DESP);
        retObj.set("command", command);
        try {
            sendMessage(retObj.toString());
        } catch (IOException e) {
            LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());
        }
    }
    /**
     * 实现服务器主动推送
     */
    public ChannelFuture sendMessage(String message) throws IOException {
        return this.session.sendText(message);
    }
    /**
     * 在线用户数
     */
    public long getOnlineCount() {
        String onlineCountValue = redisTemplate.opsForValue().get(ConstDef.ONLINE_COUNT_KEY);
        if (StrUtil.isBlank(onlineCountValue) || !NumberUtil.isNumber(onlineCountValue)) {
            return 0L;
        }
        return Long.parseLong(onlineCountValue);
    }
    /**
     * 在线数+1
     */
    private void addOnlineCount() {
        redisTemplate.opsForValue().increment(ConstDef.ONLINE_COUNT_KEY);
    }
    /**
     * 在线数-1
     */
    private void subOnlineCount() {
        redisTemplate.opsForValue().decrement(ConstDef.ONLINE_COUNT_KEY);
    }
}

3.5 定义接口

/**
 * 消息处理接口
 */
public interface BaseMessage {
    Log LOGGER = LogFactory.get();
    /**
     * 处理类、处理方法
     */
    JSON handlerMessage(StringBuilder vin, JSONObject jsonData);
}

3.6 定义接口实现类 (业务处理逻辑)

该类是各业务的处理逻辑类,是接口类的具体实现。

@Component
@Configuration
public class QueryAllActivityListMessage implements BaseMessage {
    @Override
    public JSON handlerMessage(StringBuilder userId, JSONObject jsonData) {
        LOGGER.debug("开始处理QueryAllActivityListMessage请求, 参数={}", JSONUtil.toJsonStr(jsonData));
        String resp = "我是服务器端返回的处理结果!";
        LOGGER.info("UserId = {}, param={}, QueryAllActivityListMessage回复 = {}", userId.toString(), jsonData, resp);
        JSONObject respStr = new JSONObject();
        return respStr.set("handleResult", resp);
    }
}

3.7 定义枚举Command

每增加一个业务接口的实现,就需要在这个枚举类注册一下。

/**
 * 指令-服务 枚举
 */
public enum Command {
    /**
     * 业务1处理逻辑
     */
    queryAllActivityList("queryAllActivityList", QueryAllActivityListMessage.class, "业务1处理逻辑");
     /**
     * 业务2处理逻辑
     */
     //略
     /**
     * 业务3处理逻辑
     */
     //略
    /**
     * 服务编码
     */
    private String processCode;
    /**
     * 服务接口类
     */
    private Class<?> service;
    /**
     * 接口描述
     */
    private String desc;

    Command(String processCode, Class<?> service, String desc) {
        this.processCode = processCode;
        this.service = service;
        this.desc = desc;
    }

    public Class<?> getService() {
        return service;
    }

    public static Class<?> getService(String processCode) {
        for (Command command : Command.values()) {
            if (command.processCode.equals(processCode)) {
                return command.getService();
            }
        }
        return null;
    }
}

3.8 编写SpringUtils 工具类

用于搜索Bean, 通过class获取Bean

/**
 * SpringUtils 工具类,用于搜索
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtils.applicationContext == null) {
            SpringUtils.applicationContext = applicationContext;
        }
    }

    /**
     * 获取applicationContext
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过class获取Bean
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name获取 Bean.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

3.9 定义常量定义类 + 返回码

/**
 * 常量定义类 + 返回码
 */
public class ConstDef {
    /**
     * 返回码
     */
    public static final int ERROR_CODE_10001 = 10001;
    public static final String ERROR_CODE_10001_DESP = "请求参数不合法";
    /**
     * 按UserId决定,用户在线类型,车机或者手机
     */
    public static final String ONLINE_USER_TYPE = "ONLINE_USER_TYPE_";
    /**
     * 在线用户数
     */
    public static final String ONLINE_COUNT_KEY = "ONLINE_COUNT_KEY";
}

四、功能验证

打开WebSocket客户端,连接到ws://127.0.0.1:9095/demo/1

从截图来看,WebSocket服务端能正常接受并处理来自客户端的请求,验证成功!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/399574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进程和进程的调度

今天,为大家带来进程和进程的调度的学习 1.认识计算机 2.什么是操作系统 3.什么是进程 4.进程管理 5.进程的属性 6.进程的调度 7.进程调度的过程 8.内存分配 1.认识计算机 计算机的组成有五大部分 1.CPU(是计算机的大脑,负责逻辑运算和控制) 2.内存 3.外存 4.输入…

你了解线程的状态转换吗

本文概述: 讲述线程的六种状态. 你可能已经了解了六种状态, 但是你知道 sleep 被唤醒之后, wait ()被 notify 之后进入了什么状态吗? 本文只是开胃小菜, 你看看下一篇文章对你有没有帮助. 一共有六种状态: New 新建状态Runnable 运行状态Blocked 阻塞状态Waiting 等待状态Tim…

项目实战-瑞吉外卖day02(B站)持续更新

瑞吉外卖-Day02课程内容完善登录功能新增员工员工信息分页查询启用/禁用员工账号编辑员工信息1. 完善登录功能1.1 问题分析前面我们已经完成了后台系统的员工登录功能开发&#xff0c;但是目前还存在一个问题&#xff0c;接下来我们来说明一个这个问题&#xff0c; 以及如何处理…

前端实用技巧,JS压缩、美化、JS混淆加密

作为一名前端开发者&#xff0c;关注JavaScript代码的安全性和隐私性&#xff0c;或者需要对JavaScript代码进行美化、格式化、压缩等操作&#xff0c;帮助你提高开发效率和代码质量&#xff0c;利用一个好的工具非常重要。 如果不想让自己的代码被恶意篡改和盗用&#xff0c;作…

超详细Xshell7免费版安装与连接虚拟机教程

一、下载Xshell 1、首先打开Xshell官网&#xff0c;首页官网地址为&#xff1a; Xshell官网首页地址 官网首页地址有时候会发生变动&#xff0c;若不能通过链接直接进入官网&#xff0c;则在浏览器搜索xshell---->点击下图所示红框处即可 2、进入首页后&#xff0c;点击免…

C++基础了解-22-C++ 重载运算符和重载函数

C 重载运算符和重载函数 一、C 重载运算符和重载函数 C 允许在同一作用域中的某个函数和运算符指定多个定义&#xff0c;分别称为函数重载和运算符重载。 重载声明是指一个与之前已经在该作用域内声明过的函数或方法具有相同名称的声明&#xff0c;但是它们的参数列表和定义…

计算机网络之HTTP协议

目录 一、HTTP的含义 1.1 理解超文本 1.2 理解应用层协议 1.3 理解HTTP协议的工作过程 二、HTTP协议格式 2.1 抓包工具的使用 2.2 理解协议格式 2.2.1 请求协议格式 2.2.2. 响应格式请求 一、HTTP的含义 HTTP&#xff08;全称为“超文本传输协议”&#xff09;&#x…

WAMP搭建靶场

WAMP W&#xff1a;windows A&#xff1a;apache M&#xff1a;mysql&#xff0c;mariadb P&#xff1a;php 1. 下载phpstudy Windows版phpstudy下载 - 小皮面板(phpstudy) 2. 安装phpstudy 默认安装即可 3. 下载DVWA靶场 https://github.com/digininja/DVWA/archive/…

C++回顾(十五)—— 类模板

15.1 为什么要有类模板 类模板用于实现类所需数据的类型参数化类模板在表示如数组、表、图等数据结构显得特别重要&#xff0c;这些数据结构的表示和算法不受所包含的元素类型的影响 15.2 单个类模板语法 注意&#xff1a;类模板的创建对象一定要显示调用&#xff08;指明类型…

虹科分享 | 网络安全评级 | 突破能够让您变得更加强大

有两种CISO&#xff1a;入侵前和入侵后。入侵前的CISOs过于关注工具&#xff0c;并考虑投资于预防技术。在这样做的时候&#xff0c;他们几乎没有考虑一旦发生了不好的事情&#xff0c;恢复和及时恢复服务的问题。不好的事情会发生&#xff1b;这不是是否的问题&#xff0c;而是…

手把手教你如何做数据报表

数据报表是一种数据可视化形式&#xff0c;它将复杂的数据信息通过图形、表格等形式进行展示和解释&#xff0c;让人们更加直观地理解和分析数据。数据报表已成为现代企业决策的必备工具之一。对企业来说&#xff0c;数据报表有很多用处。首先&#xff0c;数据报表可以帮助企业…

基于BiLSTM+CRF医学病例命名实体识别项目

研究背景 为通过项目实战增加对命名实体识别的认识&#xff0c;本文找到中科院软件所刘焕勇老师在github上的开源项目&#xff0c;中文电子病例命名实体识别项目MedicalNamedEntityRecognition。对其进行详细解读。 原项目地址&#xff1a;https://github.com/liuhuanyong/Med…

一小时轻松掌握Git,看这一篇就足够

文章目录序言&#xff1a;版本控制分类一、Git环境配置下载卸载安装二、常用linux命令三、基本配置四、Git基本操作0.原理图1.项目创建及克隆方式一&#xff1a;本地仓库搭建方式二&#xff1a;克隆远程仓库2.文件操作3.配置ssh公钥4.分支5.push代码参考序言&#xff1a;版本控…

ORB_SLAM2+kinect稠密建图

下载代码&#xff1a;https://github.com/gaoxiang12/ORBSLAM2_with_pointcloud_map 运行代码&#xff1a; 解压代码后&#xff0c;删掉作者自己编译的build文件夹&#xff08;下面三个都删除&#xff09;&#xff1a; ~/ORB_SLAM2_modified/build, ~/ORB_SLAM2_modified/T…

【上传项目代码到Git详细步骤】

1.下载安装Git到电脑上&#xff08;这里我之前已经安装好了&#xff0c;就不细说了&#xff09;2.进入控制台安装好后右键点击桌面空白部分会多出两个菜单选项&#xff0c;点击第二个Git Bash Here&#xff08;点击第一个你会爆炸&#xff09;会弹出一个git控制台&#xff0c;如…

华为OD机试用Python实现 -【分解质因数】 2023Q1A

华为OD机试题 本篇题目:分解质因数题目示例 1输入输出示例 2输入输出Code代码编写思路最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南

从0-1搭建交付型项目管理体系流程(上)【宝芝林2】

很多项目经理在这个阶段&#xff0c;由于经验不足及整个项目管理体系涉及的环节和内容比较庞杂&#xff0c;往往无法有效思考&#xff0c;无从下手。笔者有幸在最近几年的工作实践中&#xff0c;实际搭建并迭代了2-3次项目管理体系流程框架&#xff0c;期间也经历过很多迷茫&am…

C++基础了解-21-C++ 继承

C 继承 一、C 继承 面向对象程序设计中最重要的一个概念是继承。继承允许我们依据另一个类来定义一个类&#xff0c;这使得创建和维护一个应用程序变得更容易。这样做&#xff0c;也达到了重用代码功能和提高执行效率的效果。 当创建一个类时&#xff0c;不需要重新编写新的…

mes系统如何管理企业生产

随着市场竞争的日趋激烈&#xff0c;很多企业都开始意识到生产管理的重要性。mes系统如何管理企业生产&#xff1f;下面&#xff0c;我们通过一个实例来说明。 案例简介&#xff1a; 一、客户需求快速增长 随着市场竞争的加剧&#xff0c;越来越多的客户提出了个性化需求。同…

人体存在传感器成品方案,精准感知静止存在,实时智能化感控技术

随着现今智能时代的发展&#xff0c;酒店也越来越趋于智能化&#xff0c;也在不断地推行智慧酒店&#xff0c;这也给人们入住酒店提供了良好的体验。 人体存在感知是智能酒店中极其重要的一项应用技术&#xff0c;只有智能设备通过精准地感知人体存在&#xff0c;才能更好地做…