NIO中的异步—ChannelFuture、CloseFuture以及异步提升在NIO中的应用

news2025/2/5 14:52:42

ChannelFuture

        客户端调用connect后返回值为ChannelFuture对象,我们可以利用ChannelFuture中的channel()方法获取到Channel对象。

        由于上述代为为客户端实现,若想启动客户端实现连接操作,必须编写服务端代码,实现如下:

package netty.simpleNetty.channel;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * 服务端配合ClientChannelFuture验证Channel关闭的异步以及回调函数实现
 */
@Slf4j
public class ServerChannelFuture {
    public static void main(String[] args) throws InterruptedException {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                                if(buffer != null){
                                    byte[] bytes = new byte[16];
                                    ByteBuf len = buffer.readBytes(bytes, 0, buffer.readableBytes());
                                    log.info(new String(bytes));
                                }
                            }
                        });
                    }
                })
                .bind(8000)
                .sync();
    }
}

        随后运行上述客户端代码,观察到日志输出如下:

         通过此输出日志发现借用channelFuture对象的channel方法获取的Channel对象并没有创建。

原因分析:connect方法是异步的,意味着不等待连接建立,方法执行就返回了。因此channelFuture对象中不能【立刻】获取到正确的Channel对象。

解决方法:

  • 使用sync方法让异步操作同步等待连接建立。
  • 使用回调方法,当connect连接建立后主动调用回调函数。

sync让异步操作同步

        获取到channelFuture对象后,调用sync()方法,同步等待连接的结束。sync阻塞住当前线程,直到Nio线程连接建立完毕 。先启动服务端再启动客户端日志输出如下:

         此时调用channelFuture.channle()方法获取到Channel,随后可以利用获取到的channel向服务端发送消息。

使用回调函数 

        调用获取的channelFuture对象的addListener()方法向其添加回调函数。

        可以将上述代码用Lambda表达式简化如下:

         服务端整体代码实现如下:

package netty.simpleNetty.channel;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/**
 * 客户端关闭channel异步以及回调函数实现
 */
@Slf4j
public class ClientChannelFuture {
    public static void main(String[] args) throws UnknownHostException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup(2))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {

                    }
                })
                .connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000));
        log.info("Before sync : {}",channelFuture.channel());
//        channelFuture.sync();
//        log.info("After sync : {}", channelFuture.channel());
        channelFuture.addListener((ChannelFutureListener)future -> {
            log.info("After listen : {}", future.channel());
        });
//        channelFuture.addListener(new ChannelFutureListener() {
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                log.info("After listen : {}", future.channel());
//            }
//        });
    }
}

CloseFuture

        上述操作探讨了调用connect方法建立连接后,根据chanelFuture获取到Channel对象的sync以及回调方法实现。在获取到Channel对象后,执行对应的writeAndFlush()方法发送消息后,需要及时关闭channel。

        channel关闭操作的实现有sync同步等待以及添加回调的实现方法:代码如下:

package netty.simpleNetty.channel;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Scanner;

/**
 * Test CloseFuture的异步以及回调实现
 */
@Slf4j
public class ClientCloseFuture {
    public static void main(String[] args) throws InterruptedException, UnknownHostException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup(2))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000))
                .sync()
                .channel();
        log.info("channel : {}", channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String line = scanner.nextLine();
                if("q".equals(line)){
                    // channel的close也是一个异步操作,本线程不会执行close操作,而是交由一个其他线程执行close操作
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }).start();
        ChannelFuture closeFuture = channel.closeFuture();
        log.info("waiting close...");
        closeFuture.sync();
        log.info("closed...");

        channel.closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.info("closed...");
            }
        });

//        channel.closeFuture().addListener((ChannelFutureListener) channelFuture1 ->{
//                    log.info("closed...");
//        });
    }
}

