NIO基础

news2024/11/23 12:04:53

NIO

在学习Netty之前,我们需要先了解一下NIO,以便更好的学习Netty

NIO是non-blocking io,也就是非阻塞IO

1.三大组件

1.1 channel & Buffer

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

channel
buffer

常见的 Channel 有

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 则用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.2 Selector

需要搭配实际案例来理解它的用途

假设我们现在需要设计一个聊天软件,那我们如何做到一个服务端与多个客户端通信呢?

多线程版设计

多线程版
socket1
thread
socket2
thread
socket3
thread

多线程版缺点

  • 内存占用高,一个客户端一条线程
  • 线程上下文切换成本高
  • 只适合连接数少的场景

线程池版设计

线程池版
socket1
thread
socket3
thread
socket2
socket4

线程池版缺点

  • 阻塞模式下,线程仅能处理一个socket连接
  • 仅适合短链接场景

selector版设计

selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)

selector版
selector
thread
channel1
channel2
channel3

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

举个实际生活中的案例,餐厅中的服务员就是一个thread,而每桌的顾客就是一个channel,一个服务员负责多组顾客,而服务员需要一个监视器,监视器就是selector,例如海底捞的服务员他的selector就是他的眼睛,当看到顾客桌子上没饮品时过去帮忙加上。

2. ByteBuffer

通过以下一个案例来了解ByteBuffer

@Slf4j
@SpringBootTest
public class TestByteBuffer {
    @Test
    public void byteBuffer(){
        /**
         * 获取FileChannel(两种方式)
         * 1. 输入输出流
         * 2. RandomAccessFile
         */
        try(FileChannel channel = new FileInputStream("test.txt").getChannel()) { //channel实现了Closeable接口,可以结束时自动释放资源
            //创建一个缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) {
                //从channel中读取数据,写入buffer
                int len = channel.read(buffer);
                log.debug("当前读取字节数:{}" , len);
                if (len == -1) { //len为-1时代表数据已经读完
                    break;
                }
                //打印buffer中的内容
                buffer.flip(); //将buffer切换为读模式
                while (buffer.hasRemaining()) { //只要buffer中还有数据就一直读
                    byte b = buffer.get();
                    log.debug("实际字节:{}", (char) b);
                }
                buffer.clear(); //将buffer切换为写模式
            }
        }catch (IOException e) {
        }
    }
}

输出如下

13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 当前读取字节数:10
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:1
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:2
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:3
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:4
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:5
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:6
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:7
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:8
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:9
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:0
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 当前读取字节数:4
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:a
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:b
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:c
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 实际字节:d
13:37:44 [DEBUG] [main] c.y.n.TestByteBuffer - 当前读取字节数:-1

2.1 正确使用步骤

  1. 向ByteBuffer中写入数据
  2. 调用flip()方法将buffer切换为读模式
  3. 从ByteBuffer中读数据
  4. 调用clear()方法将buffer切换为写模式
  5. 重复1-4步骤

2.2 内部结构

通过观察源码,可以发现ByteBuffer有几个比较重要的属性

  1. position:类似于循环中的指针,position标记的是下一个待读/写的位置
  2. limit:写入/读取限制
  3. capacity:容量

image-20230508133407107

执行以下代码创建一个新的ByteBuffer

通过allocate()创建的ByteBuffer默认是在写模式下

ByteBuffer buffer = ByteBuffer.allocate(10);

image-20230508140519332

下图表示写入了4个数据后的状态

image-20230508140913336

通过调用flip()方法将ByteBuffer切换为读模式,以下是flip()源码以及切换后的状态

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

image-20230508141300726

调用get()方法读取4个字节后的状态如下

image-20230508141429085

调用clear()方法切换为写模式

    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

image-20230508140519332

另外还有一个compact() 方法,是把未读完的部分向前压缩,然后切换至写模式

0022

调试工具类

