IO知识整理

news2024/10/2 12:22:11

IO

面向系统IO

page cache

程序虚拟内存到物理内存的转换依靠cpu中的mmu映射

物理内存以page(4k)为单位做分配

多个程序访问磁盘上同一个文件,步骤

  • kernel将文件内容加载到pagecache
  • 多个程序读取同一份文件指向的同一个pagecache
  • 多个程序各自维护各自的fd,fd中seek记录偏移量,指向具体数据

pagecache特点

  • 为内核维护的中间层
  • 淘汰机制
  • 持久化机制,是否会丢失数据
  • 占用内存大小

Java输出流

  • 不使用buffer的输出流
  • 使用buffer的输出流
    • 速度超过不使用buffer的输出流,是因为先在jvm内存中写入,默认到8kb写完调用一次systemcall
  • 随机输出流

内存充足的情况下,会优先写入pagecache,只要未达到脏页持久化阈值,就不会写入磁盘,不论pagecache里面多少数据,关机重启后将全部丢失;内存不充足的情况下,pagecache中的新数据的写入会导致老数据的持久化,进而写入磁盘。

可以通过手动调用系统flush将脏页写入磁盘

脏页持久化之后cache依然存在于内存,只有内存不够分配时才会淘汰cache,且淘汰的cache一定不是脏页。

ByteBuffer

@Test
public void whatByteBuffer() {
  // 堆内分配内存
  // ByteBuffer buffer = ByteBuffer.allocate(1024);
  // 堆外分配内存
  ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

  System.out.println("postition: " + buffer.position());
  System.out.println("limit: " + buffer.limit());
  System.out.println("capacity: " + buffer.capacity());
  // position起点 limit终点 capacity容量
  System.out.println("mark: " + buffer);

  // position->3 
  buffer.put("123".getBytes());

  System.out.println("-------------put:123......");
  System.out.println("mark: " + buffer);

  // 读写交替 position->0 limit->3
  buffer.flip();

  System.out.println("-------------flip......");
  System.out.println("mark: " + buffer);

  // 读取一个byte pos->1 limit->3
  buffer.get();

  System.out.println("-------------get......");
  System.out.println("mark: " + buffer);

  // 读写交替 pos->2 limit->1024 已读取的字节删除
  buffer.compact();

  System.out.println("-------------compact......");
  System.out.println("mark: " + buffer);

  buffer.clear();

  System.out.println("-------------clear......");
  System.out.println("mark: " + buffer);
}

mmap

堆外创建一个地址空间,只有文件系统可以直接调用

该内存空间的数据不需要通过系统调用及用户态和内核态的切换,直接写入数据就可以同步

但是依然收到系统page cache限制,存在数据丢失的可能

Direct IO

可以绕过系统控制的page cache,不受系统page cache参数控制

程序自己维护page cache,通过自定义代码逻辑维护一致性/dirty等…

好处是自己维护page cache刷磁盘的阈值,与系统通用配置隔离,但是依然存在丢失数据的逻辑

@Test
public void testRandomAccessFileWrite() throws  Exception {
  RandomAccessFile raf = new RandomAccessFile(path, "rw");

  raf.write("hello mashibing\n".getBytes());
  raf.write("hello seanzhou\n".getBytes());
  System.out.println("write------------");
  System.in.read();

  // 指针调整,往前调整后继续写会直接覆盖原有数据
  raf.seek(4);
  raf.write("ooxx".getBytes());

  System.out.println("seek---------");
  System.in.read();

  FileChannel rafchannel = raf.getChannel();
  // mmap  堆外(jvm堆外且是linux的java进程堆外的内存)  和文件映射的   byte  not  object
  // 此时文件大小会变为4096kb
  MappedByteBuffer map = rafchannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);


  map.put("@@@".getBytes());  //不是系统调用  但是数据会到达 内核的pagecache
  //曾经我们是需要out.write()  这样的系统调用,才能让程序的data 进入内核的pagecache
  //曾经必须有用户态内核态切换
  //mmap的内存映射,依然是内核的pagecache体系所约束的!!!
  //换言之,丢数据

  System.out.println("map--put--------");
  System.in.read();

  // 数据刷入磁盘 等于flush
  // map.force(); 

  raf.seek(0);

  ByteBuffer buffer = ByteBuffer.allocate(8192);
  // ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

  // 将channel中的数据读入buffer 等价于buffer.put()
  int read = rafchannel.read(buffer);   
  System.out.println(buffer);
  
  // 此时翻转后可以开始读取数据 pos->0 limit->4096
  buffer.flip();
  System.out.println(buffer);

  for (int i = 0; i < buffer.limit(); i++) {
    Thread.sleep(200);
    System.out.print(((char)buffer.get(i)));
  }
}

