Netty笔记03-组件Channel

news2024/9/20 7:56:49

文章目录

  • Channel概述
    • Channel 的概念
    • Channel 的主要功能
    • Channel 的生命周期
    • Channel 的状态
    • Channel 的类型
    • channel 的主要方法
  • ChannelFuture
  • CloseFuture
    • 💡 netty异步提升的是什么
    • 要点
    • 总结


Channel概述

Channel 的概念

在 Netty 中,Channel 是一个非常重要的概念,它代表了网络连接的抽象(一个网络连接),用于进行数据的读取和写入操作。Channel 是 Netty 中处理网络 I/O 的核心组件之一,它将网络 I/O 操作封装在一个统一的接口下,使得开发人员可以更容易地编写高性能的网络应用程序。

Channel 是 Netty 中的接口,它定义了一组基本的操作,如打开、关闭、读取、写入等。Netty 为不同的网络通信协议提供了不同的Channel 实现,比如 TCP 的 SocketChannel,UDP 的 DatagramChannel 等。

Channel 的主要功能

  • 读取数据:从网络中读取数据。
  • 写入数据:将数据写入到网络中。
  • 关闭连接:关闭当前的 Channel。
  • 绑定地址:将 Channel 绑定到特定的本地地址(IP 地址和端口号)。
  • 注册到 EventLoop:将 Channel 注册到 EventLoop,以便处理 I/O 事件。
  • 获取ChannelPipeline:每个 Channel 都有一个 ChannelPipeline,用于处理入站和出站的数据流。

Channel 的生命周期

一个 Channel 的生命周期通常包括以下几个阶段:

  • 创建:当一个新连接建立时,Netty 会创建一个新的 Channel。
  • 注册:新创建的 Channel 会注册到一个 EventLoop 上。
  • 激活:当 Channel 被绑定到一个 Socket 地址时,它会变为激活状态。
  • 读写操作:Channel 可以进行读取和写入操作。
  • 关闭:当连接关闭时,Channel 也会被关闭。

Channel 的状态

Channel 有几个重要的状态,包括但不限于:

  • 注册状态:Channel 是否已经被注册到 EventLoop。
channel.isRegistered() // 检查是否已注册
  • 活跃状态:Channel 是否已经绑定到一个 Socket 地址。
channel.isActive() // 检查是否活跃
  • 打开状态:Channel 是否处于打开状态。
channel.isOpen() // 检查是否打开

Channel 的类型

Netty 为不同的网络通信协议提供了不同的 Channel 实现:

  • TCP 通信:NioServerSocketChannel 和 NioSocketChannel。
  • UDP 通信:NioDatagramChannel。
  • 文件传输:FileRegion。

channel 的主要方法

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入(只会将数据写入到channel的缓冲区中,具体什么时候发送数据则有很多条件,如在执行flush()或缓冲区达到一定大小)
  • writeAndFlush() 方法将数据写入并刷出(将数据写入到channel的缓冲区中并立刻从缓冲区中发出)

ChannelFuture