public class ByteBufferUtil {
    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
        final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {
            HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {
                buf.append("   ");
            }
            HEXPADDING[i] = buf.toString();
        }

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
            StringBuilder buf = new StringBuilder(12);
            buf.append(NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {
            BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {
                buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {
            if (i <= 0x1f || i >= 0x7f) {
                BYTE2CHAR[i] = '.';
            } else {
                BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有内容
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {
        int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可读取内容
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {
        StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
        if (isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException(
                    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <= " + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {
            return;
        }
        dump.append(
                "         +-------------------------------------------------+" +
                        NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {
            int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(" |");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {
            int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append(" |");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
        if (row < HEXDUMP_ROWPREFIXES.length) {
            dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {
            dump.append(NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
        return (short) (buffer.get(index) & 0xFF);
    }
}

2.3 常见方法

分配空间

可以使用 allocate() 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

Bytebuffer buf = ByteBuffer.allocate(16);

向 buffer 写入数据

有两种办法

  • 调用 channel 的 read() 方法

    int readBytes = channel.read(buf);
    
  • 调用 buffer 自己的 put() 方法

    buf.put((byte)61);
    

    image-20230508143040115

从 buffer 读取数据

同样有两种办法

  • 调用 channel 的 write() 方法

    int writeBytes = channel.write(buf);
    
  • 调用 buffer 自己的 get() 方法

    byte b = buf.get();
    

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind() 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

注意

rewind 和 flip 都会清除 mark 位置

创建ByteBuffer的方式

通过ByteBuffer.allocate(16)创建,此方法创建后,ByteBuffer默认是写模式。可以从postion=5看出

@Test
public void byteBuffer1() {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.put("hello".getBytes());
    ByteBufferUtil.debugAll(buffer);
}

image-20230508143840511

通过StandardCharsets来创建,此方法创建出的ByteBuffer模式是读模式

@Test
public void byteBuffer2() {
    ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
    ByteBufferUtil.debugAll(buffer);
}

image-20230508144113883

通过ByteBuffer.wrap()方法创建,此方法创建出的ByteBuffer模式也是读模式

@Test
public void byteBuffer3() {
    ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
    ByteBufferUtil.debugAll(buffer);
}

2.4 分散读集中写

了解即可

Scattering Reads

分散读,指的是将一条信息,分批读到不同的ByteBuffer中

例如:demo.txt文件中有这么一行信息hello123world,现在我们需要将hello123world分别读出来

@Test
public void scatteringRead(){
    //使用RandomAccessFile创建channel
    try (FileChannel channel = new RandomAccessFile("demo.txt", "r").getChannel()) { // r表示只读
        ByteBuffer b1 = ByteBuffer.allocate(5);
        ByteBuffer b2 = ByteBuffer.allocate(3);
        ByteBuffer b3 = ByteBuffer.allocate(5);
        channel.read(new ByteBuffer[]{b1,b2,b3});
        ByteBufferUtil.debugAll(b1);
        ByteBufferUtil.debugAll(b2);
        ByteBufferUtil.debugAll(b3);
    } catch (IOException e) {
    }
}

结果如下

image-20230508162905213

Gathering Writes

集中写,指的是将多个ByteBuffer里的数据集中写入一个文件中

例如:现在有3个ByteBuffer,里面的数据为yellowstar459现在将他们组合起来写入demo2.txt文件

这里需要注意的是通过StandardCharsets.UTF_8.encode("yellow")创建的ByteBuffer已经是读模式,不要重复使用flip(),否则会导致读不到数据

@Test
public void gatheringWrites(){
    try (FileChannel channel = new RandomAccessFile("demo2.txt", "rw").getChannel()) { // rw代表读写
        ByteBuffer b1 = StandardCharsets.UTF_8.encode("yellow");
        ByteBuffer b2 = StandardCharsets.UTF_8.encode("star");
        ByteBuffer b3 = StandardCharsets.UTF_8.encode("459");
        channel.write(new ByteBuffer[]{b1,b2,b3});
    } catch (IOException e) {
    }
}

2.5 黏包、半包

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello,world\n
  • I’m zhangsan\n
  • How are you?\n

变成了下面的两个 byteBuffer (黏包,半包)

  • Hello,world\nI’m zhangsan\nHo
  • w are you?\n

现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据

public class TestByteBuffer2 {
    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.allocate(32);
        source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
        split(source);

        source.put("w are you?\nhaha!\n".getBytes());
        split(source);
    }

    private static void split(ByteBuffer buffer) {
        //将buffer转为读模式
        buffer.flip();
        int limit = buffer.limit();
        for (int i = 0; i < limit; i++) {
            if (buffer.get(i) == '\n') {
                int length = i + 1 - buffer.position();
                //用一个ByteBuffer存起来
                ByteBuffer temp = ByteBuffer.allocate(length);
                buffer.limit(i + 1);
                //从buffer读,写到temp
                temp.put(buffer);
                ByteBufferUtil.debugAll(temp);
                buffer.limit(limit);
            }
        }
        buffer.compact();
    }
}

测试结果如下:

image-20230508172205818

3. 文件编程

3.1FileChannel

工作模式

FileChannel只能工作在阻塞模式下

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

int readBytes = channel.read(buffer);

写入

写入的正确姿势如下, SocketChannel

ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip();   // 切换读模式

while(buffer.hasRemaining()) {
    channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

位置

获取当前位置

long pos = channel.position();

设置当前位置

long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

大小

使用 size 方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

3.2 两个Channel传输数据

现在有这么个需求,将demo.txt中的数据复制到to.txt

@SpringBootTest
public class TestChannel {
    @Test
    public void transferTo(){
        try (
                FileChannel from = new FileInputStream("demo.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel()
        ) {
            from.transferTo(0,from.size(),to);
        } catch (IOException e) {
        }
    }
}

以上代码确实可以达到复制的效果,transferTo一次只能读到2g大小的内容,所以当文件大小超过2g时,需要循环读取

@SpringBootTest
public class TestChannel {
    @Test
    public void transferTo(){
        try (
                FileChannel from = new FileInputStream("demo.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel()
        ) {
            long size = from.size();
            //left表示还有多少文件未读取
            for (long left = size;left > 0;) {
                left -= from.transferTo(0, size, to);
            }
        } catch (IOException e) {
        }
    }
}

3.3 Path

jdk7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt

Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了  d:\1.txt

Path source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了  d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了  d:\data\projects
  • . 代表了当前路径
  • .. 代表了上一级路径

例如目录结构如下

d:
	|- data
		|- projects
			|- a
			|- b

代码

Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径

会输出

d:\data\projects\a\..\b
d:\data\projects\b

3.4 Files

主要学习一下API的用法

检查文件是否存在

@Test
public void test1(){
    Path path = Paths.get("demo11.txt");
    boolean exists = Files.exists(path);
    System.out.println(exists);
}

创建一级目录

@Test
public void test2() throws IOException {
    Path path = Paths.get("hello");
    Files.createDirectory(path);
}

如果目录已存在会报FileAlreadyExistsException错误

一次创建多级目录,会报NoSuchFileException错误

创建多级目录

@Test
public void test3() throws IOException {
    Path path = Paths.get("hello1/d1");
    Files.createDirectories(path);
}

拷贝文件

@Test
public void test4() throws IOException {
    Path source = Paths.get("hello/source.txt");
    Path target = Paths.get("hello/target.txt");
    Files.copy(source,target);
}

如果文件已存在,会报FileAlreadyExistsException错误

可以使用StandardCopyOption.REPLACE_EXISTING表示存在则替换

Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);

移动文件

@Test
public void test5() throws IOException {
    Path source = Paths.get("hello/source.txt");
    Path target = Paths.get("hello1/source.txt");
    Files.move(source,target, StandardCopyOption.ATOMIC_MOVE);
}

StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性

删除文件

@Test
public void test6() throws IOException {
    Path path = Paths.get("hello1/source.txt");
    Files.delete(path);
}

如果文件不存在,会报NoSuchFileException错误

遍历目录文件

@Test
public void test7() throws IOException {
    Path path = Paths.get("E:\\hkx\\apache-maven-3.9.1");
    AtomicInteger dirCount = new AtomicInteger();
    AtomicInteger fileCount = new AtomicInteger();
    Files.walkFileTree(path,new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            System.out.println("====>  " + dir);
            dirCount.incrementAndGet();
            return super.preVisitDirectory(dir, attrs);
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            System.out.println(file);
            fileCount.incrementAndGet();
            return super.visitFile(file, attrs);
        }
    });
    System.out.println("文件夹数量:" + dirCount);
    System.out.println("文件数量:" + fileCount);
}

统计jar的数目

@Test
public void test8() throws IOException {
    Path path = Paths.get("E:\\hkx\\apache-maven-3.9.1");
    AtomicInteger jarCount = new AtomicInteger();
    Files.walkFileTree(path,new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            if (file.toString().endsWith(".jar")) {
                System.out.println(file);
                jarCount.incrementAndGet();
            }
            return super.visitFile(file, attrs);
        }
    });
    System.out.println("jar数量:" + jarCount);
}

删除多级目录

目录下如果有东西存在,则不能直接删除,会报DirectoryNotEmptyException错误

也就是说只有目录为空时才能删除目录,所以我们要遍历整个文件夹,先删除文件在删除目录

@Test
public void test9() throws IOException {
    Path path = Paths.get("E:\\nacos");
    Files.walkFileTree(path,new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            Files.delete(file);
            return super.visitFile(file, attrs);
        }

        @Override
        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
            Files.delete(dir);
            return super.postVisitDirectory(dir, exc);
        }
    });
}

删除是危险操作,确保要递归删除的文件夹没有重要内容!

拷贝多级目录

@Test
public void  test10() throws IOException {
    String source = "E:\\nacos";
    String target = "E:\\nacosaaa";
    Files.walk(Paths.get(source)).forEach(path -> {
        try {
            String file = path.toString().replace(source,target);
            //如果是目录则创建
            if (Files.isDirectory(path)) {
                Files.createDirectory(Paths.get(file));
            }else if (Files.isRegularFile(path)){ //如果是普通文件
                Files.copy(path,Paths.get(file));
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    });
}

4. 网络编程

4.1 阻塞 vs 非阻塞

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务端代码

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        // 1.创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2.绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // 3.建立连接集合
        ArrayList<SocketChannel> channels = new ArrayList<>();
        while (true) {
            // 4.accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
            log.debug("connecting...");
            SocketChannel socketChannel = ssc.accept(); // 阻塞方法,线程停止运行
            log.debug("connected!");
            channels.add(socketChannel);
            for (SocketChannel channel : channels) {
                // 5.开辟一个缓存区
                ByteBuffer buffer = ByteBuffer.allocate(16);
                log.debug("before read... {}", channel);
                channel.read(buffer); // 阻塞方法,线程停止运行
                // 6.切换为读模式
                buffer.flip();
                ByteBufferUtil.debugRead(buffer);
                // 7.切换为写模式
                buffer.clear();
                log.debug("after read...{}", channel);
            }
        }
    }
}

客户端代码

客户端代码就很简单

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        System.out.println();
    }
}

非阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
  • 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务端代码

服务端代码小小的改动一下,客户端代码不变

主要修改configureBlocking(false)修改为非阻塞状态

@Slf4j
public class Server2 {
    public static void main(String[] args) throws IOException {
        // 1.创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2.绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // 设置服务器为非阻塞状态
        ssc.configureBlocking(false);
        // 3.建立连接集合
        ArrayList<SocketChannel> channels = new ArrayList<>();
        while (true) {
            // 4.accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
            SocketChannel socketChannel = ssc.accept(); //如果没有连接,返回null
            if (socketChannel != null) {
                // 设置通道为非阻塞状态
                socketChannel.configureBlocking(false);
                log.debug("connected!");
                channels.add(socketChannel);
            }
            for (SocketChannel channel : channels) {
                // 5.开辟一个缓存区
                ByteBuffer buffer = ByteBuffer.allocate(16);
                int read = channel.read(buffer);// 如果没有读到字节返回0
                if (read > 0) {
                    // 6.切换为读模式
                    buffer.flip();
                    ByteBufferUtil.debugRead(buffer);
                    // 7.切换为写模式
                    buffer.clear();
                    log.debug("after read...{}", channel);
                }
            }
        }
    }
}

缺点

即使没有请求连接时,程序也在不断循环,造成了资源浪费!

4.2 Selector

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

Selector selector = Selector.open();

绑定Channel事件

也称之为注册事件,绑定的事件 selector 才会关心

ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
//绑定事件类型
sscKey.interestOps(SelectionKey.OP_ACCEPT);
  • channel必须工作在非阻塞模式
  • FileChannel没有非阻塞模式,因此不能配合selector使用
  • 绑定的事件类型有:
    • connect:客户端连接成功时触发
    • accept:服务端成功接受连接时出发
    • read:数据可读入时触发,有因为接受能力弱,数据暂时不能读入的情况
    • write:数据可写出时触发,有因为发送能力弱,数据在那时不能写出的情况

监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1,阻塞直到绑定事件发生

int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

4.3 处理accept事件

客户端代码依旧不变,服务端代码如下

@Slf4j
public class Server3 {
    public static void main(String[] args) throws IOException {
        //1.创建selector
        Selector selector = Selector.open();
        //2.创建用于监听服务连接的channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        //3.将channel注册到selector中
        SelectionKey sscKey = ssc.register(selector, 0, null);
        log.debug("register key : {}",sscKey);
        //4.设置channel的监听事件类型 OP_ACCEPT 代表连接事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        while (true) {
            // select() 没有事件发生,线程阻塞;有事件发生,线程恢复运行
            selector.select();
            // 处理事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                log.debug("key :{}",key);
                //5.拿到channel对象
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                log.debug("{}",sc);
            }
        }
    }
}

拿到事件后,能否不处理?

事件发生后,要么处理,要么取消(调用key.cancel()),不能什么都不做,否则下次该事件仍会触发

4.4 处理read事件

public class Server3 {
    public static void main(String[] args) throws IOException {
        //1.创建selector
        Selector selector = Selector.open();
        //2.创建用于监听服务连接的channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        //3.将channel注册到selector中
        SelectionKey sscKey = ssc.register(selector, 0, null);
        log.debug("register key : {}",sscKey);
        //4.设置channel的监听事件类型 OP_ACCEPT 代表连接事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        while (true) {
            // select() 没有事件发生,线程阻塞;有事件发生,线程恢复运行
            selector.select();
            // 处理事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //判断事件类型
                if (key.isAcceptable()) {
                    //如果是接收类型
                    log.debug("key :{}",key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    log.debug("sc {}",sc);
                    //将sc注册到selector中,监听读入事件
                    SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
                }else if (key.isReadable()) {
                    //如果是读入类型
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    channel.read(buffer);
                    buffer.flip();
                    ByteBufferUtil.debugRead(buffer);
                    buffer.clear();
                }
                //执行完毕后,删除
                iterator.remove();
            }
        }
    }
}

为什么要删除Key

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

处理服务端异常

客户端异常关闭

当我们直接停止客户端服务时,服务端也会报错,这是因为服务端断开时会发送一个消息,而客户端读不到该消息

image-20230511170711711

改写读入类型处理,捕捉异常,key.cancel()可以将key反注册掉

//如果是读入类型
try {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(16);
    channel.read(buffer);
    buffer.flip();
    ByteBufferUtil.debugRead(buffer);
    buffer.clear();
} catch (IOException e) {
    e.printStackTrace();
    key.cancel();
}

客户端主动断开

调用客户端断开方法,发现服务端拼命的读空的内容

image-20230511171105729

处理消息边界

消息边界指的是:如果发送端连续发送数据,接收端有可能再一次接收动作中接收两个多多个数据包,也有可能因为接收ByteBuffer的长度限制在一次接收中接收到不完整的信息

目前我们定义两句不同的话通过换行符\n来隔开,客户端发送一条数据0123456789abcd3333\n,服务端接收时默认长度为16,测试下来发现一句话被分割成了两个

image-20230512084430529

如何解决?

  1. 采用全局ByteBuffer,这里说的全局ByteBuffer指的是在同一个SelectionKey
  2. 当ByteBuffer大小不足时,自动扩容成2倍
@Slf4j
public class Server3 {
    public static void main(String[] args) throws IOException {
        //1.创建selector
        Selector selector = Selector.open();
        //2.创建用于监听服务连接的channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        //3.将channel注册到selector中
        SelectionKey sscKey = ssc.register(selector, 0, null);
        log.debug("register key : {}",sscKey);
        //4.设置channel的监听事件类型 OP_ACCEPT 代表连接事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        while (true) {
            // select() 没有事件发生,线程阻塞;有事件发生,线程恢复运行
            selector.select();
            // 处理事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //判断事件类型
                if (key.isAcceptable()) {
                    //如果是接收类型
                    log.debug("key :{}",key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    log.debug("sc {}",sc);
                    //将sc注册到selector中,监听读入事件
                    ByteBuffer buffer = ByteBuffer.allocate(16); //将buffer绑定到SelectionKey
                    SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ,buffer);
                }else if (key.isReadable()) {
                    //如果是读入类型
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = channel.read(buffer); //当服务端断开连接时,返回值为-1
                        if (read == -1) {
                            key.cancel();
                        }else {
                            //将buffer根据指定字符分割
                            split(buffer);
                            if (buffer.position() == buffer.limit()) {//代表没读到一个完整信息
                                //创建一个扩容的bytebuffer,复制原来buffer中的信息,并新的buffer存到key中
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer);
                                key.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                    }
                }
                iterator.remove();
            }
        }
    }