异步提升

        针对上述connect以及close操作,真正执行对应方法是在一个新的线程中执行的,而不是在调用connect的线程中执行连接操作。

        为什么不在一个线程中去执行建立连接、去执行关闭 channel【建立连接connet,关闭channel都是在EventLoopGroup中执行的。也即nio中执行的】,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。

        因为 netty 异步方式用了多线程、多线程就效率高。(理解错误的)

         netty中的异步指的是建立连接connet、读取数据read、关闭channel等不再一个线程中。而是由专门的线程(handler、EventLoop)负责执行专门的操作。

        思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96。

经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下

因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍。

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势

  • 异步并没有缩短响应时间,反而有所增加。【这里所说的响应时间的增加主要指的是不同子任务之间等待的耗时。

  • 合理进行任务拆分,也是利用异步的关键

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2056710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP协议为什么是三次握手和四次挥手

1.一次握手&&二次握手 一次握手就能成功的话&#xff0c;也就代表着不需要进行确认&#xff0c;那么万一有恶意的服务器一直发送SYN&#xff0c;而服务器需要维护大量的连接&#xff0c;维护连接又需要成本&#xff0c;那么就很容易引发SYN洪水&#xff0c;导致服务器…

Linux中的exec族函数

exec 系列函数用于替换当前进程的用户空间代码和数据&#xff0c;从而执行一个新的程序。调用 exec 系列函数不会创建新的进程&#xff0c;但会用新程序的代码和数据替换当前进程&#xff0c;因此调用 exec 后&#xff0c;进程的 ID 保持不变&#xff0c;但进程的行为变为执行新…

计算机毕业设计 教师科研管理系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

第1章-05-通过浏览器控制台安装JQuery.js库

🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年CSDN全站百大博主。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 🏆本文已收录于专栏:Web爬虫入门与实战精讲。 🎉欢迎 👍点赞✍评论⭐收…

大数据背景下基于Python语言的单车租赁商业数据可视化分析

注&#xff1a;源码在最后&#xff0c;只是一次实验记录,不足之处请指教。 一 研究背景及意义 在大数据时代&#xff0c;商业领域的数据量迅速增长&#xff0c;如何有效地利用这些数据成为企业决策和优化成为重要的研究课题。单车租赁作为一种新兴的共享经济模式&#xff0c;其…

健韵坊(详细项目实战一)Spring系列 + Vue3

这一次来一个项目改造的项目实战&#xff0c;基于很久之前的一个demo项目&#xff0c;来实现一个改造优化和部署上线的项目实战。&#xff08;就当是接手*山项目并且加以改造的一个实战吧。&#xff09; 之前是一个关于运动的一个项目&#xff08;其实之前连名字都没想好hhhh&…

vue3 响应式 API:watch()、watchEffect()

watch() 基本概念 watch()用于监视响应式数据的变化&#xff0c;并在数据变化时执行相应的回调函数。可以监视单个响应式数据、多个响应式数据的组合&#xff0c;或者一个计算属性。 返回值 返回一个函数&#xff0c;调用这个函数可以停止监视。 特点 watch() 默认是懒侦听的&…

【Linux网络】select函数

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 文章目录 select函数介绍select函数参数介绍select函数返回值select的工作流程TCP服务器【多路复用版】 select函数介绍 在Linux网络编程中&#xff0c;select 函数是一种非常有用的IO多路复用技术&#xff0…

秃姐学AI系列之:LeNet + 代码实现

目录 LeNet MNIST数据集 LeNet模型图 ​编辑 总结 代码实现&#xff1a;卷积神经网络 LeNet LeNet&#xff08;LeNet-5&#xff09;由两个部分组成&#xff1a;卷积编码器核全连接层密集块 检查模型 LeNet 卷积神经网络里面最为著名的一个网络&#xff0c;80年代末提出…

【vue教程】七. Vue 的动画和过渡

文章目录 往期列表回顾本章涵盖知识点Vue 的内置动画系统基本的进入和离开过渡列表过渡 CSS 过渡CSS 过渡基础Vue 中的 CSS 过渡 JavaScript 动画使用 JavaScript 钩子 第三方动画库的使用集成 Animate.css 实例演示创建一个简单的动画应用 结语 往期列表 【vue教程】一. 环境…