使用上一章的服务器端和客户端代码
服务器端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import jdk.internal.org.objectweb.asm.Handle;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        // 创建一个独立的 EventLoopGroup,将耗时的代码放到一个额外的组中线程处理
        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                // boss 和 worker
                //  boss 只负责 ServerSocketChannel 上 accept 事件
                //  worker 只负责 socketChannel 上的读写
                //group参数1:boss不需要指定线程数,因为ServerSocketChannel只会跟一个EventLoop进行绑定,
                //              又因为服务器只有一个,所以只会占用一个线程,不用指定线程数。
                //group参数1:work线程指定为两个
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //如果读操作耗费的时间很上,会影响其他客户端的读写操作,
                        // 一个work管理多个channel,如果其中一个耗时过长则会影响其他channel的读写操作
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                            /**
                             * @param ctx
                             * @param msg ByteBuf类型
                             * @throws Exception
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg); // 让消息传递给下一个handler,如果不添加则消息不会传递给handler2中
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override                                         // ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .sync()
    .channel()
    .writeAndFlush(new Date() + ": hello world!");

拆分上面的代码

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 标记1

channelFuture.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");
  • 1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象

注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象。

实验如下:

// 2. 类中带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override // 在连接建立后被调用
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringEncoder());
            }
        })
        // 1. 连接到服务器
        // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
        .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后

// 2.1 使用 sync 方法同步处理结果
//1.
System.out.println("sync前:"+channelFuture.channel());//[id: 0xe3489549]
//2.
channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
//3.
System.out.println("sync后:"+channelFuture.channel());//[id: 0xe3489549, L:/127.0.0.1:54398 - R:localhost/127.0.0.1:8080]
Channel channel = channelFuture.channel();
log.debug("{}", channel);
//[DEBUG] [main] c.i.n.c.EventLoopClient - [id: 0xf540a105, L:/127.0.0.1:20434 - R:localhost/127.0.0.1:8080]
channel.writeAndFlush("hello, world");
channel.writeAndFlush("hello, world");
channel.writeAndFlush("hello, world");
System.out.println();
  • 执行到 1 时,连接未建立,打印 [id: 0xe3489549]
  • 执行到 2 时,sync 方法是同步等待连接建立完成
  • 执行到 3 时,连接肯定建立了,打印 [id: 0xe3489549, L:/127.0.0.1:54398 - R:localhost/127.0.0.1:8080]

除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:

// 2. 类中带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1. 连接到服务器
                // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
                .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后

        // 2.2 使用 addListener(回调对象) 方法异步(将main线程处理的活交给其他线程)处理结果
        //  将等待连接建立、连接成功后处理结果全部交给其他的线程处理
        //1.
        System.out.println("sync前:"+channelFuture.channel());//[id: 0x3baac75f]
        channelFuture.addListener(new ChannelFutureListener() {
/**         ↑
 *          ↑     ←      ←       ↑
 *                               ↑
 * @param future 该future对象就是channelFuture对象
 * @throws Exception
 */
            @Override
            // 在 nio 线程连接建立好之后,会调用 operationComplete
            public void operationComplete(ChannelFuture future) throws Exception {
                //2.
                System.out.println("sync后:"+channelFuture.channel());//[id: 0x3baac75f, L:/127.0.0.1:54744 - R:localhost/127.0.0.1:8080]
                Channel channel = future.channel();
                log.debug("{}", channel);
                //[DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.EventLoopClient - [id: 0x2ac4448f, L:/127.0.0.1:20346 - R:localhost/127.0.0.1:8080]
                channel.writeAndFlush("hello, world");
                //调用以上三行代码的还是nio线程
            }
        });
  • 执行到 1 时,连接未建立,打印 [id: 0x3baac75f]
  • ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印 [id: 0x3baac75f, L:/127.0.0.1:54744 - R:localhost/127.0.0.1:8080]

CloseFuture

需求:客户端的控制台不断的接受用户的输入,将用户输入的信息发给服务器端, 当不想发送信息时,输入q退出。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //LoggingHandler一般用于调试使用,会将channel运行流程、状态显示出来
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        System.out.println(channelFuture.getClass());
        Channel channel = channelFuture.sync().channel();
        log.debug("{}", channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            //不断拿到用户输入的信息
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close(); // 调用close()方法是交给其他线程执行异步操作
//                    log.debug("处理关闭之后的操作"); // 不能在这里善后,因为是交给其他线程执行可能出现1s之后执行close()的情况
                    break;
                }
                //输入的不是q则发送给服务器
                channel.writeAndFlush(line);
            }
        }, "input").start();

        // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
        //1) 同步处理关闭
//        log.debug("waiting close...");
//        closeFuture.sync();//只有在调用了channel.close()后才会继续向下运行
//        log.debug("处理关闭之后的操作");

        //2) 异步处理关闭
        System.out.println("closeFuture.getClass():"+closeFuture.getClass());
        
//        channelFuture.addListener(new ChannelFutureListener() {
//            //关闭channel的线程调用operationComplete方法
//            //也就是nio线程执行完close(),找到该回调对象调用该方法
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                log.debug("处理关闭之后的操作");
//            }
//        });
        //以上代码会出现用户输入q时,程序没有结束的情况。
        // 原因是NioEventLoopGroup中还有一些线程在运行,需要进行关闭
       
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭之后的操作");
                group.shutdownGracefully();
            }
        });
        //或使用以下代码替换
//        closeFuture.addListener((ChannelFutureListener) future -> {
//            log.debug("处理关闭之后的操作");
//            group.shutdownGracefully();
//        });

    }
}

在这里插入图片描述

💡 netty异步提升的是什么

  • 有些同学看到这里会有疑问:为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。
  • 还有同学会笼统地回答,因为 netty 异步方式用了多线程、多线程就效率高。其实这些认识都比较片面,多线程和异步所提升的效率并不是所认为的。(其实提高的是单位时间内处理请求的吞吐量)

