# SpringBoot 集成 Netty

news2024/11/22 9:37:53

SpringBoot 集成 Netty

文章目录

  • SpringBoot 集成 Netty
    • 背景描述
    • Netty与SpringBoot整合关注点
    • Netty组件
      • Bootstrap、ServerBootstrap
      • Channel
      • EventLoop、EventLoopGroup
      • ChannelHandler
      • ChannelPipeline
      • ByteBuf
    • Pom依赖
    • Yml 配置
    • 整合Netty步骤
      • 服务端
      • 客户端

背景描述

  • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot整合关注点

  • NettySpringboot生命周期保持一致,同生共死
  • Netty能用上ioc中的Bean
  • Netty能读取到全局的配置

Netty组件

Bootstrap、ServerBootstrap

  • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
  • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
  • ServerBootstrap 往往是用于启动一个 Netty 服务端。

Channel

  • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
  • 其实就是我们平常网络编程中经常使用的socket套接字对象

EventLoop、EventLoopGroup

  • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
  • 一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或者多个Channel

ChannelHandler

  • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
  • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
    • ChannelInboundHandler——处理入站数据
    • ChannelOutboundHandler——处理出站数据

ChannelPipeline

  • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

ByteBuf

  • ByteBuf就是字节缓冲区,用于高效处理输入输出。

Pom依赖

  • 引入springboot starter web netty
<!-- SpringBoot 初始化依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.5.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.85.Final</version>
</dependency>

Yml 配置

# Springboot 端口
server:
  port: 2345

netty:
  websocket:
    # Websocket服务端口
    port: 1024
    # 绑定的网卡
    ip: 0.0.0.0
    # 消息帧最大体积
    max-frame-size: 10240
    # URI路径
    path: /channel

整合Netty步骤

服务端

  • 使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {

    @Resource
    private SocketServer socketServer;

    @Override
    public void run(ApplicationArguments args) {
        this.socketServer.start();
    }

}
  • SocketServe.java
@Component
public class SocketServer {

    private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);

    /**
     * 负责初始化 netty 服务器
     */
    private ServerBootstrap serverBootstrap;

    @Autowired
    private SocketInitializer socketInitializer;

    @Value("${netty.websocket.port}")
    private int port;

    /**
     * 启动 netty 服务器
     */
    public void start() {
        this.init();
        this.serverBootstrap.bind(this.port);
        logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);
    }

    /**
     * 初始化 netty 配置
     */
    private void init() {
        // 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
        // 实际工作的线程组
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        this.serverBootstrap = new ServerBootstrap();
        // 两个线程组加入进来 加入自己的初始化器
        this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);
    }
}
  • 编写Netty服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);

    public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 读取到客户端发来的消息
     *
     * @param ctx ChannelHandlerContext
     * @param msg msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组
        byte[] data = (byte[]) msg;
        log.info("收到消息: " + new String(data));
        // 给其他人转发消息
        for (Channel client : clients) {
            if (!client.equals(ctx.channel())) {
                client.writeAndFlush(data);
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("新的客户端链接:" + ctx.channel().id().asShortText());
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        clients.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.channel().close();
        clients.remove(ctx.channel());
    }
}
  • 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择
        pipeline.addLast(new ByteArrayDecoder());
        pipeline.addLast(new ByteArrayEncoder());
        // 添加上自己的处理器
        pipeline.addLast(new SocketHandler());
    }
}

客户端

  • 编写 socket连接
public class ChatClient {