iOS18升级出现白苹果、无法重启等问题,要怎么解决

随着苹果iOS 18系统beta版本的推出&#xff0c;不少用户在私信说升级后遇到了白苹果和无法重启等问题。这些问题不仅影响了大家的正常使用&#xff0c;还会导致数据丢失和系统崩溃。本文将详细介绍iOS 18升级后出现白苹果、无法重启等问题的原因及解决方法&#xff0c;帮助大家…

日期转时间濯

tfunction(date_str) local code ,time World:getTimeFromDateString(date_str) return time/(60*60*24) end print(t(2024-08-16)-t(2024-08-3))

指针 (四)

一 . 指针的使用和传值调用 &#xff08;1&#xff09;strlen 的模拟实现 库函数 strlen 的功能是求字符串长度&#xff0c;统计的是字符串中 \0 之前的字符个数&#xff0c;函数原格式如下&#xff1a; 我们的参数 str 接收到一个字符串的起始地址&#xff0c;然后开始统计…

JS 获取当前操作系统类型

在JavaScript中&#xff0c;‌直接获取用户的操作系统信息是不可能的&#xff0c;‌因为JavaScript主要运行在浏览器中&#xff0c;‌而浏览器出于安全和隐私的考虑&#xff0c;‌不会提供访问操作系统详细信息的API。‌ 但是&#xff0c;‌你可以通过分析用户代理字符串&…

数据库性能定位-慢sql定位、sql优化(docker容器实战)

安装好mysql数据之后&#xff0c;创建库的时候&#xff0c;要注意选择 字符集编码。如果没有选择好&#xff0c;你的库表存中文的时候&#xff0c;会字符集乱码。选择utf8mb4. 建表的时候&#xff0c;存储引擎 InnoDB、MyISAM mysql5.7及以后数据库&#xff0c;表的默认存储引…

官方招募 | 仓颉语言三方库社区建设全速启航,全球开发者、技术大神只等您!

Cangjie-TPC招募令 仓颉社区的小伙伴们&#xff0c;官方三方库&#xff08;Cangjie-TPC&#xff09;招募开始啦&#xff01; Cangjie-TPC&#xff08;https://gitcode.com/Cangjie-TPC&#xff09; 是 Cangjie 社区用于汇集基于仓颉编程语言开发的开源三方库的主干仓&#xf…

JS获取当前浏览器名称

在JavaScript中&#xff0c;获取当前浏览器名称的方法并不是一个标准的功能&#xff0c;因为浏览器厂商并没有提供一个直接的API来获取浏览器的名称。但是&#xff0c;你可以通过分析用户代理字符串&#xff08;User-Agent&#xff09;来推断出浏览器的名称。 以下是一个简单的…

ArcGIS如何将投影坐标系转回为地理坐标系

有时候两个数据&#xff0c;一个为投影坐标系&#xff0c;另一个为地理坐标系时&#xff0c;在GIS软件中位置无法叠加到一起&#xff0c;这需要将两个或多个数据的坐标系统一&#xff0c;可以直接将地理坐标系的数据进行投影&#xff0c;或将投影坐标系转为地理坐标系。下面介绍…

视频号AI美女跳舞,轻松月入30000+,蓝海赛道,流量池巨大,起号猛

今天给大家分享的是一个男生比较感兴趣的内容&#xff0c;AI美女视频跳舞项目4.0版本&#xff0c;7天快速起号&#xff0c;实现每月30000的稳定收入. 大家刷抖音的时候&#xff0c;肯定都刷到过美女跳舞的视频&#xff0c;对吧&#xff1f;这种视频&#xff0c;不管在哪个平台…

电商项目DevOps一体化运维实战

主要讲了git和jkins的使用&#xff0c;其中maven的一个插件还挺好用的&#xff0c;主要可以用来查看哪些类没有使用&#xff0c;哪些导入的包是多余的等。这里展示一下用法。至于git和jkins的搭建后续再操作。 maven插件的使用&#xff1a; 编译后就可以在target下面看到这个h…