BIO、NIO、selector、Netty代码Demo示例

news2025/1/25 4:36:57

文章目录

    • (一)BIO(Blocking I/O 阻塞I/O)
    • (二)NIO(Non-Blocking I/O 非阻塞I/O)
    • (三)IO多路复用--Selector
    • (四)Netty

(一)BIO(Blocking I/O 阻塞I/O)

阻塞I/O的连接accept()方法及数据读取的read()方法都是阻塞的,也就是说没有客户端发起连接时会阻塞,客户端发起连接后不发送数据也会阻塞。

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
 
/**
* 阻塞IO
*/
public class BioServerDemo {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while(true){
            System.out.println("等待连接。。");
            //阻塞方法
            //可以通过控制台输入命令进行连接:telnet localhost 9000,CTRL+]进入Telnet指令
            Socket clientSocket = serverSocket.accept();
            System.out.println("有客户端连接了。。");
            //虽然采用多线程可以支持多个线程同时访问,但是会引发C10K问题
            //C10K->connection=1w,C10M->connection=1000w,就是连接数很多的意思
            //new Thread(new Runnable() {
            //	@Override
            //	public void run() {
	        //		try {
            			handler(clientSocket);
            //		} catch (IOException e) {
            //			e.printStackTrace();
            //		}
            //	}
            //}).start();
        }
    }
 
    private static void handler(Socket clientSocket) throws IOException {
        byte[] bytes = new byte[1024];
        System.out.println("准备read。。");
		//接收客户端的数据,阻塞方法,客户端没有发送数据,服务端就会没有数据可读时就阻塞
        int read  = clientSocket.getInputStream().read(bytes);
        System.out.println("read完毕。。");
        if (read !=-1){
            System.out.println("接收客户端的数据:"+new String(bytes,0,read));
        }
		//clientSocket.getOutputStream().write("HelloClint".getBytes(StandardCharsets.UTF_8));
		//clientSocket.getOutputStream().flush();
    }
}

(二)NIO(Non-Blocking I/O 非阻塞I/O)

非阻塞I/O在客户端连接方法accept()和read()方法中都不会阻塞,我们可以通过返回值判断是否有客户端发起连接或者发送数据,进行相应的处理。
简单的NIO因为是通过遍历的方式,会有大量的空循环

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
 
/**
 *初版- NIO(非阻塞)编程
 */
public class NioServer {
    static List<SocketChannel> channelList = new ArrayList<>();
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9000));
        //这里可选非阻塞和阻塞,如果选择阻塞true,下面的accept()方法会阻塞线程
        serverSocketChannel.configureBlocking(false);
        System.out.println("服务启动成功");
        while (true){
            SocketChannel socketChannel = serverSocketChannel.accept();
           if(socketChannel!=null){
               System.out.println("连接成功");
        		//这里可选非阻塞和阻塞,如果选择阻塞true,下面的read()方法会阻塞线程
               socketChannel.configureBlocking(false);
               //将所有连接channel交给一个集合进行管理
               channelList.add(socketChannel);
           }
			//问题点: 空循环时间耗时太久
           Iterator<SocketChannel> iterator = channelList.iterator();
           //遍历访问所有channel集合获取客户端发送的数据,如果有任何一个连接客户端发送了数据,那么就处理当前channel里的数据
           while (iterator.hasNext()){
               SocketChannel sc = iterator.next();
               ByteBuffer byteBuffer = ByteBuffer.allocate(128);
               int len = sc.read(byteBuffer);
               if (len>0){
                   System.out.println("接收到消息:"+new String(byteBuffer.array()));
               }else if(len ==-1){
                   iterator.remove();
                   System.out.println("客户端断开连接");
               }
           }
        }
    }
}

(三)IO多路复用–Selector

通过一个多路复用器selector对channel进行管理,这样
在这里插入图片描述

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 *进阶版- NIO(非阻塞)编程
 * 这是netty和Redis的雏形
 */