请添加图片描述

面向网络IO

TCP

面向连接的,可靠的传输协议

  • 三次握手
  • 四次分手
  • 内核级开辟资源,建立连接
    • 即使服务端没有accept,也可以建立连接(established状态),只是不分配具体的pid
    • 而从java进程内部,看到的连接依然是listen状态
    • 此时可以从客户端发送数据,而服务端不能接收,服务端开启accept后可以接收到已发送的数据

三次握手报文

  • win 滑动窗口长度 协商结果为115
    • tcp拥塞控制,提速
      • 服务端窗口已满
      • 客户端阻塞不再继续发包(如果继续发包会丢弃后发的数据)
  • seq 序列号 client->server server将序列号+1作为ack返回给client
  • mtu ifconfig可以看到网口字节长度
  • mss 实际数据字节长度,基本上为mtu-ip长度-port长度 各自20字节

请添加图片描述

四次分手

  • 异常状态
    • CLOSE_WAIT(接收方第三次FIN信号没有发送成功)
    • TIME_WAIT (连接关闭后,为防止接收方没有收到最后一次ACK,保留连接对应资源)
    • FIN_WAIT2(没有收到FIN信号)

请添加图片描述

Socket

四元组(cip + sport +sip + sport)

  • 服务端不需要给客户端连接分配新的端口号(四元组标识唯一,客户端服务端各存一份)

内核级别

关键配置

  • backlog

    • 配置后续队列,超过配置条数+1之后,会将连接状态置为SYNC_REC状态,不可连接

    • 应当根据处理速度、cpu条数来进行配置,防止建立过多连接

  • nodelay

    • false:使用优化,会积攒超过buffer长度的字节一次发到服务端,但是会有延时
    • true:不使用优化,根据内核调度尽快发送,会多出几次网络io
  • oobinline

    • 结合nodelay配置使用效果明显,会单独发出第一个字节
  • keepalive

    • 开启后会保持心跳,判断相互之间连接是否生效

IO模型

模型分类

  • 同步:程序自己进行R/W操作
  • 异步:内核完成R/W 程序像是不访问IO一样,直接使用buffer,linux不可用
  • 阻塞:BLOCKING BIO
  • 非阻塞:NONBLOCKING NIO

linux

  • 同步阻塞:BIO
  • 同步非阻塞:NIO、多路复用

BIO

多线程BIO模型,主线程accept+创建子线程,子线程做recv

public static void main(String[] args) throws Exception {
  ServerSocket server = new ServerSocket(9090,20);

  System.out.println("step1: new ServerSocket(9090) ");

  while (true) {
    // linux指令 socket() ->fd3 bind(fd3,8090) accept(fd3)->fd5
    Socket client = server.accept();  //阻塞1
    System.out.println("step2:client\t" + client.getPort());

    // 主线程只负责创建子线程用来接收数据
    new Thread(new Runnable(){

      public void run() {
        InputStream in = null;
        try {
          in = client.getInputStream();
          BufferedReader reader = new BufferedReader(new InputStreamReader(in));
          while(true){
            // linux指令 recv(fd5) 子线程负责读取数据
            String dataline = reader.readLine(); //阻塞2

            if(null != dataline){
              System.out.println(dataline);
            }else{
              client.close();
              break;
            }
          }
          System.out.println("客户端断开");

        } catch (IOException e) {
          e.printStackTrace();
        }

      }



    }).start();

  }

请添加图片描述

BIO多线程模型需要创建新线程(系统调用clone)+线程调度,导致速率降低

BIO的弊端主要来自于阻塞,系统内核级别阻塞(accept+recv)

  • accept等待连接和recv接收数据会阻塞,所以通过创建子线程的方式进行规避,但是clone指令+线程切换也是有很高成本的,当客户端连接数量增加时,处理速度会明显降低
  • 因为recv接收数据会发生阻塞,所以当多个客户端连接的时候只能使用多个线程的方式来进行读取,如果只使用一个线程,当连接阻塞时,没办法读取其他连接发送的数据

NIO

accept指令不会阻塞,如果没有连接会直接返回-1,在Java api中会体现为对象为null

不阻塞的好处

  • 可以不额外创建线程,在一个线程中执行建立连接和接收信息两个操作,节省了创建连接和线程切换的成本

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kpaWtaly-1676460489179)(IO.assets/image-20230209151431908.png)]