思考:
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
在这里插入图片描述
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
在这里插入图片描述
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍。
在这里插入图片描述

要点

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

总结

异步并没有缩短处理单位的时间,反而有所增加请求的处理和响应时间。
关键点是提高了单位时间内处理请求的(个数)吞吐量,单位时间内能过处理请求的速度


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2129357.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++题目_洛谷 / 题目详情 P1012 [NOIP1998 提高组] 拼数

# [NOIP1998 提高组] 拼数 ## 题目描述 设有 $n$ 个正整数 $a_1 \dots a_n$&#xff0c;将它们联接成一排&#xff0c;相邻数字首尾相接&#xff0c;组成一个最大的整数。 ## 输入格式 第一行有一个整数&#xff0c;表示数字个数 $n$。 第二行有 $n$ 个整数&#xff0c;表…

Linux学习笔记(黑马程序员,前四章节)

第一章 快照 虚拟机快照&#xff1a; 通俗来说&#xff0c;在学习阶段我们无法避免的可能损坏Linux操作系统&#xff0c;如果损坏的话&#xff0c;重新安装一个Linux操作系统就会十分麻烦。VMware虚拟机支持为虚拟机制作快照。通过快照将当前虚拟机的状态保存下来&#xff0c;…

Leetcode 移动零

要求将数组中的所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。下面是该题的 C 解决方案&#xff1a; class Solution { public:void moveZeroes(vector<int>& nums) {int nonZeroPos 0; // 记录非零元素应该放置的位置// 遍历数组&#xff0c;…

镭速如何高效使用SQLite数据库高速传输结构化数据文件传输

SQLite数据库因其小巧、跨平台和无需配置的特性&#xff0c;在众多关系型数据库中独树一帜。与传统的服务器/客户端架构数据库&#xff0c;例如MySQL不同&#xff0c;SQLite通常被嵌入到应用程序中作为一个库。它不仅支持创建索引&#xff0c;还严格遵循ACID原则&#xff0c;非…

先攒一波硬件,过几年再给电脑升级,靠谱吗?想啥呢?

前言 最近有小伙伴发来消息&#xff1a;我可以今年先买电脑的部分硬件&#xff0c;明年再买电脑的另一部分硬件&#xff0c;再组装起来不就是一台电脑了吗&#xff1f; 这确实是一个很好的办法。 我还记得大学有个室友&#xff0c;从大一每个月省吃俭用&#xff0c;攒下的钱…

全球AI应用市场洞察:市场快速崛起,“陪伴式AI”、“图像AI”应用爆款频出!

自2023年 GPT4 的问世以来&#xff0c;得益于这股新的技术变量&#xff0c;各类 AI 应用在市场上遍地开花。在这轮热潮之下&#xff0c;不同市场和细分赛道有何机遇和挑战&#xff1f;以下根据 Sensor Tower 最新报告分析&#xff0c;帮助移动开发者、广告主洞察全球AI应用市场…

Linux系统:chgrp命令

1、命令详解&#xff1a; chgrp命令是Linux操作系统中用于修改文件或目录的所属组的命令。chgrp命令还可以修改链接文件的所属组&#xff0c;而不是链接所指向的文件的所属组。默认情况下&#xff0c;只有文件的所有者和超级用户才能修改文件的所属组&#xff0c;但如果用户是一…

JavaWeb【day08】--(MySQL-Mybatis入门)

数据库开发-MySQL 1. 多表查询 1.1 概述 1.1.1 数据准备 SQL脚本&#xff1a; #建议&#xff1a;创建新的数据库 create database db04; use db04; ​ -- 部门表 create table tb_dept (id int unsigned primary key auto_increment comment 主键ID,name v…

Win32编程:创建属于你的第一个窗口

目录 一、Win32程序的运行过程 二、创建Windows窗口 1、分析 2、完整代码 一、Win32程序的运行过程 Win32编程&#xff0c;也称为Windows编程。运行步骤主要包含&#xff1a;加载程序&#xff08;操作系统加载程序的可执行文件&#xff08;exe格式&#xff09;到内存中,创建…

代码随想录刷题day30丨452. 用最少数量的箭引爆气球, 435. 无重叠区间,763.划分字母区间

