庖丁解牛:NIO核心概念与机制详解 06 _ 连网和异步 I/O

news2025/1/2 3:59:49

文章目录

  • Pre
  • 概述
  • 异步 I/O
  • Selectors
  • 打开一个 ServerSocketChannel
  • 选择键
  • 内部循环
  • 监听新连接
  • 接受新的连接
  • 删除处理过的 SelectionKey
  • 传入的 I/O
  • 回到主循环

在这里插入图片描述


Pre

庖丁解牛:NIO核心概念与机制详解 01

庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

庖丁解牛:NIO核心概念与机制详解 03 _ 缓冲区分配、包装和分片

庖丁解牛:NIO核心概念与机制详解 04 _ 分散和聚集

庖丁解牛:NIO核心概念与机制详解 05 _ 文件锁定


概述

在 Java NIO 中,连网操作与其他操作一样,依赖于通道(Channel)和缓冲区(Buffer)。通道是用于读取和写入数据的途径,而缓冲区则用于暂存数据。

与传统的同步 I/O 不同,Java NIO 中的通道操作是非阻塞的,这意味着在发起 IO 请求后,进程可以继续执行其他任务,而不需要等待 IO 操作完成。当 IO 操作完成后,进程会收到通知,此时再进行相应的处理。


异步 I/O

异步 I/O 是一种 没有阻塞地 读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。

另一方面,异步 I/O 调用不会阻塞。相反,你将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉你。

异步 I/O 的一个优势在于,它允许你同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,你可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。

来看个Demo

这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。


import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class MultiPortEcho {
    private int ports[];
    private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);

    public MultiPortEcho(int ports[]) throws IOException {
        this.ports = ports;

        go();
    }

    private void go() throws IOException {
        // Create a new selector
        Selector selector = Selector.open();

        // Open a listener on each port, and register each one
        // with the selector
        for (int i = 0; i < ports.length; ++i) {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(ports[i]);
            ss.bind(address);

            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Going to listen on " + ports[i]);
        }

        while (true) {
            int num = selector.select();

            Set selectedKeys = selector.selectedKeys();
            Iterator it = selectedKeys.iterator();

            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();

                if ((key.readyOps() & SelectionKey.OP_ACCEPT)
                        == SelectionKey.OP_ACCEPT) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    // Add the new connection to the selector
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    it.remove();

                    System.out.println("Got connection from " + sc);
                } else if ((key.readyOps() & SelectionKey.OP_READ)
                        == SelectionKey.OP_READ) {
                    // Read the data
                    SocketChannel sc = (SocketChannel) key.channel();

                    // Echo data
                    int bytesEchoed = 0;
                    while (true) {
                        echoBuffer.clear();

                        int r = sc.read(echoBuffer);

                        if (r <= 0) {
                            break;
                        }

                        echoBuffer.flip();

                        sc.write(echoBuffer);
                        bytesEchoed += r;
                    }

                    System.out.println("Echoed " + bytesEchoed + " from " + sc);

                    it.remove();
                }

            }

//System.out.println( "going to clear" );
//      selectedKeys.clear();
//System.out.println( "cleared" );
        }
    }

    public static void main(String args[]) throws Exception {
        if (args.length <= 0) {
            System.err.println("Usage: java MultiPortEcho port [port port ...]");
            System.exit(1);
        }

        int ports[] = new int[args.length];

        for (int i = 0; i < args.length; ++i) {
            ports[i] = Integer.parseInt(args[i]);
        }

        new MultiPortEcho(ports);
    }
}

Selectors

我们来基于 MultiPortEcho 的源代码中的 go() 方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。

异步 I/O 中的核心对象名为 SelectorSelector 就是你注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉你所发生的事件。

所以,我们需要做的第一件事就是创建一个 Selector

// Create a new selector
Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个参数总是这个 Selector。


打开一个 ServerSocketChannel

为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。

对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );
  
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[ii] );
ss.bind( address );

第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。
第二行将 ServerSocketChannel 设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。


选择键

下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

register() 的第一个参数总是这个 Selector
第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。