public static void main(String[] args) throws Exception {

  LinkedList<SocketChannel> clients = new LinkedList<>();

  ServerSocketChannel ss = ServerSocketChannel.open();  //服务端开启监听:接受客户端
  ss.bind(new InetSocketAddress(9090));
  ss.configureBlocking(false); //重点  OS  NONBLOCKING!!!  //只让接受客户端  不阻塞


  while (true) {
    //接受客户端的连接
    Thread.sleep(1000);
    SocketChannel client = ss.accept(); //不会阻塞?  -1 NULL
    //accept  调用内核了:1,没有客户端连接进来在BIO 的时候一直卡着,但是在NIO不卡着,返回-1,NULL
    //如果来客户端的连接,accept 返回的是这个客户端的fd  5,client  object
    //NONBLOCKING 就是代码能往下走了,只不过有不同的情况

    if (client == null) {
      System.out.println("null.....");
    } else {
      client.configureBlocking(false); //重点 连接socket非阻塞
      // socket
      // 服务端的listen socket(连接请求三次握手后,通过accept 得到 连接的socket)
      // 连接socket(连接后的数据读写使用的)
      int port = client.socket().getPort();
      System.out.println("client..port: " + port);
      clients.add(client);
    }

    ByteBuffer buffer = ByteBuffer.allocateDirect(4096);  //可以在堆里   堆外

    //遍历已经连接进来的客户端能不能读写数据
    for (SocketChannel c : clients) {   //串行化!!!!  多线程!!
      int num = c.read(buffer);  // >0  -1  0   //不会阻塞
      if (num > 0) {
        buffer.flip();
        byte[] aaa = new byte[buffer.limit()];
        buffer.get(aaa);

        String b = new String(aaa);
        System.out.println(c.socket().getPort() + " : " + b);
        buffer.clear();
      }

    }
  }
}

NIO的弊端

  • 每次获取数据是O(n)级别的时间复杂度

    • 例如有10w连接,只有100个数据传输,nio需要10w次recv系统调用,这里面大部分的系统调用是无意义的(没有数据传输,徒增成本)

多路复用IO

程序通过一次系统调用获得其中IO状态,然后程序自己实现对于有状态的IO进行R/W,时间负责度O(m) + O(1)

无论select、poll还是nio,本质上都是对程序持有的fds依次遍历,只不过区别是nio的依次遍历发生在程序侧,需要发生n次系统调用,以及n次用户态内核态的切换;而select和poll,是需要程序传递fds给内核,内核触发遍历,只发生一次系统调用。

linux下 多路复用器

  • SELECT POSIX规范
    • 受到FD_SETSIZE的限制,一次最多1024个fd
    • 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
  • POLL
    • 不受到FD_SETSIZE的限制
    • 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
  • EPOLL
    • 内核开辟空间保存fd,规避程序重复传递fd的问题和遍历全量fd的问题

请添加图片描述

public class SocketMultiplexingSingleThreadv1 {