代码随想录刷题day30丨452. 用最少数量的箭引爆气球&#xff0c; 435. 无重叠区间&#xff0c;763.划分字母区间 1.题目 1.1用最少数量的箭引爆气球 题目链接&#xff1a;452. 用最少数量的箭引爆气球 - 力扣&#xff08;LeetCode&#xff09; 视频讲解&#xff1a;贪心算法…

ColorThief的介绍与使用

概述 colorThief是一个 Javascript 插件&#xff0c;支持在浏览器端或 Node 环境中使用。Thief的中文意思是偷窃、小偷。colorThief的作用就是通过算法去获取图片的色源。 API 介绍与示例 colorThief提供两个方法&#xff0c;getColor和getPalette&#xff0c;这两个方法在 …

类和对象(c++)

欢迎来到本期频道&#xff01; 类和对象 类定义&#xff1a;格式&#xff1a;类域&#xff1a;访问限定符友元内部类this指针静态与非静态成员关系类型转换六大默认成员函数&#xff08;C98&#xff09;1️⃣构造函数2️⃣拷贝构造函数浅拷贝与深拷贝 3️⃣赋值重载拷贝函数4️…

x-cmd pkg | superfile: 终端文件管理器,界面精致美观

目录 简介快速上手功能特点竞品和相关项目进一步阅读 简介 superfile 是 github.com/yorukot 用 Go 开发的终端文件管理器&#xff0c;相比于其他终端文件管理器&#xff0c;它最显著的特点是 UI 精致美观。 Tip Superfile 采用了特殊的 Unicode 符号来标识各种类型的文件&…

【我的 PWN 学习手札】Unsortedbin Leak

前言 从前都是野路子学习&#xff0c;学校时间也比较紧张&#xff0c;没有能够好好总结。一直有做个人笔记的习惯&#xff0c;但是学习路线比较分散盲目&#xff0c;虽然跟着wiki做&#xff0c;但是也面临知识点不全的窘境。近期开始跟着课程系统的学习&#xff0c;对于老的知…

mp3转文字要怎么处理?使用这4个工具就对了

MP3是音频当中比较常用的格式&#xff0c;如果像将其转换成文字内容&#xff0c;一般的语音转文字工具都是可以完成的。但是音频转换成文字的过程中&#xff0c;它的准确率是会受到像口音&#xff0c;语言&#xff0c;环境音等因素的影响的。所以大家如果想将自己的mp3语音转成…

en造数据结构与算法C# 用数组实现个栈还不简单???看我一秒破之!!!(unity演示)

实现效果 线性表&#xff0c;线性表是指数据元素按顺序排列的一种数据结构&#xff0c;下面这些东西可以说都是线性表&#xff0c;所以用array实现三者一点问题都没有 分别是List动态数组&#xff0c;stack栈和queue队列&#xff0c;今天就秒掉stack 栈&#xff1a;先进后出 实…

Vue邮件发送:如何有效集成邮件发送功能?

vue邮件发送功能实现方法&#xff1f;Vue邮件发送性能怎么优化&#xff1f; 无论是用户注册验证、密码重置&#xff0c;还是通知提醒&#xff0c;邮件发送功能都能提供重要的支持。本文将详细探讨如何在Vue项目中有效集成邮件发送功能&#xff0c;确保邮件能够准确、及时地送达…

万龙觉醒免费辅助:VMOS云手机辅助巴克尔阵容搭配攻略!

《万龙觉醒》是一款策略类手游&#xff0c;选择合适的英雄阵容搭配能够极大提升战斗效果。而借助VMOS云手机的辅助功能&#xff0c;玩家可以更加轻松地管理游戏进程&#xff0c;优化操作体验。以下是VMOS云手机的三大核心功能&#xff0c;帮助你更好地掌控《万龙觉醒》战局。 V…

通信工程学习:什么是QoS服务质量

QoS服务质量 在通信工程中&#xff0c;QoS&#xff08;Quality of Service&#xff0c;服务质量&#xff09;是一个至关重要的概念&#xff0c;它关乎网络性能的提升和用户体验的优化。QoS是网络的一种安全机制&#xff0c;旨在通过一系列技术和策略来优化网络资源的分配&#…

代码随想录打卡Day30

今天的题目还可以&#xff0c;第一题看了视频&#xff0c;看卡哥把问题转化成数学问题&#xff0c;把图画出来以后就会了&#xff0c;剩下两题没看视频直接AC的。 452. 用最少数量的箭引爆气球 这个题主要是画完图以后就很好理解了&#xff0c;需要先对区间按照区间左值进行排…