请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知你某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。


内部循环

现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

int num = selector.select();
  
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
  
while (it.hasNext()) {
     SelectionKey key = (SelectionKey)it.next();
     // ... deal with I/O event ...
}

首先,我们调用 Selectorselect() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。

接下来,我们调用 SelectorselectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。

我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,你必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。


监听新连接

程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

if ((key.readyOps() & SelectionKey.OP_ACCEPT)
     == SelectionKey.OP_ACCEPT) {
  
     // Accept the new connection
     // ...
}

可以肯定地说, readOps() 方法告诉我们该事件是新的连接。


接受新的连接

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();

下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

注意我们使用 register()OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。


删除处理过的 SelectionKey

在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey

it.remove();

现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。


传入的 I/O

当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

} else if ((key.readyOps() & SelectionKey.OP_READ)
     == SelectionKey.OP_READ) {
     // Read the data
     SocketChannel sc = (SocketChannel)key.channel();
     // ...
}

与以前一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。


回到主循环

每次返回主循环,我们都要调用 selectSelector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,我们需要通过将通道从 Selector 中删除来处理关闭的通道。而且我们可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1231181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络运维与网络安全 学习笔记2023.11.20

网络运维与网络安全 学习笔记 第二十一天 今日目标 交换网路径选择、Eth-Trunk原理、动态Eth-Trunk配置 Eth-Trunk案例实践、MUX VLAN原理、MUX VLAN配置 交换网路径选择 STP的作用 在交换网络中提供冗余/备份路径 提供冗余路径的同时&#xff0c;防止环路的产生 影响同网…

【高级程序设计】Week2-4Week3-1 JavaScript

一、Javascript 1. What is JS 定义A scripting language used for client-side web development.作用 an implementation of the ECMAScript standard defines the syntax/characteristics of the language and a basic set of commonly used objects such as Number, Date …

计算机毕业设计 基于SpringBoot的企业内部网络管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

Java Web——JS中的BOM

1. Web API概述 Web API 是指浏览器提供的一套接口&#xff0c;这些接口允许开发人员使用 JavaScript&#xff08;JS&#xff09;来操作浏览器功能和页面元素。通过 Web API&#xff0c;开发人员可以与浏览器进行交互&#xff0c;以实现更复杂的功能和效果。 1.1. 初识Web AP…

鸿蒙4.0开发笔记之DevEco Studio之配置代码片段快速生成(三)

一、作用 配置代码片段可以让我们在Deveco Studio中进行开发时快速调取常用的代码块、字符串或者某段具有特殊含义的文字。其实现方式类似于调用定义好变量&#xff0c;然而这个变量是存在于Deveco Studio中的&#xff0c;并不会占用项目的资源。 二、配置代码段的方法 1、打…

Go 语言中的map和内存泄漏

map在内存中总是会增长&#xff1b;它不会收缩。因此&#xff0c;如果map导致了一些内存问题&#xff0c;你可以尝试不同的选项&#xff0c;比如强制 Go 重新创建map或使用指针。 在 Go 中使用map时&#xff0c;我们需要了解map增长和收缩的一些重要特性。让我们深入探讨这一点…

Kotlin学习之函数

原文链接 Understanding Kotlin Functions 函数对于编程语言来说是极其重要的一个组成部分&#xff0c;函数可以视为是程序的执行&#xff0c;是真正活的代码&#xff0c;为啥呢&#xff1f;因为运行的时候你必须要执行一个函数&#xff0c;一般从主函数入口&#xff0c;开始一…

flink源码分析之功能组件(一)-metrics

简介 本系列是flink源码分析的第二个系列,上一个《flink源码分析之集群与资源》分析集群与资源,本系列分析功能组件,kubeclient,rpc,心跳,高可用,slotpool,rest,metric,future。其中kubeclient上一个系列介绍过,本系列不在介绍。 本文介绍flink metrics组件,metric…

设计模式-责任链-笔记

