java网络编程——NIO架构

news2024/11/14 12:07:14

目录

1.什么是NIO

2.NIO结构

3.基于NIO下的聊天系统实现

4.Netty


1.什么是NIO

NIO:java non-blocking IO,同步非阻塞IO。

BIO是阻塞IO,即每一个事件都需要分配一个进程给他,如果客户端没有连接上,则一直阻塞等待。

而NIO,异步 I/O 是一种没有阻塞地读写数据的方法:该架构下我们可以注册对特定 I/O 事件诸如数据可读、新连接到来等等,而在发生这样感兴趣的事件时,系统将会告诉您,而不用一直等待。

打个比方:

五个人(请求)写作业,BIO架构下是五个老师(五个进程)看着写,学生直接在书上写(内存读写)。
NIO架构下则是一个老师(进程)看着写,要求学生先写在本子上(buffer,缓冲区),并且一直问助教(selector),写好的(事件就绪)交上来!

2.NIO结构

NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。

Channel和IO中的Stream(流)类似。只不过Stream是单向的,channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

Buffer:NIO中的缓冲区,本质上是一个可读取的内存块。当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式,读取之前写入到buffer的所有数据。

NIO和java中普通IO最大的区别是数据打包和传输方式。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

如上图所示:客户端/服务端将需要传输的数据存入到buffer缓冲区中,再通过java流获取对应的channel,然后使用channel写入/读取缓冲区中的数据,输出到对应的目标(文件/字节流)中。

selector:要使用Selector, 得向Selector注册Channel,然后调用它的select()方法进行监听:这个方法会一直阻塞到至少一个注册的通道有事件就绪;当事件就绪,会把对应的selectionKey(用于关联channel,每一个类型的事件对应一个selectionKey,可以帮助获得对应的channel进行后续操作)加入到内部集合,并返回;一旦这个方法返回,线程就可以处理这些事件。

注:select()方法是阻塞的,select(1000)方法阻塞1000毫秒;selectNow()方法是非阻塞的

当客户端发起一次请求事件过后,NIO架构的响应过程如下图所示:

首先,服务端会获得一个ServerSocketChannel并向selector注册,即可等待客户端连接。

客户端向服务端发起请求,selector通过select()方法监听到事件发生(如客户端读事件),分配线程处理该事件;服务端将事件类型对应的selectionKey返回,然后通过key获得处理的channel,通过channel.read()方法将数据写入buffer,并返回至客户端;客户端从buffer中读到需要的数据,请求结束。

Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达),只有当 通道 真正有读写事件发生时,才会进行读写,大大减少系统开销,避免了多线程之间的上下文切换。这使得一个I/O线程可以并发处理多个客户端连接和读写操作,极大提升了性能。 

3.基于NIO下的聊天系统实现

基于以上架构,我们可以根据NIO结构,编写一个聊天系统:

(1)服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ChatServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    public static final int port = 8080;

    public ChatServer(){
        try {
            //获取ServerSocketChannel供客户端连接,并开放端口
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            //得到Selector,并注册channel
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器就绪");
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public void listen(){
        try {
            while (true){
                //阻塞2秒
                int count = selector.select(2000);

                if (count>0){
                    //有事件需要处理
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()){
                        SelectionKey key = keyIterator.next();
                        //客户端连接事件,为客户端生成SocketChannel
                        if (key.isAcceptable()){
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            //非阻塞channel才能进行注册
                            socketChannel.configureBlocking(false);
                            //注册,绑定事件读,并为该channel关联buffer
                            socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                            System.out.println("客户端:"+socketChannel.getRemoteAddress()+" 成功连接");
                        }
                        //读取事件
                        if (key.isReadable()){
                            readMessage(key);
                        }

                        //删除已处理key,避免重复操作
                        keyIterator.remove();
                    }
                }else {
                    continue;
                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //从客户端读取消息
    private void readMessage(SelectionKey key){
        SocketChannel channel = null;
        try {
            //通过key反向获取channel
            channel = (SocketChannel) key.channel();
            //获取该channel关联buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //从buffer中读取数据
            int read = channel.read(buffer);
            if (read>0){
                String msg = new String(buffer.array()).trim();
                //转发消息(排除自己)
                transformMessage(msg,channel);
            }

        }catch (IOException e){
            try {
                System.out.println(channel.getRemoteAddress()+"下线");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            }catch (Exception exception){
                exception.printStackTrace();
            }
        }
    }
    //转发消息给其他客户端
    public void transformMessage(String msg, SocketChannel self) throws IOException {
        System.out.println("服务器转发消息");
        //遍历所有注册到selector的channel,进行转发
        for (SelectionKey key:selector.keys()){
            Channel targetChannel = key.channel();
            //排除自己
            if (targetChannel instanceof SocketChannel && targetChannel!=self){
                //转发消息
                SocketChannel dest = (SocketChannel) targetChannel;
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                dest.write(buffer);
            }

        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer();
        chatServer.listen();
    }
}
  1. 建立ServerSocketChannel,开放端口,并设置为非阻塞。      
  2. 获取一个selector,并将channel注册进去,绑定触发事件及对应的SelectionKey
  3. 等待客户端连接。
  4. 当select()方法监听到事件,获取事件的selectionKey集合;遍历集合,处理事件。(连接事件则创建socketChannel连接;读事件则读取buffer中消息,并转发至其他客户端)

注:只有阻塞模式下,channel才可以向selector注册serverSocketChannel.accept()获取的SocketChannel也需要设置;server端先生成一个ServerSocketChannel并注册后,客户端才可以用SocketChannel对其进行连接; 

(2)客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

public class ChatClient implements Runnable{
    private SocketChannel socketChannel;
    private final String host = "127.0.0.1";
    private final int port = 8080;
    private Selector selector;
    private String username;

    public ChatClient(){
        try {
            socketChannel=SocketChannel.open(new InetSocketAddress(host, port));
            socketChannel.configureBlocking(false);
            selector=Selector.open();
            socketChannel.register(selector, SelectionKey.OP_READ);
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println("客户端就绪");
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    //发送消息
    public void sendMsg(String msg){
        msg = username + ":" + msg;
        try {
            socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //读取回复消息
    public void readMsg(){
        try {
            int select = selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isReadable()){
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    channel.read(buffer);

                    String str = new String(buffer.array());
                    System.out.println(str.trim());
                }
                iterator.remove();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true){
            this.readMsg();
            //每隔1秒监听一次
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatClient chatClient = new ChatClient();
        //启动线程读取
        new Thread(chatClient).start();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String str = scanner.nextLine();
            chatClient.sendMsg(str);
        }

    }
}
  1. 获取scocketServer连接服务端,设置非阻塞模式。
  2. 连接服务端。
  3. 发起事件:发送消息,将消息写入channel中。
  4. 读取服务器转发的其他客户单消息。

(3)运行效果:

服务端:

 客户端1:

 客户端2:

 客户端3:

4.Netty

Netty是 一个异步事件驱动的网络应用程序框架(即基于NIO架构),用于快速开发可维护的高性能协议服务器和客户端。

为什么会有Netty?

  • 1)NIO的类库和API繁杂,使用起来比较麻烦;
  • 2)开发工作量和难度大,面临例如断连重连、半包读写、失败缓存等问题;
  • 3)使用原生NIO编程需要掌握Reactor模型、多线程编程等额外技能,要求较高;
  • 4)java原生NIO存在一定的bug,可能会影响NIO编程。

而Netty就是基于Reactor多线程模型 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

其核心在于 可拓展事件驱动模型、全局交互API、零拷贝。

Netty快速使用,其使用形式类似于NIO架构,也是分为服务端、客户端:

(1)服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServer {
    public static void main(String[] args) {
        //定义线程组:BossEventLoop(负责连接) , WorkerEventLoop(负责业务读写)
        EventLoopGroup bossEventLoop = new NioEventLoopGroup(1);
        EventLoopGroup workerEventLoop = new NioEventLoopGroup();

        try {
            // 1. ServerBootstrap:启动器,负责组装netty组件,启动服务器
            new ServerBootstrap()
                    // 2.存入线程组
                    .group(bossEventLoop,workerEventLoop)
                    // 3.选择服务器的ServerSocketChannel实现
                    .channel(NioServerSocketChannel.class)
                    // 4. worker(child) , 决定了worker能执行什么操作(handler)
                    .childHandler(
                            // 5. channel 代表和客户端进行读写的通道 Initializer:初始化器,负责添加别的handler
                            new ChannelInitializer<NioSocketChannel>() {
                                @Override
                                protected void initChannel(NioSocketChannel ch) throws Exception {
                                    // 6. 添加具体handler
                                    ch.pipeline().addLast(new StringDecoder()); //将 ByteBuf 转为字符串
                                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {   //自定义handler
                                        @Override   //读事件
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            System.out.println(msg);
                                        }
                                    });
                                }
                            })
                    .bind(8080);

        }catch (Exception e){
            System.out.println("客户端断开连接");
        }

    }
}

(2)客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {

        try {
            // 1.启动类
            new Bootstrap()
                    // 2.添加EventLoop
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost",8080))
                    .sync()
                    .channel()
                    //发送数据
                    .writeAndFlush("hello world");
        }catch (Exception e){
            System.out.println("服务器异常");
        }

    }
}

上述代码实现了服务端创建ServerSocketChannel,客户端连接服务端并写入消息,服务端成功读取的功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/464835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基础--->并发部分(1)

文章目录 线程基本概念线程的创建方式线程调度-------常用的方法线程的生命周期和状态并发编程的根本原因Java内存模型(JMM)多线程核心的根本问题volatile关键字保障原子性synchronized和ReentrantLock的区别 线程基本概念 ​ 进程是程序的一次执行过程&#xff0c;是系统运行程…

【Linux】1.4 基本权限

文章目录 一、shell 命令以及运行原理二、Linux 权限的概念三、Linux 权限管理01.文件类型和访问权限&#xff08;事物属性&#xff09;02.文件访问的分类&#xff08;人&#xff09;①用户分类②角色划分 03.文件权限值的表现方法04.文件访问权限的相关设置方法&#xff08;a.…

STC8H8K64U单片机-看门狗配置与讲解

1、看门狗寄存器讲解 &#xff08;bit7&#xff09;WDT_FLAG&#xff1a;看门狗溢出标志&#xff0c;看门狗发生溢出时&#xff0c;硬件自动将此位置1&#xff0c;需要软件清零 &#xff08;bit5&#xff09;EN_WDT&#xff1a;看门狗使能位 0&#x…

被优化了怎么办?他苦学仨月拿到11koffer

网上有个段子叫做“生活就是起起落落落落落落”。人生在世&#xff0c;本就不易&#xff0c;再加上最近大环境影响&#xff0c;各行各业都在内卷&#xff0c;身为芸芸众生的一员&#xff0c;我们也难免受到影响&#xff0c;面临福利裁剪、降薪、甚至被优化的风险。 大环境我们…

面了20家大厂,发现这样介绍项目经验,显得项目很牛...

我在刚刚开始面试的时候&#xff0c;也遇到了这个问题&#xff0c;也是我第一个思考的问题&#xff0c;如何介绍自己的项目&#xff0c;既可以比较全面的让面试官了解这个项目&#xff0c;同时&#xff0c;也不会让面试官觉得废话太多。经过这么多的面试&#xff0c;我发现&…

JavaWeb+JSP+路径问题+跳转(HTML|Servlet|JSP)|这一篇就够了(超详细)

&#x1f648;作者简介&#xff1a;练习时长两年半的Java up主 &#x1f649;个人主页&#xff1a;老茶icon &#x1f64a; ps:点赞&#x1f44d;是免费的&#xff0c;却可以让写博客的作者开兴好久好久&#x1f60e; &#x1f4da;系列专栏&#xff1a;Java全栈&#xff0c;计…

ChatGPT真能取代程序员吗,看看它怎么解释SQL注入漏洞的问题

本文首发自「慕课网」&#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注"慕课网"&#xff01; 作者&#xff1a;Beerus|慕课网讲师 背景 本周在《Web安全渗透测试》课程的QQ群中&#xff0c;有同学提问了一个关于网上一个关于SQL注入…

大淘宝技术斩获NTIRE 2023视频质量评价比赛冠军(内含夺冠方案)

近日&#xff0c;CVPR NTIRE 2023 Quality Assessment of Video Enhancement Challenge比赛结果公布&#xff0c;来自大淘宝音视频技术团队的同学组成「TB-VQA」队伍&#xff0c;从37支队伍中脱颖而出&#xff0c;拿下该比赛&#xff08;唯一赛道&#xff09;冠军。此次夺冠是团…

Mysql replace into与on duplicate key update区别

1、replace into REPLACE INTO 首先判断数据是否存在&#xff1b;如果不存在&#xff0c;则插入&#xff1b;如果已存在则更新&#xff08;先删除再插入 根据主键或唯一索引判断记录是否已存在&#xff0c;所以插入数据的表必须要有主键或者唯一索引&#xff01;否则的话&…

Java 实现访问Redis哨兵(六)

目录 一、哨兵和复制 1.1 哨兵(sentinal) 1.Redis哨兵主要功能 2.Redis哨兵的高可用 1.2 Redis复制(Replication) 1.数据复制原理(执行步骤) 1.3 Redis 主从复制、哨兵和集群这三个有什么区别 二、Java访问哨兵实现 一、哨兵和复制 谈到Redis服务器的高可用&#xff0c…

【笔记】I2S协议是什么?

什么是I2S协议 I2S协议的介绍I2S协议有什么 这两天在搞ESP32的btAudio库&#xff0c;接触到了I2S&#xff0c;简单做个笔记。 I2S协议的介绍 I2S(Inter—IC Sound)总线, 又称集成电路内置音频总线&#xff0c;是飞利浦公司为数字音频设备之间的音频数据传输而制定的一种总线标…

2.QT窗口部件

类继承关系图 &#xff08;上面为Base类&#xff0c;下面为Derived类&#xff09; 窗口与子部件 窗口&#xff1a;没有父部件的部件&#xff0c;称为顶级部件 子部件&#xff1a;非窗口部件 新建空的qmake项目mywidget1 //mywidget1.proQT core gui greaterThan(QT_MAJO…

这是一个黑科技:C++爬虫~(文末报名C/C++领域新星计划)

目录 写在前面 完整代码 这里必看&#xff01;&#xff01; 写在最后 写在前面 现在所有人都知道万能的Python可以做机器学习&#xff0c;可以做人工智能&#xff0c;可以爬取各种小网站&#xff0c;但是你不知道&#xff0c;基于C的正则表达式早就能够爬取各种网络数据啦&a…

华为OD机试——对称美学(通过率只有8.51%???)

用java写的这道题&#xff0c;两个样例都可以通过&#xff0c;但是提交之后最终的通过率只有8.51%&#xff1f;&#xff1f;&#xff1f;自己搞了半天一直都是这个通过率&#xff0c;然后用网上说的100%通过率的代码也是一样的结果&#xff0c;最后时间到了还是没有拿到满分&am…

这些vue基本语法,你掌握了吗

文章目录 一、 vue 项目重点概念介绍1. 单页面应用程序2. 单文件组件3.数据驱动视图 二、 vue 基本结构1、template2、script3、style 三、 vue 常用指令介绍1、内容渲染指令&#xff08;1&#xff09;插值表达式 {{xxx}} —常用方式&#xff08;2&#xff09;v-text&#xff0…

session和JWT的应用及区别

文章目录 登录认证(node)一、session1.下载session2.全部配置session3.存储session4.获取session5.销毁session 二、JWT (Json web token)1.JWT 的工作原理2.JWT 的组成3.下载JWT4.生成token5.解密TOken6.配置全局错误中间件 登录认证(node) 一、session 一、在node中使用sess…

还不懂Redis?看完这个故事就明白了!

还不懂Redis?看完这个故事就明白了! 我是Redis 你好,我是Redis,一个叫Antirez的男人把我带到了这个世界上。 说起我的诞生,跟关系数据库MySQL还挺有渊源的。 在我还没来到这个世界上的时候,MySQL过的很辛苦,互联网发展的越来越快,它容纳的数据也越来越多,用户请求也…

全国独家专利药品有哪些品种?全国独家药品产品汇总查阅

独家药品一般是指某一家药企拥有生产和销售某种药品的独家权利&#xff0c;细分有活性成分独家&#xff08;通用名成分独家&#xff09;、品种独家&#xff08;同成分不同剂型&#xff09;、独家品规&#xff0c;通俗可以分成药品功能独家和产品独家。独家药品经常被人认为是具…

证件拍照扫描——基于C++与深度神经网络实现证件识别扫描并1比1还原证件到A4纸上

前言 数字化时代的到来&#xff0c;越来越多的证件需要进行电子化处理&#xff0c;例如身份证、驾驶证、护照等。在进行电子化处理时&#xff0c;最常见的需求就是将证件照片复制到A4纸上&#xff0c;以便于打印、存档或传输。同时&#xff0c;为了方便信息的录入和管理&#…

一条命令搭建HTTP服务器

文章目录 1.前言2.本地http服务器搭建2.1.Python的安装和设置2.2.Python服务器设置和测试 3.cpolar的安装和注册3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 转载自远程内网穿透的文章&#xff1a;【Python】快速简单搭建HTTP服务器并公网访问「cpolar内网穿透…