    private ServerSocketChannel server = null;
  	//linux 多路复用器(select poll    epoll kqueue) nginx  event{}
    private Selector selector = null;   
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));


            //如果在epoll模型下,open--》  epoll_create -> fd3
            selector = Selector.open();  //  select  poll  *epoll  优先选择:epoll  但是可以 -D修正

            //server 约等于 listen状态的 fd4
            /*
            register
            如果:
            select,poll:jvm里开辟一个数组 fd4 放进去
            epoll:  epoll_ctl(fd3,ADD,fd4,EPOLLIN
             */
            server.register(selector, SelectionKey.OP_ACCEPT);


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");
        try {
            while (true) {  //死循环

                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size()+"   size");


                //1,调用多路复用器(select,poll  or  epoll  (epoll_wait))
                /*
                select()是啥意思:
                1,select,poll  其实  内核的select(fd4)  poll(fd4)
                2,epoll:  其实 内核的 epoll_wait()
                *, 参数可以带时间:没有时间,0  :  阻塞,有时间设置一个超时
                selector.wakeup()  结果返回0

                懒加载:
                其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用

                 */
                while (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();  //返回的有状态的fd集合
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    //  NIO 对着每一个fd调用系统调用,浪费资源
                    //  多路复用IO调用一次select方法,返回的那些fc可以R/W
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); //set  不移除会重复循环处理
                        if (key.isAcceptable()) {
                            //看代码的时候,这里是重点,如果要去接受一个新的连接
                            //语义上,accept接受连接且返回新连接的FD对吧?
                            //那新的FD怎么办?
                            //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
                            //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);  //连read 还有 write都处理了
                            //在当前线程,这个方法可能会阻塞  ,如果阻塞了十年,其他的IO早就没电了。。。
                            //所以,为什么提出了 IO THREADS
                            //redis  是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
                            //tomcat 8,9  异步的处理方式  IO  和   处理上  解耦
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7
            client.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(8192);

            //你看,调用了register
            /*
            select,poll:jvm里开辟一个数组 fd7 放进去
            epoll:  epoll_ctl(fd3,ADD,fd7,EPOLLIN
             */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }
}