public class NioSelectorServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9000));
        serverSocketChannel.configureBlocking(false);
       //启用epoll模型
        Selector selector = Selector.open();
       //注册阻塞事件:创建连接
        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("服务启动成功");
        while (true){
       		//阻塞等待需要处理的事件发生,这个时候的事件是等待连接
            selector.select();
       		//获取阻塞的事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
       		//对阻塞事件进行遍历
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            if(iterator.hasNext()){
                SelectionKey key =iterator.next();
                if(key.isAcceptable()){//这里只针对连接事件,
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = server.accept();
                    socketChannel.configureBlocking(false);
                    //连接建立后,注册阻塞事件:读取数据
                    SelectionKey selKey = socketChannel.register(selector,SelectionKey.OP_READ);
                }else if(key.isReadable()){//这里只针对read事件,有需要可以针对write事件处理
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                    socketChannel.configureBlocking(false);
                    int len  = socketChannel.read(byteBuffer);
                    if(len >0 ){
                        System.out.println("接收到消息:"+new String(byteBuffer.array()));
                    }else if(len ==-1 )
                    	//关闭socket
                        socketChannel.close();
                        System.out.println("接收完成");
                    }
                }
                //把处理完的阻塞事件移除
                iterator.remove();
            }
        }
    }
}

(四)Netty

Netty核心组件

  • Bootstrap和ServerBootstrap:当需要连接客户端或者服务器绑定指定端口时需要使用Bootstrap,ServerBootstrap有两种类型,一种是用于客户端的Bootstrap,一种是用于服务端 的ServerBootstrap。
  • Channel:相当于socket,与另一端进行通信的通道,具备bind、connect、read、write等IO操作的能力。
  • EventLoop:事件循环,负责处理Channel的IO事件,一个EventLoopGroup包含多个EventLoop,一个EventLoop可被分配至多个Channel,一个Channel只能注册于一个EventLoop,一个EventLoop只能与一个Thread绑定。
  • ChannelFuture:channel IO事件的异步操作结果。
  • ChannelHandler:包含IO事件具体的业务逻辑。
  • ChannelPipeline:ChannelHandler的管道容器。

DEMO
Netty服务端

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            //2.创建服务端启动引导/辅助类:ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            //3.给引导类配置两大线程组,确定了线程模型
            bootstrap.group(parentGroup, childGroup)
                    // (非必备)打印日志
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 4.指定 IO 模型
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            /**
                             *  服务端添加IdleStateHandler心跳检测处理器,添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理.
                             *  IdleStateHandler心跳检测每十五秒进行一次读检测,如果十五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
                             *  服务端为读IDLE
                             *  pipeline.AddLast(new IdleStateHandler(15, 0, 0));//第一个参数为读,第二个为写,第三个为读写全部
                             */
                            pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
                            //5.可以自定义客户端消息的业务处理逻辑
                            pipeline.addLast(new DemoSocketServerHandler());
                        }
                    });

//            ChannelFuture future = bootstrap.bind(8888).sync().addListener(new ChannelFutureListener() {
//                @Override
//                public void operationComplete(ChannelFuture channelFuture) throws Exception {
//                    System.out.println("监听端口已经启动");
//                }
//            });
            ChannelFuture future = bootstrap.bind(8888).sync().addListener( future1 -> {
                if (future1.isSuccess()){
                    System.out.println("监听端口已经启动!");
                } else {
                    System.out.println("监听端口还未启动!");
                }
            } );
            System.out.println("服务器已启动。。。");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

Netty客户端

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new DemoSocketClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            if(eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}

服务端处理handler

public class DemoSocketServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = inetSocketAddress.getAddress().getHostAddress();
        int port = inetSocketAddress.getPort();
        super.channelActive(ctx);
        System.out.println(ip+":"+port+" 上线了");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
        ctx.fireChannelActive();
        TimeUnit.MILLISECONDS.sleep(500);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = inetSocketAddress.getAddress().getHostAddress();
                System.out.println((ip + ":" + inetSocketAddress.getPort() + "close"));
                ctx.channel().close();
            }
        }
    }
}

客户端处理handler