动机&#xff08;Motivation&#xff09; 在软件构建过程中&#xff0c;一个请求可能被多个对象处理&#xff0c;但是每个请求在运行时只能有个接受者&#xff0c;如果显示指定&#xff0c;将必不可少地带来请求者与接受者的紧耦合。 如何使请求的发送者不需要指定具体的接受…

Halcon Solution Guide I basics(2): Image Acquisition(图像加载)

文章目录 文章专栏前言文章解读文章开头流程图算子介绍案例自主练习读取一张图片读取多张图片 文章专栏 Halcon开发 Halcon学习 练习项目gitee仓库 前言 今天来看Halcon的第二章&#xff0c;图像获取。在第二章之后&#xff0c;后面文章就会提供案例了。到时候我会尽量完成每一…

pnpm : 无法加载文件 E:\Soft\PromSoft\nodejs\node_global\pnpm.ps1,

pnpm : 无法加载文件 E:\Soft\PromSoft\nodejs\node_global\pnpm.ps1&#xff0c;因为在此系统上禁止运行脚本。有关详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/?LinkID135170 中 的 about_Execution_Policies。 所在位置 行:1 字符: 1pnpm -v~~~~ CategoryI…

【Flask使用】全知识md文档,4大部分60页第3篇:状态cookie和session保持

本文的主要内容&#xff1a;flask视图&路由、虚拟环境安装、路由各种定义、状态保持、cookie、session、模板基本使用、过滤器&自定义过滤器、模板代码复用&#xff1a;宏、继承/包含、模板中特有变量和函数、Flask-WTF 表单、CSRF、数据库操作、ORM、Flask-SQLAlchemy…

Android描边外框stroke边线、rotate旋转、circle圆形图的简洁通用方案,基于Glide与ShapeableImageView,Kotlin

Android描边外框stroke边线、rotate旋转、circle圆形图的简洁通用方案&#xff0c;基于Glide与ShapeableImageView&#xff0c;Kotlin 利用ShapeableImageView专门处理圆形和外框边线的特性&#xff0c;通过Glide加载图片装载到ShapeableImageView。注意&#xff0c;因为要描边…

Motion Plan之搜素算法笔记

背景&#xff1a; 16-18年做过一阵子无人驾驶&#xff0c;那时候痴迷于移动规划&#xff1b;然而当时可学习的资料非常少&#xff0c;网上的论文也不算太多。基本就是Darpa的几十篇无人越野几次比赛的文章&#xff0c;基本没有成系统的文章和代码讲解实现。所以对移动规划的认…

什么是Mock?为什么要使用Mock呢?

1、前言 在日常开发过程中&#xff0c;大家经常都会遇到&#xff1a;新需求来了&#xff0c;但是需要跟第三方接口来对接&#xff0c;第三方服务还没好&#xff0c;我们自己的功能设计如何继续呢&#xff1f;这里&#xff0c;给大家推荐一下Mock方案。 2、场景示例 2.1、场景一…

ArkTS - HarmonyOS服务卡片(创建)

可以参考官网文档 其中我们在已有的文件中File > New > Service Widget创建你想要的小卡片 本文章发布时目前可使用的模板就三种 有卡片后的new 最终效果

AIGC 技术在淘淘秀场景的探索与实践

本文介绍了AIGC相关领域的爆发式增长&#xff0c;并探讨了淘宝秀秀(AI买家秀)的设计思路和技术方案。文章涵盖了图像生成、仿真形象生成和换背景方案&#xff0c;以及模型流程串联等关键技术。 文章还介绍了淘淘秀的使用流程和遇到的问题及处理方法。最后&#xff0c;文章展望…

【10套模拟】【7】

关键字&#xff1a; 二叉排序树插入一定是叶子、单链表简单选择排序、子串匹配、层次遍历

【C++ Primer Plus学习记录】for循环

很多情况下都需要程序执行重复的任务&#xff0c;C中的for循环可以轻松地完成这种任务。 我们来从程序清单5.1了解for循环所做的工作&#xff0c;然后讨论它是如何工作的。 //forloop.cpp #if 1 #include<iostream> using namespace std;int main() {int i;for (i 0; …