    private static void split(ByteBuffer buffer) {
        //将buffer转为读模式
        buffer.flip();
        int limit = buffer.limit();
        for (int i = 0; i < limit; i++) {
            if (buffer.get(i) == '\n') {
                int length = i + 1 - buffer.position();
                //用一个ByteBuffer存起来
                ByteBuffer temp = ByteBuffer.allocate(length);
                buffer.limit(i + 1);
                //从buffer读,写到temp
                temp.put(buffer);
                ByteBufferUtil.debugAll(temp);
                buffer.limit(limit);
            }
        }
        buffer.compact();
    }
}

修改完后,发现即使超过了指定长度,也可以读到一条完整信息

image-20230512085456097

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

4.5 处理write事件

一次无法写完

当服务端发送大量消息时,有时候因为消息过长,导致一次无法写完,服务端一次能发送多少消息,是由系统的缓存区决定的,当缓存区满时,就无法写入消息

服务端

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    sc.register(selector,0,null);

                    // 模拟发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    while (buffer.hasRemaining()) {
                        //如果buffer中有数据没发完
                        int write = sc.write(buffer);
                        System.out.println(write);
                    }
                }
            }
        }
    }
}

客户端

public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}

测试

启动代码,发现服务端发一部分停一下子,这是由于缓存区满了,但是网络原因还没有及时的将缓存区的数据发送出去,所以导致无法像缓存区写入,虽然结果是客户端收到了完整的消息,但这种情况属于阻塞,会导致其他的事件无法正常运行