不同多路复用器对应的系统指令

  • POLL(jdk native 用户空间 保存了fd)
    • 创建server并进行监听
      • socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
      • fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
      • bind(4, {sa_family=AF_INET, sin_port=htons(9090)
      • listen(4, 50)
    • 多路复用建立连接
      • poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
      • accept(4, = 7 //获取到新的客户端
      • fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK) //非阻塞
      • poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 //selector.select() 注册新的客户端并查询客户端fd是否有状态变更
  • EPOLL
    • 创建server并进行监听
      • socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
      • fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
      • bind(4, {sa_family=AF_INET, sin_port=htons(9090)
      • listen(4, 50)
    • 多路复用创建连接
      • epoll_create(256) = 7 (epfd)
      • epoll_ctl(7, EPOLL_CTL_ADD, 4,
      • epoll_wait(7, {{EPOLLIN, {u32=4, u64=2216749036554158084}}}, 4096, -1) = 1 // selector.select()
      • accept(4 =8 //建立连接获取新client的fd
      • fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) //设置为非阻塞
      • epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, //注册新的客户端到多路复用器
      • epoll_wait(7, //等待有状态变更的fd返回

多线程Selector

多线程 多路复用IO

分一个bossGroup 和一个workerGroup;bossGroup负责listen,workGroup负责人R/W

public class MainThread {

    public static void main(String[] args) {
        //1,创建 IO Thread  (一个或者多个)
        SelectorThreadGroup boss = new SelectorThreadGroup(3);  //混杂模式
        //boss有自己的线程组

        SelectorThreadGroup worker = new SelectorThreadGroup(3);  //混杂模式
        //worker有自己的线程组

        boss.setWorker(worker);
        //但是,boss得多持有worker的引用:
        /**
         * boss里选一个线程注册listen , 触发bind,从而,这个不选中的线程得持有 workerGroup的引用
         * 因为未来 listen 一旦accept得到client后得去worker中 next出一个线程分配
         */

        boss.bind(9999);
        boss.bind(8888);
        boss.bind(6666);
        boss.bind(7777);
    }
}
public class SelectorThread  extends  ThreadLocal<LinkedBlockingQueue<Channel>>  implements   Runnable{
    // 每线程对应一个selector,
    // 多线程情况下,该主机,该程序的并发客户端被分配到多个selector上
    //注意,每个客户端,只绑定到其中一个selector
    //其实不会有交互问题

    Selector  selector = null;
    LinkedBlockingQueue<Channel> lbq = get();  //lbq  在接口或者类中是固定使用方式逻辑写死了。你需要是lbq每个线程持有自己的独立对象

    SelectorThreadGroup stg;

    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    SelectorThread(SelectorThreadGroup stg){
        try {
            this.stg = stg;
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void run() {
        //Loop
        while (true){
            try {
                //1,select()
                int nums = selector.select();  //阻塞  wakeup()

                //2,处理selectkeys
                if(nums>0){
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while(iter.hasNext()){  //线程处理的过程
                        SelectionKey key = iter.next();
                        iter.remove();
                        if(key.isAcceptable()){  //复杂,接受客户端的过程(接收之后,要注册,多线程下,新的客户端,注册到那里呢?)
                            acceptHandler(key);
                        }else if(key.isReadable()){
                            readHander(key);
                        }else if(key.isWritable()){
                        }
                    }

                }
                //3,处理一些task :  listen  client
                if(!lbq.isEmpty()){
                    //只有方法的逻辑,本地变量是线程隔离的
                    Channel c = lbq.take();
                    if(c instanceof ServerSocketChannel){
                        ServerSocketChannel server = (ServerSocketChannel) c;
                        server.register(selector,SelectionKey.OP_ACCEPT);
                        System.out.println(Thread.currentThread().getName()+" register listen");
                    }else if(c instanceof  SocketChannel){
                        SocketChannel client = (SocketChannel) c;
                        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
                        client.register(selector, SelectionKey.OP_READ, buffer);
                        System.out.println(Thread.currentThread().getName()+" register client: " + client.getRemoteAddress());

                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            } 

        }

    }

    private void readHander(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+" read......");
        ByteBuffer buffer = (ByteBuffer)key.attachment();
        SocketChannel client = (SocketChannel)key.channel();
        buffer.clear();
        while(true){
            try {
                int num = client.read(buffer);
                if(num > 0){
                    buffer.flip();  //将读到的内容翻转,然后直接写出
                    while(buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                }else if(num == 0){
                    break;
                }else {
                    //客户端断开了
                    System.out.println("client: " + client.getRemoteAddress()+"closed......");
                    key.cancel();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+"   acceptHandler......");
        ServerSocketChannel server = (ServerSocketChannel)key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            stg.nextSelectorV3(client);
            // stg.nextSelectorV2(client);
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    public void setWorker(SelectorThreadGroup stgWorker) {
        this.stg =  stgWorker;
    }
}
public class SelectorThreadGroup {  //天生都是boss

    SelectorThread[] sts;
    ServerSocketChannel server=null;
    AtomicInteger xid = new AtomicInteger(0);

    SelectorThreadGroup  stg =  this;

    public void setWorker(SelectorThreadGroup  stg){
        this.stg =  stg;
    }

    SelectorThreadGroup(int num){
        //num  线程数
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            sts[i] = new SelectorThread(this);
            new Thread(sts[i]).start();
        }

    }

    
    public void bind(int port) {
        try {
            server =  ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            //注册到那个selector上呢?
            nextSelectorV3(server);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void nextSelectorV3(Channel c) {

        try {
            if(c instanceof  ServerSocketChannel){
                SelectorThread st = next();  //listen 选择了 boss组中的一个线程后,要更新这个线程的work组
                st.lbq.put(c);
                st.setWorker(stg);
                st.selector.wakeup();
            }else {
                SelectorThread st = nextV3();  //在 main线程种,取到堆里的selectorThread对象
                //1,通过队列传递数据 消息
                st.lbq.add(c);
                //2,通过打断阻塞,让对应的线程去自己在打断后完成注册selector
                st.selector.wakeup();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //无论 serversocket  socket  都复用这个方法
    private SelectorThread next() {
        int index = xid.incrementAndGet() % sts.length;  //轮询就会很尴尬,倾斜
        return sts[index];
    }

    private SelectorThread nextV3() {
        int index = xid.incrementAndGet() % stg.sts.length;  //动用worker的线程分配
        return stg.sts[index];
    }
}

Netty

ButeBuf

类似于jdk原生ByteBuffer封装

//initialCapacity maxCapacity 默认分配为堆外内存
//ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);

System.out.println("buf.isReadable()    :" + buf.isReadable());
System.out.println("buf.readerIndex()   :" + buf.readerIndex());
System.out.println("buf.readableBytes() " + buf.readableBytes());
System.out.println("buf.isWritable()    :" + buf.isWritable());
System.out.println("buf.writerIndex()   :" + buf.writerIndex());
System.out.println("buf.writableBytes() :" + buf.writableBytes());
System.out.println("buf.capacity()  :" + buf.capacity());
System.out.println("buf.maxCapacity()   :" + buf.maxCapacity());
//是否为堆外内存
System.out.println("buf.isDirect()  :" + buf.isDirect());

Client端

NioEventLoopGroup

NioSocketChannel

客户端读写需要注册到类似于多路复用器

@Test
public void clientMode() throws Exception {
  NioEventLoopGroup thread = new NioEventLoopGroup(1);

  //客户端模式:
  NioSocketChannel client = new NioSocketChannel();
  thread.register(client);  //epoll_ctl(5,ADD,3)

  //响应式:输入处理
  ChannelPipeline p = client.pipeline();
  p.addLast(new MyInHandler());

  //reactor  异步的特征
  ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));
  ChannelFuture sync = connect.sync();

  ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());
  ChannelFuture send = client.writeAndFlush(buf);
  //同步处理  
  send.sync();

  //同步处理 等待服务端断开连接
  sync.channel().closeFuture().sync();
  System.out.println("client over....");
}

class MyInHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client  registed...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
      	// 这里readCharSequence会移动ByteBuffer里面的指针,所以再写回去是没有数据的
				// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
        CharSequence str = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);
        System.out.println(str);
        ctx.writeAndFlush(buf);
    }
}

Server端

NioEventLoopGroup

NioServerSocketChannel

还是响应式编程,有客户端连接后通过acceptHandler进行accept和注册R/W Handler

@Test
public void serverMode() throws Exception {

  NioEventLoopGroup thread = new NioEventLoopGroup(1);
  NioServerSocketChannel server = new NioServerSocketChannel();


  thread.register(server);
  ChannelPipeline p = server.pipeline();
  
  //这里通过ChannelInit处理注册R/W处理器
  //accept接收客户端,并且注册到selector
  p.addLast(new MyAcceptHandler(thread, new ChannelInit())); 

  ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));
  bind.sync().channel().closeFuture().sync();
  System.out.println("server close....");
}


class MyAcceptHandler extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup selector;
    private final ChannelHandler handler;

    public MyAcceptHandler(EventLoopGroup thread, ChannelHandler myInitHandler) {
        this.selector = thread;
        this.handler = myInitHandler;  //ChannelInit
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registerd...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //  listen  socket   accept    client
        //  socket           R/W       ByteBuf
      
        SocketChannel client = (SocketChannel) msg;
        //响应式的  handler
        ChannelPipeline p = client.pipeline();
        p.addLast(handler);  //1,client::pipeline[ChannelInit,]

        //注册
        selector.register(client);
    }
}

/**
* @ChannelHandler.Sharable表示线程间共享
* 通过ChannelInit初始化来进行与业务处理器之间的解耦
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel client = ctx.channel();
        ChannelPipeline p = client.pipeline();
        p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]
      
      	//用完清除自己即可
        ctx.pipeline().remove(this);//3,client::pipeline[MyInHandler]
    }
}

Nio Bootstrap

Client端

@Test
public void nettyClient() throws InterruptedException {

  NioEventLoopGroup group = new NioEventLoopGroup(1);
  Bootstrap bs = new Bootstrap();
  ChannelFuture connect = bs.group(group)
    .channel(NioSocketChannel.class)
    // .handler(new ChannelInit())
    .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new MyInHandler());
      }
    })
    .connect(new InetSocketAddress("192.168.150.11", 9090));

  Channel client = connect.sync().channel();

  ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());
  ChannelFuture send = client.writeAndFlush(buf);
  send.sync();

  client.closeFuture().sync();
}

Server端


@Test
public void nettyServer() throws InterruptedException {
  NioEventLoopGroup group = new NioEventLoopGroup(1);
  ServerBootstrap bs = new ServerBootstrap();
  ChannelFuture bind = bs.group(group, group)
    .channel(NioServerSocketChannel.class)
    // .childHandler(new ChannelInit())
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new MyInHandler());
      }
    })
    .bind(new InetSocketAddress("192.168.150.1", 9090));

  bind.sync().channel().closeFuture().sync();
  
}

C10K problem

单机连接10k客户端问题,随着单服务端连接客户端越来越多,才逐渐出现Nio、多路复用IO等模型

http://www.kegel.com/c10k.html

public static void main(String[] args) {
  LinkedList<SocketChannel> clients = new LinkedList<>();
  InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090);

  //端口号的问题:65535
  //  windows
  for (int i = 10000; i < 65000; i++) {
    try {
      SocketChannel client1 = SocketChannel.open();
      SocketChannel client2 = SocketChannel.open();

      /*
       linux中你看到的连接就是:
       client...port: 10508
       client...port: 10508
      */

      client1.bind(new InetSocketAddress("192.168.150.1", i));
      //  192.168.150.1:10000   192.168.150.11:9090
      client1.connect(serverAddr);
      clients.add(client1);

      client2.bind(new InetSocketAddress("192.168.110.100", i));
      //  192.168.110.100:10000  192.168.150.11:9090
      client2.connect(serverAddr);
      clients.add(client2);

    } catch (IOException e) {
      e.printStackTrace();
    }


  }
  System.out.println("clients "+ clients.size());

  try {
    System.in.read();
  } catch (IOException e) {
    e.printStackTrace();
  }
}

IO数据流向图

EPOLL即为在内核空间中通过链表保存有数据状态变更的fd,直接从链表中获取fd即可

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/348024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL(五)

通过索引进行优化 索引基本知识 索引的优点 1、大大减少了服务器需要扫描的数据量2、帮助服务器避免排序和临时表3、将随机io变成顺序io 索引的用处 1、快速查找匹配WHERE子句的行2、从consideration中消除行,如果可以在多个索引之间进行选择&#xff0c;mysql通常会使用找到…

【Python爬虫案例教学】采集某网站壁纸,实现壁纸自由

前言 (&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨 大家好&#xff0c;这里是小圆 现在开始每天都给大家 分享些关于python爬虫的案例教学 从最简单的开始 — 采集图片壁纸 今天就来扒拉这个优质的壁纸网站~ 网址 &#x1f447; 顺便瞧一眼 这里的…

30 - 面向对象的其他语法

目录 一、本章重点 二、对象的分类 1、类对象 &#xff08;1&#xff09;理解 &#xff08;2&#xff09;作用 2、实例对象 &#xff08;1&#xff09;理解 三、属性的划分 1、实例属性 &#xff08;1&#xff09;概念 &#xff08;2&#xff09;定义 &#xff08;3&#xff09…

操作系统的概念,功能和目标

一、概念&#xff1a; 操作系统是指①控制和管理整个计算机系统的硬件和软件资源&#xff0c;并合理地组织调度计算机的工作和资源的分配&#xff0c;以②提供给用户和其他软件方便的接口和环境&#xff0c;它③是计算机系统中最基本的系统软件。 二、功能和目标&#xff1a;…

零入门kubernetes网络实战-14->基于veth pair、namespace以及路由技术,实现跨主机命名空间之间的通信测试案例

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本篇文章继续提供测试案例&#xff1a; 基于veth pair、namespace以及路由技术&#xff0c;实现跨主机命名空间之间的通信 1、网络拓扑如下 2、网络拓扑构建…

【第二章】(1)了解系统内核和 Shell 终端的关系与作用

&#x1f427;2.1强大好用的Shell&#x1f9ca;1.什么是Shell&#xff1f;&#x1f9ca;2.Bash解释器的优势&#x1f427;2.2 执行命令的必备知识&#x1f9ca;1.Linux命令的格式&#x1f9ca;2.四个快捷键/组合键小技巧&#x1f9ca;1.什么是Shell&#xff1f; ​ 一台…

shell脚本的编写以及shell中语句(嵌入式学习)

shell学习shell脚本编写步骤shell变量功能性语句1.read2.expr3.let4.test逻辑运算符的书写格式结构性语句1.if…then…fi2、case...esac3、for..do..done4、while..do..doneshell脚本 将shell命令按照一定的逻辑顺序实现指定功能&#xff0c;放到一个文件中。文件叫脚本文件&a…

几个Base64编码工具,也有蹊跷

起因 需求&#xff1a;对一段内容进行base64加密&#xff0c;然后通过url的get请求进行发送到后台&#xff0c;由于加密的内容比较少&#xff0c;base64串也不是很长&#xff0c;我认为此方案可行。 于是找了三个base64编码的在线工具&#xff0c;分别是&#xff1a; 平台1&…

设计模式第六讲:责任链模式和迭代器模式详解

一. 责任链模式 1. 背景 在现实生活中&#xff0c;常常会出现这样的事例&#xff1a;一个请求有多个对象可以处理&#xff0c;但每个对象的处理条件或权限不同。例如&#xff0c;公司员工请假&#xff0c;可批假的领导有部门负责人、副总经理、总经理等&#xff0c;但每个领导…

【Java基础】020 -- 常见API

目录 一、游戏打包exe 二、Math 1、Math类的常用方法 ①、代码实现 2、小结 3、练习 ①、练习一&#xff1a;改进判断一个数是否为一个质数 ②、练习二&#xff1a;自幂数 三、System 1、时间原点 2、常用方法 3、课堂练习 4、注意事项 5、小结 四、Runtime 1、常用方法 2、练习…

微服务项目【商品秒杀接口压测及优化】

生成测试用户 将UserUtils工具类导入到zmall-user模块中&#xff0c;运行生成测试用户信息&#xff0c;可根据自身电脑情况来生成用户数量。 UserUtils&#xff1a; package com.xujie.zmall.utils;import com.alibaba.nacos.common.utils.MD5Utils; import com.fasterxml.j…

【黄啊码】我问ChatGPT如何学习PHP语言,它是这么说的

大家好&#xff0c;我是黄啊码&#xff0c;最近大家都在流行整chatGPT&#xff0c;今天它来了&#xff01;别人都在吹嘘它万能&#xff0c;能够代替程序员写代码&#xff0c;今天我们就让它教教我们学习PHP语言&#xff1a; 黄啊码&#xff1a; 如何有效学习php语言&#xff1…

关于剩余电流动作继电器在配电系统中的应用探讨

摘 要&#xff1a;据了解&#xff0c;我国每年剩余电流动作继电器&#xff08;RCD&#xff09;的使用量超过2.7亿台&#xff0c;属于CCC认证产品&#xff0c;广泛应用于住宅、办公、商业、酒店、学校等民用建筑和数据中心及工业场所。 现在剩余电流动作继电器依据的标准是GB/T…

vue项目如何使用 SheetJS(xlsx)插件?

简言 SheetJS是一款非常好用的前端处理表格文件的工具。它分社区版和专业版&#xff0c;我们今天来介绍如何简单使用它的社区版。 SheetJS社区版官网 介绍 你应该打开官网浏览具体使用详情。 安装 打开官网在如上图的Installation板块中可以找到各种运行模块的使用方式。 …

MongoDB 覆盖索引查询

MongoDB 覆盖索引查询 官方的MongoDB的文档中对覆盖查询做了说明&#xff1a; 所有的查询字段是索引的一部分所有的查询返回字段在同一个索引中 由于所有出现在查询中的字段是索引的一部分&#xff0c; MongoDB 无需在整个数据文档中检索匹配查询条件和返回使用相同索引的查询…

【半监督医学图像分割 2023 CVPR】UCMT 论文翻译

文章目录【半监督医学图像分割 2023 CVPR】UCMT 论文翻译摘要1. 介绍2. 相关工作2.1 半监督学习2.2 半监督分割2.3 不确定性引导的半监督语义分割3. 方法3.1 问题的定义3.2 总览3.3 协作式均值教师3.4 不确定性指导混合4. 实验和结论5. 总结【半监督医学图像分割 2023 CVPR】UC…

webpack(4版本)使用

webpack简介&#xff1a;webpack 是一种前端资源构建工具&#xff0c;一个静态模块打包器(module bundler)。在 webpack 看来, 前端的所有资源文件(js/json/css/img/less/...)都会作为模块处理。它将根据模块的依赖关系进行静态分析&#xff0c;打包生成对应的静态资源(bundle)…

sentinel持久化方案

一.sentinel规则推送原理 1.原有内存规则存储原理 (1)dashborad中请求到服务器后&#xff0c;在controller中通过http把规则直接推送给client&#xff0c;client接收后把规则放入内存&#xff1b; 2.持久化推送规则原理 ![在这里插入代码片](https://img-blog.csdnimg.cn/1…

质量保障体系建设演进案例

在业务早期发展阶段&#xff0c;主要是产品驱动、研发和测试互相配合。不同的测试方法是验证和保障交付质量的手段&#xff0c;而不是构建质量体系的基石。测试的努力带来的更多是一些“安全感”&#xff0c;而非安全保障。因此&#xff0c;要做到高质量的交付&#xff0c;就需…

k8s简单搭建

前言 最近学习k8s&#xff0c;跟着网上各种教程搭建了简单的版本&#xff0c;一个master节点&#xff0c;两个node节点&#xff0c;这里记录下防止以后忘记。 具体步骤 准备环境 用Oracle VM VirtualBox虚拟机软件安装3台虚拟机&#xff0c;一台master节点&#xff0c;两台…