基于Netty构建Websocket服务端

news2025/2/2 10:49:57

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
在这里插入图片描述
引入pom依赖:

 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.69.Final</version>
 </dependency>
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
 </dependency>

编写SocketServer:

package com.lzq.websocket.config;

import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {

    private static final Integer PORT = 8888;

    @Override
    public void run(String... args) throws Exception {
        new WebSocketConfig().start();
    }

    public void start() {
        // 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            // 最大数据长度
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // 添加接收websocket请求的url匹配路径
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            // 10秒内收不到消息强制断开连接
                            // pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(PORT).sync();
            log.info("websocket server started, port={}", PORT);
            // 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭
            // 阻塞
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("websocket server exception", e);
            throw new RuntimeException(e);
        } finally {
            log.info("websocket server close");
            // 关闭EventLoopGroup
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写WebSocketHandler:

package com.lzq.websocket.handlers;

import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker webSocketServerHandshaker;
    private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 创建连接时执行
        NettyConfig.group.add(ctx.channel());
        log.info("client channel active, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 关闭连接时执行
        NettyConfig.group.remove(ctx.channel());
        log.info("client channel disconnected, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 服务端接收客户端发送过来的数据结束之后调用
        ctx.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // 处理客户端http握手请求
            handlerHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 处理websocket连接业务
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 处理websocket连接业务
     *
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());
        // 判断是否是关闭websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
            return;
        }
        // 判断是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());
        }
        String text = ((TextWebSocketFrame) frame).text();
        if ("ping".equals(text)) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        log.info("WebSocket message received: {}", text);
        /**
         * 可通过客户传输的text,设计处理策略:
         * 如:text={"type": "messageHandler", "userId": "111"}
         * 服务端根据type,采用策略模式,自行派发处理
         *
         * 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),
         * Handler已经是线程处理,每个用户的请求是线程隔离的
         */
        // 返回WebSocket响应
        ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));
        /*// 群发
        TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()
                + ctx.channel().id()
                + " : "
                + text);
        NettyConfig.group.writeAndFlush(twsf);*/
    }

    /**
     * 处理客户端http握手请求
     *
     * @param ctx
     * @param request
     */
    private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());
        // 判断是否采用WebSocket协议
        if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        webSocketServerHandshaker = wsFactory.newHandshaker(request);
        if (webSocketServerHandshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            webSocketServerHandshaker.handshake(ctx.channel(), request);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        // 服务端向客户端发送数据
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 非正常断开时调用
        log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);
        ctx.close();
    }
}

NettyConfig:

package com.lzq.websocket.config;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class NettyConfig {
    /**
     * 存储接入的客户端的channel对象
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

使用Apifox测试:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1329073.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt WebAssembly开发环境配置

目录 前言1、下载Emscripten SDK2、 安装3、环境变量配置4、QtCreator配置5、运行示例程序总结 前言 本文主要介绍 Qt WebAssembly 开发环境的配置。Qt for Webassembly 可以使Qt应用程序在Web上运行。WebAssembly&#xff08;简称Wasm&#xff09;是一种能够在虚拟机中执行的…

element-table表格中插入颜色块显示数据状态

dom部分&#xff1a; <el-table-column label"是否异常"><template slot-scope"scope"><div class"dcs_sf_red" v-if"scope.row.sfyc 0"></div><div class"dcs_sf_green" v-if"scope.row…

【JavaWeb学习笔记】14 - 三大组件其二 Listener Filter

API文档JAVA_EE_api_中英文对照版 Listener 一、监听器Listener 1. Listener监听器它是JavaWeb的三大组件之一。 JavaWeb的三大组件分别是: Servlet程序、Listener监听器、Filter过滤器 2. Listener是JavaEE的规范&#xff0c;就是接口 3.监听器的作用是&#xff0c;监听某…

湖北省工程类助理工程师申报评审通过不是难事

湖北省工程类助理工程师申报评审通过不是难事 想要初级职称/助理工程师工程类&#xff0c;建筑施工、土木工程、市政、路桥、水利水电、机电、园林、测绘等一系列建筑类的初级职称。12月份交资料&#xff0c;春节前可以评审出来。 初级职称申报周期-一个月左右 一般初级职称申…

【容器Docker】Docker学习笔记

1、什么是Docker&#xff1a; Docker 将程序和程序运行所依赖的所有环境都打包到镜像里。“build once, run anywhere”Docker 是容器的一种实现。 Windows 下如何安装Docker: 官方安装教程&#xff1a;Install Docker Desktop on Windows | Docker Docs有两种安装套装&…

配置BGP的基本示例

目录 BGP简介 BGP定义 配置BGP目的 受益 实验 实验拓扑 ​编辑 组网需求 配置思路 配置步骤 配置各接口所属的VLAN 配置各Vlanif的ip地址 配置IBGP连接 配置EBGP 查看BGP对等体的连接状态 配置SwitchA发布路由10.1.0.0/16 配置BGP引入直连路由 BGP简介 BGP定义 …

四、Spring IoC实践和应用(基于注解方式管理 Bean)

本章概要 基于注解方式管理 Bean 实验一&#xff1a; Bean注解标记和扫描 (IoC)实验二&#xff1a; 组件&#xff08;Bean&#xff09;作用域和周期方法注解实验三&#xff1a; Bean属性赋值&#xff1a;引用类型自动装配 (DI)实验四&#xff1a; Bean属性赋值&#xff1a;基本…

01-基于粤嵌GEC6818实现屏幕的显示固定颜色进行自动切换

基于GEC6818实现屏幕颜色的切换 本文使用开发板GEC6818&#xff0c;实现屏幕显示特定颜色并且进行自动切换的功能。 文章目录 基于GEC6818实现屏幕颜色的切换一、 初始化开发板--&#xff08;开发板是新的则可以省略很多步骤&#xff09;1.1 **删除文件和文件夹**1.2 **查看磁盘…

STM32的以太网外设+PHY(LAN8720)使用详解(2):硬件设计

0 工具准备 1.野火 stm32f407霸天虎开发板 2.LAN8720数据手册 3.STM32F4xx中文参考手册1 PHY&#xff08;LAN8720&#xff09;硬件配置 1.1 硬件配置引脚说明 在LAN8720上电或复位时会读取一些特定引脚的电平&#xff0c;根据电平来进行硬件配置。LAN8720的引脚分布如下&…

在MongoDB中使用数组字段和子文档字段进行索引

本文主要介绍在MongoDB使用数组字段和子文档字段进行索引。 目录 MongoDB的高级索引一、索引数组字段二、索引子文档字段 MongoDB的高级索引 MongoDB是一个面向文档的NoSQL数据库&#xff0c;它提供了丰富的索引功能来加快查询性能。除了常规的单字段索引之外&#xff0c;Mong…

深度学习建模从零开始步骤流程

深度学习建模从零开始步骤流程 步骤如下&#xff1a; 环境准备三方库安装建模开发 环境准备 Anaconda安装&#xff1a; Anaconda下载网址&#xff0c;下载win10下的64位版本。 清华镜像站 下载完毕后点击安装&#xff0c;一直点确定或下一步 到上图点击 Just me&#xff…

基于SpringBoot的桃花峪滑雪场租赁系统 JAVA简易版

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 游客服务2.2 雪场管理 三、数据库设计3.1 教练表3.2 教练聘请表3.3 押金规则表3.4 器材表3.5 滑雪场表3.7 售票表3.8 器材损坏表 四、系统展示五、核心代码5.1 查询教练5.2 教练聘请5.3 查询滑雪场5.4 滑雪场预定5.5 新…

黑芝麻智能与亿咖通科技签署战略合作协议,深化协同助力智能驾驶量产落地

12月22日&#xff0c;全球智能汽车计算芯片引领者黑芝麻智能与全球出行科技企业亿咖通科技共同签署战略合作协议&#xff0c;通过深化合作&#xff0c;整合双方研发、产品和技术资源&#xff0c;联手打造领先智能驾驶系统解决方案&#xff0c;合力推进商业拓展和市场应用&#…

左值右值引用,完美转发

1.c98/03&#xff0c;类模板和函数模板只能含固定数量的模板参数&#xff0c;c11的新特性可以创建接受可变参数的函数模板和类模板 //Args是一个模板参数包&#xff0c;args是一个函数形参参数包 //声明一个参数包Args… args,这个参数包可以包括0到任意个模板参数 template&l…

Ignite数据流处理

数据流处理 #1.概述 Ignite提供了一个数据流API&#xff0c;可用于将大量连续的数据流注入Ignite集群&#xff0c;数据流API支持容错和线性扩展&#xff0c;并为注入Ignite的数据提供了至少一次保证&#xff0c;这意味着每个条目至少会被处理一次。 数据通过与缓存关联的数据…

【AI】YOLO学习笔记三-YOLOV5代码解析

YOLOv5是Glenn Jocher等人研发&#xff0c;它是Ultralytics公司的开源项目。YOLOv5根据参数量分为了n、s、m、l、x五种类型&#xff0c;其参数量依次上升&#xff0c;其效果也是越来越好。由于其代码是长期维护的且具有工程化的思维&#xff0c;所以方便应用在实际的项目中&…

【算法】使用二分查找解决算法问题:理解二分法思想,模板讲解与例题实践

文章目录 二分算法思想 / 性质 / 朴素模板二分查找的引入&#xff08;二段性&#xff09;704.二分查找 模板34.在排序数组中查找元素的第一个和最后一个位置 二分查找的前提条件 / 时间复杂度分析 算法题69.x的平方根35.搜索插入位置852.山脉数组的峰顶索引162.寻找峰值153.寻找…

Servlet-Filter 执行顺序测试

Servlet-Filter 执行顺序测试 对于 web.xml 文件注册过滤器这里就不多说了&#xff0c;就是谁声明的早&#xff0c;谁先被调用。因为在上面的过滤器信息最先被扫描到。 模型抽象 为了便于在实践中使用&#xff0c;结合部分底层原理&#xff0c;我们可以对 Filter 链的执行做…

【3D生成与重建】SSDNeRF:单阶段Diffusion NeRF的三维生成和重建

系列文章目录 题目&#xff1a;Single-Stage Diffusion NeRF: A Unified Approach to 3D Generation and Reconstruction 论文&#xff1a;https://arxiv.org/pdf/2304.06714.pdf 任务&#xff1a;无条件3D生成&#xff08;如从噪音中&#xff0c;生成不同的车等&#xff09;、…