image-20230512100947693

解决方案

如果有数据没有写完,不采取循环的方式,而是将他存到write事件中

修改服务端

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    // 模拟发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //先发送一次
                    int write = sc.write(buffer);
                    System.out.println(write);
                    //如果buffer中还有数据没发完
                    if (buffer.hasRemaining()) {
                        //在原有监听事件的基础上加上写事件
                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
                        //buffer绑定到key中
                        scKey.attach(buffer);
                    }
                }else if (key.isWritable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int write = sc.write(buffer);
                    System.out.println(write);
                    if (!buffer.hasRemaining()) { // 写完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

4.6 多线程优化

现代计算机都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有一个选择器,没有充分利用多核 cpu,如何优化呢?

如下图所示,分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

image-20230515152305670

代码实现

目前代码并不是完整的,还有许多漏洞需要一步步处理

客户端

public class ThreadClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        sc.write(Charset.defaultCharset().encode("0123456789abcdef"));
        System.in.read();
    }
}

服务端

@Slf4j
public class ThreadServer {
    public static void main(String[] args) throws IOException {
        //负责处理连接事项的boss线程
        Thread.currentThread().setName("boss");
        Selector boss = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        ssc.register(boss,SelectionKey.OP_ACCEPT);
        Worker worker = new Worker("worker-0");
        worker.init();
        while (true) {
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey sscKey = iter.next();
                iter.remove();
                if (sscKey.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}",sc.getRemoteAddress());
                    //将sc注册到worker中的selector中
                    log.debug("before register...{}",sc.getRemoteAddress());
                    sc.register(worker.selector,SelectionKey.OP_READ);
                    log.debug("after register...{}",sc.getRemoteAddress());
                }
            }
        }
    }

    //负责处理读写事项的线程
    static class Worker implements Runnable{
        private Thread thread; // 每一个worker独享一个线程
        private Selector selector; // 每一个worker拥有一个selector
        private String name; // 线程名称
        private volatile boolean start = false; // worker是否第一次运行

        public Worker(String name) {
            this.name = name;
        }

        //初始化worker,start可以保证一个worker中使用同一个thread以及selector
        public void init() throws IOException {
            if (!start) {
                thread = new Thread(this, name);
                selector = Selector.open();
                thread.start();
                start = true;
            }
        }

        //演示只处理简单的读入请求
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select(); //阻塞
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey scKey = iter.next();
                        iter.remove();
                        if (scKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) scKey.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            channel.read(buffer);
                            buffer.flip();
                            ByteBufferUtil.debugRead(buffer);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

无法读入数据

运行上述代码后,发现服务端并没有输出客户端发送的消息,也就是说这一步注册并没有注册成功

image-20230515155638791

这是因为worker-0中的selector.select()是个阻塞方法,现在的运行顺序如下,worker-0中的selector以及被阻塞,所以无法将sc注册到该selector

运行顺序
boss 中 sc.register
worker-0 中 selector.select

解决方案

使用selector.wakeup()让释放堵塞,让注册在worker-0线程中运行

@Slf4j
public class ThreadServer {
    public static void main(String[] args) throws IOException {
        //负责处理连接事项的boss线程
        Thread.currentThread().setName("boss");
        Selector boss = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        ssc.register(boss,SelectionKey.OP_ACCEPT);
        Worker worker = new Worker("worker-0");
        while (true) {
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey sscKey = iter.next();
                iter.remove();
                if (sscKey.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}",sc.getRemoteAddress());
                    //将sc注册到worker中的selector中
                    log.debug("before register...{}",sc.getRemoteAddress());
                    worker.init(sc);  //****修改****
                }
            }
        }
    }

    //负责处理读写事项的线程
    static class Worker implements Runnable{
        private Thread thread; // 每一个worker独享一个线程
        private Selector selector; // 每一个worker拥有一个selector
        private String name; // 线程名称
        private volatile boolean start = false; // worker是否第一次运行

        //****修改****
        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public Worker(String name) {
            this.name = name;
        }

        //初始化worker,start可以保证一个worker中使用同一个thread以及selector
        public void init(SocketChannel sc) throws IOException {
            if (!start) {
                thread = new Thread(this, name);
                selector = Selector.open();
                thread.start();
                start = true;
            }
            //****修改****
            queue.add(() -> {
                try {
                    sc.register(selector,SelectionKey.OP_READ);
                    log.debug("after register...{}",sc.getRemoteAddress());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            //释放阻塞
            selector.wakeup();
        }

        //演示只处理简单的读入请求
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select(); //阻塞
                    //****修改****
                    Runnable task = queue.poll();
                    if (task != null) {
                        task.run();
                    }
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey scKey = iter.next();
                        iter.remove();
                        if (scKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) scKey.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            channel.read(buffer);
                            buffer.flip();
                            ByteBufferUtil.debugRead(buffer);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

多worker实现

上面实现了单worker处理读事项,还没有达到多线程的目的,现在修改创建多个worker,利用ribbon中负载均衡的思想,让多线程轮询处理事项

修改boss线程中的代码

public static void main(String[] args) throws IOException {
    //负责处理连接事项的boss线程
    Thread.currentThread().setName("boss");
    Selector boss = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(8080));
    ssc.configureBlocking(false);
    ssc.register(boss,SelectionKey.OP_ACCEPT);
    Worker[] workers = new Worker[2];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new Worker("worker-" + i);
    }
    //负载均衡计数器
    AtomicInteger atomicInteger = new AtomicInteger();
    while (true) {
        boss.select();
        Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey sscKey = iter.next();
            iter.remove();
            if (sscKey.isAcceptable()) {
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                log.debug("connected...{}",sc.getRemoteAddress());
                //将sc注册到worker中的selector中
                log.debug("before register...{}",sc.getRemoteAddress());
                workers[atomicInteger.getAndIncrement() % workers.length].init(sc);
            }
        }
    }
}

测试

可以看到,事项以轮询的方式交给多个worker处理

image-20230515162036299

如何拿到 cpu 个数

一般情况下,这个线程数我们回设置的与物理cpu个数相同,可以调用Runtime.getRuntime().availableProcessors() 获取cpu个数

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/531536.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【YOLO系列】YOLO v3(网络结构图+代码)

文章目录 网络结构YOLO v3YOLOv3-SPP 多尺度预测损失函数参考 最近在研究YOLO系列&#xff0c;打算写一系列的YOLO博文。在YOLO的发展史中&#xff0c;v1到v3算法思想逐渐完备&#xff0c;后续的系列也都以v3为基石&#xff0c;在v3的基础上进行改进&#xff0c;所以很有必要单…

KD600A变频抗干扰精密介质损耗测量仪

一、产品概述 KD600A变压器介质损耗测试仪是发电厂、变电站等现场自动测量各种高压电力设备介损正切值及电容量的高精度仪器。由于采用了变频技术能保证在强电场干扰下准确测量。仪器采用中文菜单操作&#xff0c;微机自动完成测量。 该仪器同样适用于车间、试验室、科研单位测…

映射及有关概念

映射的概念:有两个集合A,B&#xff0c;若A的任何元素都有唯一的B中元素与之对应&#xff0c;B中元素与之对应的称为像&#xff0c;A中对应的元素称为原像 一个集合也有像&#xff0c;定义为各自像的集合 B中集合也有原像&#xff0c;定义为各自原像的集合 虽然采用了f-1的符号&…

端口隔离、MAC地址表项、MAC地址漂移防止与检测

目录 前言 端口隔离 MAC地址表项 端口安全 MAC地址漂移检测 前言 目前网络中以太网技术的应用非常广泛。然而&#xff0c;各种网络攻击的存在&#xff08;例如针对ARP、DHCP等协议的攻击&#xff09;&#xff0c;不仅造成了网络合法用户无法正常访问网络资源&#xff0c;…

【案例教程】山洪径流过程模拟及洪水危险性评价技术

GIS水文分析&#xff08;ArcHydro、Spatial Anlysist等模块&#xff09;是流域水文模拟建模的重要工具&#xff0c;能够自动提取及计算流域边界、河网水系、流向、汇流时间和其它流域特征参数。美国陆军工程兵团开发的开源、免费Hec-RAS软件具有强大的空间数据分析与整合功能、…

每日学术速递5.13

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.VideoChat: Chat-Centric Video Understanding 标题&#xff1a;VideoChat&#xff1a;以聊天为中心的视频理解 作者&#xff1a;KunChang Li, Yinan He, Yi Wang, Yizhuo Li, Wen…

计算机网络基础知识(五)——什么是TCPUDP协议?图文并茂的方式对两大传输层协议进行从头到尾的讲解

文章目录 01 | &#x1f4d5; 什么是 T C P &#xff1f; \color{red}{什么是TCP&#xff1f;} 什么是TCP&#xff1f;&#x1f4d5;特点三次握手 && 四次挥手超时重传滑动窗口 02 | &#x1f4d9; 什么是 U D P &#xff1f; \color{orange}{什么是UDP&#xff1f;} 什…

1. 链表

b站懒猫数据结构课程笔记&#xff1a;https://www.bilibili.com/read/cv8013121?spm_id_from333.999.0.0 一、链表的概念 单链表&#xff1a;线性表的链接存储结构 单链表存储特点&#xff1a; 逻辑次序和物理次序不一定相同 元素之间的逻辑关系用指针表示 举例&#xff1a…

PASCAL VOC数据集

一、前言 之前寒假好像就学了&#xff0c;但是没有记笔记&#xff0c;现在看来还是得记笔记&#xff0c;都忘得差不多了啊。 二、数据集的介绍 2.1数据集背景 分类类别 2.2数据集文件结构&#xff1a; 2.3文件夹 2.3.1Annotations文件夹 对于标注文件Annotations&#xff1a;里…

基于SpringBoot框架的程序开发步骤

SpringBoot简介 1. 入门案例问题导入1.1 入门案例开发步骤1.2 基于SpringBoot官网创建项目1.3 SpringBoot项目快速启动 2. SpringBoot概述问题导入2.1 起步依赖2.2 辅助功能 1. 入门案例 问题导入 SpringMVC的HelloWord程序怎么写&#xff1f; SpringBoot是由Pivotal团队提供…

死锁、生产者和消费者问题

目录 生产者和消费者问题 死锁的概念 内存的基础知识 内存管理的概念 覆盖与交换 介绍一下PCB 连续分配管理方式​编辑 生产者和消费者问题 死锁的概念 什么是死锁 进程死锁、饥饿、死循环的区别 死锁产生的必要条件 什么时候会发生死锁 死锁的处理策略 内存的基础知识 内存…

微三云润秋带你解析商城分销系统

管理大师德鲁克曾说过&#xff1a;当今企业间的竞争&#xff0c;不是产品之间的竞争&#xff0c;而是商业模式之间的竞争。创业不只是项目选择重要&#xff0c;好的商业模式同样重要&#xff0c;如果没有好的商业模式&#xff0c;企业将会被淘汰。 今天我们要聊的这个商城就有点…

怎样设置CRM目标?有什么作用?

实施CRM系统可以帮助企业提高客户保留率&#xff0c;增加收入&#xff0c;并推动业绩增长。然而&#xff0c;在实施CRM系统之前&#xff0c;必须设定明确的目标&#xff0c;与企业的整体战略保持一致。在这篇文章中&#xff0c;我们来讨论实施CRM目标是什么&#xff0c;如何设定…

基于AD9172/AD9176的4 通道12.6GSPS 采样率16 位DA 播放FMC JESD204B 接口子卡模块

板卡概述 FMC_XM131 是一款4 通道12.6GSPS 采样率16 位DA 播放FMC子卡模块&#xff0c;该板卡为FMC标准&#xff0c;符合VITA57.4 规范&#xff0c;可以作为一个理想的IO 模块耦合至FPGA 前端&#xff0c;16 通道的JESD204B 接口通过FMC连接器连接至FPGA 的高速串行端…

【Python TurboGears】零基础也能轻松掌握的学习路线与参考资料

Python TurboGears是一款开源的web框架&#xff0c;它篮了多种Python库和工具&#xff0c;可以更容易地开发和维护web应用程序。TurboGears具有优秀的文档和活跃的社区支持&#xff0c;是学习web开发的理想选择之一。以下是Python TurboGears学习路线&#xff0c;参考资料和优秀…

一句话简短解析 jsjiami.v6

jsjiami.v6 是一种广泛使用的 JavaScript 代码混淆工具&#xff0c;它提供了多种代码混淆技术&#xff0c;包括变量名重命名、函数名重构、字符串替换、代码结构混淆等&#xff0c;可以将代码转换为难以理解和阅读的形式。在本文中&#xff0c;我们将对 jsjiami.v6 进行分析&am…

【FMC137】基于 VITA57.4 标准的4 路2GSPS/2.6GSPS/3GSPS 14 位AD 采集子卡模块--AD9208得多通道中文资料

板卡概述 FMC137 是一款基于VITA57.4 标准规范的JESD204B 接口FMC 子卡模块&#xff0c; 该模块可以实现4 路14-bit 、2GSPS/2.6GSPS/3GSPSADC 采集功能。该板卡ADC 器件采用ADI公司的AD9208 芯片&#xff0c;&#xff0c;与ADI 公司的AD9689 可以实现PIN 脚兼容。该ADC 与FPGA…

Agisoft Metashape 基于影像的外部点云着色

Agisoft Metashape 基于影像的外部点云着色 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 Agisoft Metashape 基于影像的外…

JavaScript全解析——this指向

本系列内容为JS全解析&#xff0c;为千锋教育资深前端老师独家创作 致力于为大家讲解清晰JavaScript相关知识点&#xff0c;含有丰富的代码案例及讲解。如果感觉对大家有帮助的话&#xff0c;可以【点个关注】持续追更~ this指向&#xff08;掌握&#xff09; this 是一个关…

Python系列之判断和循环

感谢点赞和关注 &#xff0c;每天进步一点点&#xff01;加油&#xff01; 目录 一、判断语句 1.1 Shell里的判断语句格式 1.2 Python里的判断语句格式 二、循环语句 2.1 Python while循环 2.1.1 while 循环的基本格式 2.1.2 while 循环使用else语句 2.2 Python for 循…