public class DemoSocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println(msg);
        ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
        TimeUnit.MILLISECONDS.sleep(5000);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("from client:begin talking");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    //超时则关闭链路
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = inetSocketAddress.getAddress().getHostAddress();
                System.out.println((ip + ":" + inetSocketAddress.getPort() + "close"));
                ctx.channel().close();
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1268114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

翻译求职简历,如何做效果好?

随着国内经济的蓬勃发展&#xff0c;众多求职者都渴望能在外企寻得一席之地。而一份精彩绝伦的外文简历&#xff0c;往往能瞬间提高求职者的成功率。但如何才能做好这份简历翻译呢&#xff1f; 其实&#xff0c;简历翻译绝非简单的中英文对照。不同国家有着各自独特的语言表达方…

【海思SS528 | VDEC】MPP媒体处理软件V5.0 | VDEC的使用总结

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

操作系统 day14(进程同步、进程互斥、互斥的代码实现、互斥的硬件实现、互斥锁)

进程同步 概念 进程的异步性体现在&#xff0c;例如&#xff1a;当有I/O操作时&#xff0c;进程需要等待I/O操作&#xff0c;而每个I/O操作又是不同的&#xff0c;所以进程没有一个固定的顺序&#xff0c;固定的时间来执行&#xff0c;而这体现了进程的异步性。 进程互斥 …

【Java】泛型的简单使用

文章目录 一、包装类1.基本数据类型和对应的包装类2.自动装箱和自动拆箱3.手动装箱和手动拆箱 二、什么是泛型三、泛型的使用四、裸类型&#xff08;Raw Type&#xff09;五、泛型是如何编译的六、泛型的上界七、泛型方法总结 一、包装类 在了解泛型之前我们先了解什么是包装类…

【Java学习笔记】75 - 算法优化入门 - 马踏棋盘问题

一、意义 1.算法是程序的灵魂&#xff0c;为什么有些程序可以在海量数据计算时&#xff0c;依然保持高速计算? 2.拿老韩实际工作经历来说&#xff0c;在Unix下开发服务器程序&#xff0c;功能是要支持上千万人同时在线&#xff0c;在上线前&#xff0c; 做内测&#xff0c;一…

FPGA设计时序约束十、others类约束之Set_Disable_Timing

目录 一、序言 二、Set Disable Timing 2.1 基本概念 2.2 设置界面 2.3 命令语法 2.4 命令示例 三、工程示例 四、参考资料 一、序言 在Vivado的时序约束窗口中&#xff0c;存在一类特殊的约束&#xff0c;划分在others目录下&#xff0c;可用于设置忽略或修改默认的时…

一文学会Aiohttp

一、什么是aiohttp库 aiohttp库官网&#xff1a;https://docs.aiohttp.org/en/stable/ aiohttp是一个Python的HTTP客户端/服务器框架&#xff0c;它基于asyncio库实现异步编程模型&#xff0c;可以支持高性能和高并发的HTTP通信。aiohttp用于编写异步的Web服务器、Web应用程序…

【hacker送书第5期】SQL Server从入门到精通(第5版)

第5期图书推荐 内容简介作者简介图书目录参与方式 内容简介 SQL Server从入门到精通&#xff08;第5版&#xff09;》从初学者角度出发&#xff0c;通过通俗易懂的语言、丰富多彩的实例&#xff0c;详细介绍了SQL Server开发所必需的各方面技术。全书分为4篇共19章&#xff0c;…

Linux下文件操作函数

一.常见IO函数 fopen fclose fread fwrite fseek fflush fopen 运行过程 &#xff1a;打开文件 写入数据 数据写到缓冲区 关闭文件后 将数据刷新入磁盘 1.fopen 返回文件类型的结构体的指针 包括三部分 1).文件描述符&#xff08;整形值 索引到磁盘文件&#xff09;…

【重磅合作】九章云极DataCanvas公司与生态伙伴强强联手,构建人工智能强生态!

11月21日&#xff0c;在「筑基赋能 智向未来」九章云极DataCanvas大模型系列成果发布会上&#xff0c;九章云极DataCanvas公司与人工智能产业链上下游合作伙伴广东民营投资股份有限公司&#xff08;以下简称“粤民投”&#xff09;、西藏赛富合银投资有限公司&#xff08;以下简…