    public void start(String name) throws IOException {
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 监听服务端发来得消息
        new Thread(new ClientThread(selector)).start();
        // 监听用户输入
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String message = scanner.next();
            if (StringUtils.hasText(message)) {
                socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));
            }
        }
    }

    private class ClientThread implements Runnable {

        private final Logger logger = LoggerFactory.getLogger(ClientThread.class);

        private final Selector selector;

        public ClientThread(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int channels = selector.select();
                    if (channels == 0) {
                        continue;
                    }
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectionKeySet.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey selectionKey = keyIterator.next();
                        // 移除集合当前得selectionKey,避免重复处理
                        keyIterator.remove();
                        if (selectionKey.isReadable()) {
                            handleRead(selector, selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    // 处理可读状态
    private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        StringBuilder message = new StringBuilder();
        if (channel.read(byteBuffer) > 0) {
            byteBuffer.flip();
            message.append(StandardCharsets.UTF_8.decode(byteBuffer));
        }
        // 再次注册到选择器上,继续监听可读状态
        channel.register(selector, SelectionKey.OP_READ);
        System.out.println(message);
    }
}
  • start()
public static void main(String[] args) throws IOException {
    new ChatClient().start("张三");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【万兴PDF专家】OCR引擎的离线安装方法,让你不受网速的折磨,PDF给OCR成可搜索的高级PDF,牛逼了我的万兴

一、问题背景 万兴PDF是一个很好用的PDF工具&#xff0c;它不仅可以实现PDF的浏览和批注常见功能&#xff0c;还具有OCR、压缩PDF&#xff0c;乃至批量化的功能。 因此&#xff0c;实在是一个非常值得花钱去买的PDF工具包&#xff01;&#xff01; 但是&#xff0c;软件里的O…

Prometheus与Grafana监控SpringBoot应用

Prometheus与Grafana监控SpringBoot应用 1.SpringBoot应用暴露端点 2.转换成Prometheus能解析得数据 3.向Prometheus注册时赋予项目名 docker部署 4701模板

七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例

七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例 文章目录七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例0.总体功能概述1.TIM硬件介绍1.1 TIM1/3级联硬件介绍1.1.1 主从模式介绍1.1.2 TIM1为主&#xff0c;TIM3为从&#xff0c;TIM3 的输入触发源选…

【计算机网络】习题(三)—— 数据链路层

【计算机网络】习题&#xff08;三&#xff09;—— 数据链路层2&#xff0e;数据链路层协议的功能不包括&#xff08;). A&#xff0e;定义数据格式 B。提供结点之间的可靠传输 C.控制对物理传输介质的访问 D.为终端结点隐蔽物理传输的细节 2.D 主是是数据链路层的主要功能包…

SECCON CTF 2022 web复现

skipinx 知识点&#xff1a;qs 参数解析错误qs简介 一句话介绍就是&#xff1a;qs是负责url参数转化的js库&#xff0c;当然也可以说是查询字符串解析和字符串化库。 详细了解移步&#xff1a;https://www.npmjs.com/package/qs qs简单用法 例如&#xff1a;我们 url 参数…

NTPv4协议解析

前言 本文的撰写基于RFC5905.NTP 是时间网络控制协议&#xff0c;V4版本相交V3版本&#xff0c;修复了V3存在的一些问题。尤其是NTPV4的拓展时间戳鼓励使用浮动双数据类型&#xff0c;这样使得NTP能够更好的支持1ns的场景&#xff0c;轮询间隔也从上一代的最多1024s拓展到了36…

上位机通信标准-OPC

OPC通信&#xff0c;基于OPC的通信是一种通信整合方案&#xff0c;通过OPC标准&#xff0c;整合各类协议并统一化接口。 1、上位机通信环境 - 品牌、各类繁多 - 通信环境的统一&#xff1a;OPC 2、OPC - 什么是OPC&#xff1a;OLE for Process Control Windows插件&#x…

数字集成电路设计(五、仿真验证与 Testbench 编写)(一)

文章目录引言1. Verilog HDL 电路仿真和验证概述2. Verilog HDL测试程序设计基础2.1 Testbench及其结构2.2 测试平台举例2.2.1 组合电路仿真环境搭建2.2.2 时序电路仿真环境搭建2.3 Verilog HDL仿真结果确认2.4 Verilog HDL仿真效率3. 与仿真相关的系统任务3.1 $display和\$wri…

Mybatis的二级缓存 (默认方式)

目录前置生效场景一场景二失效场景一场景二场景三场景四脏数据场景前置 什么是二级缓存: 一级缓存是基于sqlsession级别, 当一个sqlsession会话结束, 一级缓存也就结束了. 定义一级缓存为局部缓存, 那么二级缓存就是全局全局缓存 二级缓存是基于mapper文件的namespace级别&…

进程和线程的区别

进程和线程的区别 文章目录进程和线程的区别进程和线程的概念一、从属关系不同二、所属基本单位不同三、资源消耗不同四、是否同步和互斥额外补充问题&#xff1a;一个进程是不是可以创建无限数量的线程&#xff1f;参考链接进程和线程的概念 在了解区别之前&#xff0c;我们先…

【Java】IO流 - 字节流

文章目录FileInputStream 介绍FileOutputStream介绍文件输入输出综合使用【拷贝】FileInputStream 介绍 创建一个txt文件&#xff0c;写入 HelloWorld 并用Java读取&#xff1a; Test public void readFile01(){//提前创建一个文件hello.txt并编辑一个HelloWorldString filePa…

Nacos 注册中心的常用配置

1.服务端地址 spring.cloud.nacos.discovery.server-addr 无 Nacos Server 启动监听的 ip 地址和端口2.服务名 spring.cloud.nacos.discovery.s ervice ${spring.application.name} 给当前的服务命名3.服务分组spring.cloud.nacos.discovery.groupDEFAULT_GROUP 设置服务所处的…

机器视觉之ros人脸识别

系列文章目录 机器视觉之ros人脸识别 ros人脸识别系列文章目录一、WIN下的环境设置二、连接摄像头设备到虚拟机三、安装摄像头驱动设备3.1判断安装usb还是uvc驱动包3.2查看摄像头设备3.3测试网络摄像头3.4安装摄像头驱动包四、调用视觉功能包五、人脸识别的调用一、WIN下的环境…

封装系统之新手操作版

一、需要软件&#xff1a;Vmware16&#xff0c;win10正版系统&#xff0c;EasySysprep5&#xff0c;EasyU_v3.6.iso 下载地址&#xff1a;EasySysprep5&#xff1a;https://www.itsk.com/thread-425990-1-1.html EasyU_v3.6&#xff1a;https://www.itsk.com/thread-426856-1-1…

【计算机视觉】不来试试图片轮廓提取?

文章目录&#x1f6a9; 前言&#x1f348; 边缘提取原理卷积用特殊的卷积核进行轮廓提取&#x1f34f; 开始轮廓提取代码&#x1f6a9; 前言 最近学到了深度学习的卷积操作&#xff0c;在卷积神经网络出现之前&#xff0c;就已经有使用卷积核 &#xff08;也叫滤波器&#xff…

NLP模型(三)——FastText介绍

文章目录1. FastText 概述2. FastText 分类模型2.1 结构2.2 n-gram3. FastText 词嵌入模型1. FastText 概述 首先&#xff0c;我们得搞清楚&#xff0c;FastText 是什么&#xff1f;有的地方说是分类模型&#xff0c;有的地方又将其用于词向量&#xff0c;那么&#xff0c;Fas…

ppt复现CVPR顶会流程图

本次目标如下图&#xff0c;难点在于立方体和矩阵格网的绘制 文末附机器学习绘图模板~ 先来绘制立方体&#xff0c;插入——形状——立方体&#xff0c;调节成如下图&#xff0c;再点击水平翻转&#xff1a; 绘制矩形&#xff0c;多绘制几个组合成矩形格网&#xff0c;右键设置…

TFT-LCD屏幕读取Flash芯片图片资源并显示

TFT-LCD屏幕读取Flash芯片图片资源并显示 在前面用TFT-LCD显示图片的实验中&#xff0c;由于图片资源过大&#xff0c;240 * 320 的图片大小为150K&#xff0c;而STM32F103ZET6的内部Flash才512K&#xff0c;最多能放三张图片&#xff0c;所以这次将图片放到外部Flash中&#…

【Java八股文总结】之Redis数据库

文章目录Redis 数据库一、Redis基础1、Redis应用场景2、Redis数据类型3、Redis常用命令4、Redis为什么速度快&#xff1f;5、Redis和Memcached的区别和共同点6、Redis和MySQL的区别&#xff1f;二、高可用1、主从复制Q&#xff1a;主从复制主要的作用?2、Redis主从复制原理Red…

Cadence之Allegro:蛇形与差分等长

文章目录 一、三种等长方法二、直接等长法设置教程1、差分设置2、analysis设置三、pin-pair法设置教程一、三种等长方法 直接等长法 适用pin和pin之间没有容抗和阻抗的情况,即pin和pin之间只有一根线、没有电阻和电容的时候才可以使用这种方法。 pin-pair法 建立Sigxplorer模形…