网络入门---网络编程预备知识

目录标题 ifconfigip地址和mac地址的区别端口号pid和端口号UDP和TCP的初步了解网络字节序socket套接字 ifconfig 通过指令ifconfig便可以查看到两个网络接口&#xff1a; 我们当前使用的是一个linux服务器并是一个终端设备&#xff0c;所以他只需要一个接口用来入网即可&…

重排链表,剑指offerII 26,力扣 120

目录 力扣题目地址&#xff1a; 题目&#xff1a; 那我们直接看题解吧&#xff1a; 解题方法&#xff1a; 难度分析&#xff1a; 审题目事例提示&#xff1a; 解题分析&#xff1a; 解题思路&#xff1a; 解题补充&#xff1a; 力扣题目地址&#xff1a; 143. 重排链表 - 力扣&…

树与二叉树堆:堆的意义

目录 堆的意义&#xff1a; 第一是堆的排序&#xff0c;第二是堆的top k 排行问题 堆的 top k 排行问题&#xff1a; 面对大量数据的top k 问题&#xff1a; 堆排序的实现&#xff1a;——以升序为例 方法一 交换首尾&#xff1a; 建立大堆&#xff1a; 根结点尾结点的…

Python之数据可视化

文章目录 一、1、matplotlib简单应用1.1、绘制带有中文标签和图例的图1.2、 绘制散点图1.3、绘制饼状图1.4、多个图形一起显示 一、 1、matplotlib简单应用 matplotlib模块依赖于numpy模块和tkinter模块&#xff0c;可以绘制多种形式的图形&#xff0c;包括线图、直方图、饼状…

理解DALL-E 2

1.简介 DALL-E 2的效果想必大家都已经很清楚了&#xff0c;效果是非常惊人的&#xff0c;该篇文章就是讲一下DALL-E 2的原理是什么。 2.方法 DALL-E 2的原理不难理解&#xff0c;前提是你知道CLIP。简单来说&#xff0c;CLIP是一个由文本和图片多模态训练的一个zero-shot模型…

PTA-2023年软件设计综合实践_9(动态规划法)

7-1 数塔 数塔如图所示&#xff0c;若每一步只能走到相邻的结点&#xff08;图中有数字的方格&#xff09;&#xff0c;则从最顶层走到最底层所经过的所有结点的数字之和最大是多少&#xff1f;测试数据保证结果不大于231−1。 C #include <bits/stdc.h> using namespa…

15个超级实用的Python操作,肯定有你意想不到的!

文章目录 1&#xff09;映射代理&#xff08;不可变字典&#xff09;2&#xff09;dict 对于类和对象是不同的3) any() 和 all()4) divmod()5) 使用格式化字符串轻松检查变量6) 我们可以将浮点数转换为比率7) 用globals()和locals()显示现有的全局/本地变量8) import() 函数9) …

Flink-时间流与水印

时间流与水印 一、背景二、时间语义1.事件时间&#xff08;event time&#xff09;2.读取时间&#xff08;ingestion time&#xff09;3.处理时间&#xff08;processing time&#xff09; 三、水印-Watermarks1.延迟和正确性2.延迟事件3.顺序流4.无序流5.并行流 四、Windows1.…

【Openstack Train安装】八、placement安装

Placement 肩负着这样的历史使命&#xff0c;最早在 Newton 版本被引入到 openstack/nova repo&#xff0c;以 API 的形式进行孵化&#xff0c;所以也经常被称呼为 Placement API。它参与到 nova-scheduler 选择目标主机的调度流程中&#xff0c;负责跟踪记录 Resource Provide…

Windows下命令行启动与关闭WebLogic的相关服务

WebLogic 的服务器类型 WebLogic提供了三种类型的服务器&#xff1a; 管理服务器节点服务器托管服务器 示例和关系如下图&#xff1a; 对应三类服务器&#xff0c; 就有三种启动和关闭的方式。本篇介绍使用命令行脚本的方式启动和关闭这三种类型的服务器。 关于WebLogic 的…