Linux-零拷贝及Java实现

news2024/11/25 3:06:36

RabbitMQ比RocketMQ、Kafka较慢点一点重要原因就是 零拷贝

什么是零拷贝?

零拷贝指的是在进行IO的时候减少或避免让CPU拷贝数据(数据在IO缓冲区中进行拷贝)
零拷贝的优点:

  • 减少甚至完全避免不必要的CPU拷贝,从而让CPU解脱出来去执行其他的任务
  • 减少内存带宽的占用
  • 通常零拷贝技术还能够减少用户空间和操作系统内核空间之间的上下文切换

零拷贝涉及用户空间、内核空间,用户态以及内核态。操作系统为了保证系统运行的安全与稳定,内存中专门有一块区域用于运行操作系统程序,此内存区域就称为内核空间。
file
内核空间不仅有操作系统内核代码,还有硬件驱动,可以直接操作硬件。内存中除了内核空间,剩下来就是用户空间,这里运行着应用层程序。应用程序通常通过 系统调用 来执行内核空间中的代码。对应,当系统在执行用户空间的代码时,成为用户态。当系统执行内核代码时,称为内核态。用户态切换到内核态,一般通过:系统调用、软中断、硬中断。

传统的I/O不仅需要用户程序在用户态和内核态多次切换,而且还需要CPU将I/O数据在用户空间和内核空间频繁拷贝,这种方式效率很低,而且占用率CPU的工作时间,下图是传统I/O的流程图,流程:
file

  1. 当用户程序使用read系统调用时,系统将会从用户态切换到内核态,内核程序接到请求后会从磁盘中读取数据,此时会用DMA(Direct Memory Access)方式将将数据读取到内核缓冲区,DMA不需要CPU的参与,这里可以节省CPU资源,提高读取效率,目前主流的存储器都支持DMA
  2. CPU将内核缓冲区的数据写入到用户缓冲区中,完成之后系统从内核态切换为用户态;
  3. 用户程序在对数据进行加工之后,使用write系统调用,系统会从用户态切换到内核态,由CPU将用户缓冲区数据写入socket缓冲区;
  4. 系统从内核态切换回用户态,由DMA将socket缓冲区数据发送到相应的硬件驱动中。

可以看到系统在用户态与内核态之间切换了4次、2次DMA数据拷贝、2次CPU数据拷贝。然而实际上,如果仅仅是数据传输,那么根本不需要经过这么多次拷贝。

零拷贝I/O的三种实现方式

1.内存映射 mmap(Meomory mapped)

mmap方式将内核缓冲区的内存地址与用户缓冲区的内存地址进行映射,这样系统处于用户台就可以直接操作内核缓冲区的数据。这种方式减少了内核态和用户态的来回切换次数以及CPU的拷贝次数。
file

  1. 用户程序使用mmap系统调用,系统从用户态切换到内核态,然后通过DMA将磁盘数据读取到内核缓冲区中,完成之后通过内存映射技术将内核缓冲区的地址映射到用户缓冲区的地址,这样用户程序对用户缓冲区的操作就可以直接作用到内核缓冲区中,这就可以避免将内核缓冲区数据拷贝到用户缓冲区;
  2. 系统从内核态切换到用户态,对数据进行加工,完成之后,调用write系统调用;
  3. 系统从用户态切换到内核态,CPU将内核缓冲区数据直接拷贝到socket缓冲区;
  4. 系统从内核态切换到用户态,然后由DMA将socket缓冲区数据发送到硬件上。

可以看到,mmap方式CPU拷贝数据的行为只发生了一次,但是内核态与用户态之间的切换仍然是4次。例如使用Java的InputStream与DataOutputStream进行文件的传输,就是采用内存映射的方式,实现零拷贝。

Socket socket = new Socket("localhost",8081);
InputStream in = new FileInputStream("test.txt");
DataOutputStream dataOut = new DataOutputStream(socket.getOutputStream());
byte[] bytes = new byte[1024];
int b;
while((b = in .read(bytes)) != -1)
{
    String s = new String(bytes, "utf-8");
    dataOutputStream.write(buffer)
}
dataOutputStream.close();

2.sendfile

Linux在2.1版本中引入了sendfile系统调用,它可以避免将内核空间中的数据拷贝到用户空间中。Java中的 FileChannel.transferTo()或者transferFrom(),就使用了sendfile。
file
使用sendfile进行网络数据传输流程:

  1. 发起sendfile系统调用,切换内核态,将文件中的数据通过DMA拷贝到内核缓存中。
  2. 继续将内核缓存中的数据通过CPU拷贝到Socket的内核的缓冲区中。
  3. 继续将Socket内核缓冲区中的数据 通过DMA拷贝到网卡,由网卡进行网络传输。
    可以看出来,CPU拷贝1次、2次DMA拷贝、内核态与用户态之间的切换只进行了2次。相比传统I/O、mmap,都有性能提升。

Linux2.4+ 的内核对sendfile系统调用进行了修改,将DMA拷贝引入了gather操作。它将内核空间的读缓冲区中对应的数据描述信息(内存地址、地址偏移量) 记录到相应的网路缓冲区(socket buffer)中,由DMA根据内存地址、地址偏移量将数据批量地从读缓冲区拷贝到网卡设备中,这样就省去了内核空间中仅剩到1次CPU拷贝操作。
file
在硬件的支持下,sendfile拷贝方式不再从内核缓冲区的数据拷贝到socket缓冲区,取而代之的仅仅是缓冲区文件描述符和数据长度的拷贝,这样DMA引擎直接利用gather操作将页缓存中数据打包发送到网络中即可。
总的说,sendfile + DMA gather = 0次CPU拷贝、2次DMA拷贝、2次内核态与用户态切换。sendfile两种方式都存在用户程序不能对数据进行修改,而且需要硬件支持。只适用于数据从文件拷贝到socket套接字上。

3.splice 方式

sendfile 只适用于将数据从文件拷贝到 socket 套接字,同时需要硬件的支持,这也限定了它的使用范围。Linux 在 2.6版本引入 splice 系统调用,不需要硬件支持,实现了两个已经打开的文件描述符之间的数据零拷贝()。所以sendfile机制的实现已经没有了,目前其API及相应的功能是利用了splice机制来实现的了。

splice(fd_in, off_in, fd_out, off_out, len, flags);

splice 系统调用可以在内核空间的读缓冲区(read buffer)和网络缓冲区(socket buffer)之间建立管道,从而避免了两者之间的CPU拷贝。
其与sendfile不同是:

  1. splice允许从一个文件描述符读取数据并将其直接写到另一个文件描述符(文件与文件、文件与socket)。而sendfile是从文件与socket之间的数据传输。
    2.sendfile需要硬件支持,splice不需要硬件支持。
    3.splice调用利用了Linux提出的管道缓冲区机制, 所以至少一个描述符要为管道。

零拷贝在Java中的实现

下文主要整理自:https://zhuanlan.zhihu.com/p/83398714

在 Java NIO 中的通道(Channel)就相当于操作系统的内核空间(kernel space)的缓冲区,而缓冲区(Buffer)对应的相当于操作系统的用户空间(user space)中的用户缓冲区(user buffer)。通道(Channel)是全双工的(双向传输),它既可能是读缓冲区(read buffer),也可能是网络缓冲区(socket buffer)。缓冲区(Buffer)分为堆内存(HeapBuffer)和堆外内存(DirectBuffer),这是通过 malloc() 分配出来的用户态内存。堆外内存(DirectBuffer)在使用后需要应用程序手动回收,而堆内存(HeapBuffer)的数据在 GC 时可能会被自动回收。因此,在使用 HeapBuffer 读写数据时,为了避免缓冲区数据因为 GC 而丢失,NIO 会先把 HeapBuffer 内部的数据拷贝到一个临时的 DirectBuffer 中的本地内存(native memory),这个拷贝涉及到 sun.misc.Unsafe.copyMemory() 的调用,背后的实现原理与 memcpy() 类似。 最后,将临时生成的 DirectBuffer 内部的数据的内存地址传给 I/O 调用函数,这样就避免了再去访问 Java 对象处理 I/O 读写。

MappedByteBuffer

MappedByteBuffer 是 NIO 基于**内存映射(mmap)**这种零拷贝方式的提供的一种实现,它继承自 ByteBuffer。FileChannel 定义了一个 map() 方法,它可以把一个文件从 position 位置开始的 size 大小的区域映射为内存映像文件。抽象方法 map() 方法在 FileChannel 中的定义如下:

public abstract MappedByteBuffer map(MapMode mode, long position, long size) throws IOException;
  • mode:限定内存映射区域(MappedByteBuffer)对内存映像文件的访问模式,包括只可读(READ_ONLY)、可读可写(READ_WRITE)和写时拷贝(PRIVATE)三种模式。
  • position:文件映射的起始地址,对应内存映射区域(MappedByteBuffer)的首地址。
  • size:文件映射的字节长度,从 position 往后的字节数,对应内存映射区域(MappedByteBuffer)的大小。

MappedByteBuffer 相比 ByteBuffer 新增了 fore()、load() 和 isLoad() 三个重要的方法:

  • fore():对于处于 READ_WRITE 模式下的缓冲区,把对缓冲区内容的修改强制刷新到本地文件。
  • load():将缓冲区的内容载入物理内存中,并返回这个缓冲区的引用。
  • isLoaded():如果缓冲区的内容在物理内存中,则返回 true,否则返回 false。

下面给出一个利用 MappedByteBuffer 对文件进行读写的使用示例:

private final static String CONTENT = "Zero copy implemented by MappedByteBuffer";
private final static String FILE_NAME = "/mmap.txt";
private final static String CHARSET = "UTF-8";
  • 写文件数据:打开文件通道 fileChannel 并提供读权限、写权限和数据清空权限,通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer,将目标数据写入 mappedByteBuffer,通过 force() 方法把缓冲区更改的内容强制写入本地文件。
@Test
public void writeToFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_WRITE, 0, bytes.length);
        if (mappedByteBuffer != null) {
            mappedByteBuffer.put(bytes);
            mappedByteBuffer.force();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 读文件数据:打开文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可得到文件数据。
@Test
public void readFromFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_ONLY, 0, length);
        if (mappedByteBuffer != null) {
            byte[] bytes = new byte[length];
            mappedByteBuffer.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            assertEquals(content, "Zero copy implemented by MappedByteBuffer");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

下面介绍 map() 方法的底层实现原理。map() 方法是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现,下面是和内存映射相关的核心代码:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
    int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    try {
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError x) {
        System.gc();
        try {
            Thread.sleep(100);
        } catch (InterruptedException y) {
            Thread.currentThread().interrupt();
        }
        try {
            addr = map0(imode, mapPosition, mapSize);
        } catch (OutOfMemoryError y) {
            throw new IOException("Map failed", y);
        }
    }

    int isize = (int)size;
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    if ((!writable) || (imode == MAP_RO)) {
        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {
        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
}

map() 方法通过本地方法 map0() 为文件分配一块虚拟内存,作为它的内存映射区域,然后返回这块内存映射区域的起始地址。

  • 文件映射需要在 Java 堆中创建一个 MappedByteBuffer 的实例。如果第一次文件映射导致 OOM,则手动触发垃圾回收,休眠 100ms 后再尝试映射,如果失败则抛出异常。
  • 通过 Util 的 newMappedByteBuffer (可读可写)方法或者 newMappedByteBufferR(仅读) 方法方法反射创建一个 DirectByteBuffer 实例,其中 DirectByteBuffer 是 MappedByteBuffer 的子类。

map() 方法返回的是内存映射区域的起始地址,通过(起始地址 + 偏移量)就可以获取指定内存的数据。这样一定程度上替代了 read() 或 write() 方法,底层直接采用 sun.misc.Unsafe类的 getByte() 和 putByte() 方法对数据进行读写。

private native long map0(int prot, long position, long mapSize) throws IOException;

上面是本地方法(native method)map0 的定义,它通过 JNI(Java Native Interface)调用底层 C 的实现,这个 native 函数(Java_sun_nio_ch_FileChannelImpl_map0)的实现位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c这个源文件里面。

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}

可以看出 map0() 函数最终是通过 mmap64() 这个函数对 Linux 底层内核发出内存映射的调用, mmap64() 函数的原型如下:

#include <sys/mman.h>
void *mmap64(void *addr, size_t len, int prot, int flags, int fd, off64_t offset);

下面详细介绍一下 mmap64() 函数各个参数的含义以及参数可选值:

  • addr:文件在用户进程空间的内存映射区中的起始地址,是一个建议的参数,通常可设置为 0 或 NULL,此时由内核去决定真实的起始地址。当 + flags 为 MAP_FIXED 时,addr 就是一个必选的参数,即需要提供一个存在的地址。
  • len:文件需要进行内存映射的字节长度
  • prot:控制用户进程对内存映射区的访问权限
    PROT_READ:读权限
    PROT_WRITE:写权限
    PROT_EXEC:执行权限
    PROT_NONE:无权限
  • flags:控制内存映射区的修改是否被多个进程共享
    MAP_PRIVATE:对内存映射区数据的修改不会反映到真正的文件,数据修改发生时采用写时复制机制
    MAP_SHARED:对内存映射区的修改会同步到真正的文件,修改对共享此内存映射区的进程是可见的
    MAP_FIXED:不建议使用,这种模式下 addr 参数指定的必须的提供一个存在的 addr 参数
  • fd:文件描述符。每次 map 操作会导致文件的引用计数加 1,每次 unmap 操作或者结束进程会导致引用计数减 1
  • offset:文件偏移量。进行映射的文件位置,从文件起始地址向后的位移量。

下面总结一下 MappedByteBuffer 的特点和不足之处:

  • MappedByteBuffer 使用是堆外的虚拟内存,因此分配(map)的内存大小不受 JVM 的 -Xmx 参数限制,但是也是有大小限制的。 如果当文件超出 Integer.MAX_VALUE 字节限制时,可以通过 position 参数重新 map 文件后面的内容。
  • MappedByteBuffer 在处理大文件时性能的确很高,但也存内存占用、文件关闭不确定等问题,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
  • MappedByteBuffer 提供了文件映射内存的 mmap() 方法,也提供了释放映射内存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法,无法直接显示调用。因此,用户程序需要通过 Java 反射的调用 sun.misc.Cleaner 类的 clean() 方法手动释放映射占用的内存区域。

DirectByteBuffer

DirectByteBuffer 的对象引用位于 Java 内存模型的堆里面,JVM 可以对 DirectByteBuffer 的对象进行内存分配和回收管理,一般使用 DirectByteBuffer 的静态方法 allocateDirect() 创建 DirectByteBuffer 实例并分配内存。
DirectByteBuffer 内部的字节缓冲区位在于堆外的(用户态)直接内存,它是通过 Unsafe 的本地方法 allocateMemory() 进行内存分配,底层调用的是操作系统的 malloc() 函数。

DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

除此之外,初始化 DirectByteBuffer 时还会创建一个 Deallocator 线程,并通过 Cleaner 的 freeMemory() 方法来对直接内存进行回收操作,freeMemory() 底层调用的是操作系统的 free() 函数。
由于使用 DirectByteBuffer 分配的是系统本地的内存,不在 JVM 的管控范围之内,因此直接内存的回收和堆内存的回收不同,直接内存如果使用不当,很容易造成 OutOfMemoryError。说了这么多,那么 DirectByteBuffer 和零拷贝有什么关系?前面有提到在 MappedByteBuffer 进行内存映射时,它的 map() 方法会通过 Util.newMappedByteBuffer() 来创建一个缓冲区实例,初始化的代码如下:
说了这么多,那么 DirectByteBuffer 和零拷贝有什么关系?前面有提到在 MappedByteBuffer 进行内存映射时,它的 map() 方法会通过 Util.newMappedByteBuffer() 来创建一个缓冲区实例,初始化的代码如下:

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,
                                            Runnable unmapper) {
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {
        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
            new Object[] { new Integer(size), new Long(addr), fd, unmapper });
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

private static void initDBBRConstructor() {
    AccessController.doPrivileged(new PrivilegedAction<Void>() {
        public Void run() {
            try {
                Class<?> cl = Class.forName("java.nio.DirectByteBufferR");
                Constructor<?> ctor = cl.getDeclaredConstructor(
                    new Class<?>[] { int.class, long.class, FileDescriptor.class,
                                    Runnable.class });
                ctor.setAccessible(true);
                directByteBufferRConstructor = ctor;
            } catch (ClassNotFoundException | NoSuchMethodException |
                     IllegalArgumentException | ClassCastException x) {
                throw new InternalError(x);
            }
            return null;
        }});
}

DirectByteBuffer 是 MappedByteBuffer 的具体实现类。实际上,Util.newMappedByteBuffer() 方法通过反射机制获取 DirectByteBuffer 的构造器,然后创建一个 DirectByteBuffer 的实例,对应的是一个单独用于内存映射的构造方法:

protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) {
    super(-1, 0, cap, cap, fd);
    address = addr;
    cleaner = Cleaner.create(this, unmapper);
    att = null;
}

因此,除了允许分配操作系统的直接内存以外,DirectByteBuffer 本身也具有文件内存映射的功能,这里不做过多说明。我们需要关注的是,DirectByteBuffer 在 MappedByteBuffer 的基础上提供了内存映像文件的随机读取 get() 和写入 write() 的操作。

  • 内存映像文件的随机读操作
public byte get() {
    return ((unsafe.getByte(ix(nextGetIndex()))));
}

public byte get(int i) {
    return ((unsafe.getByte(ix(checkIndex(i)))));
}
  • 内存映像文件的随机写操作
public ByteBuffer put(byte x) {
    unsafe.putByte(ix(nextPutIndex()), ((x)));
    return this;
}

public ByteBuffer put(int i, byte x) {
    unsafe.putByte(ix(checkIndex(i)), ((x)));
    return this;
}

内存映像文件的随机读写都是借助 ix() 方法实现定位的, ix() 方法通过内存映射空间的内存首地址(address)和给定偏移量 i 计算出指针地址,然后由 unsafe 类的 get() 和 put() 方法和对指针指向的数据进行读取或写入。

private long ix(int i) {
    return address + ((long)i << 0);
}

FileChannel

FileChannel 是一个用于文件读写、映射和操作的通道,同时它在并发环境下是线程安全的,基于 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 getChannel() 方法可以创建并打开一个文件通道。FileChannel 定义了 transferFrom() 和 transferTo() 两个抽象方法,它通过在通道和通道之间建立连接实现数据传输的。

  • transferTo():通过 FileChannel 把文件里面的源数据写入一个 WritableByteChannel 的目的通道。
public abstract long transferTo(long position, long count, WritableByteChannel target)
        throws IOException;
  • transferFrom():把一个源通道 ReadableByteChannel 中的数据读取到当前 FileChannel 的文件里面。
public abstract long transferFrom(ReadableByteChannel src, long position, long count)
        throws IOException;

下面给出 FileChannel 利用 transferTo() 和 transferFrom() 方法进行数据传输的使用示例:

private static final String CONTENT = "Zero copy implemented by FileChannel";
private static final String SOURCE_FILE = "/source.txt";
private static final String TARGET_FILE = "/target.txt";
private static final String CHARSET = "UTF-8";

首先在类加载根路径下创建 source.txt 和 target.txt 两个文件,对源文件 source.txt 文件写入初始化数据。

@Before
public void setup() {
    Path source = Paths.get(getClassPath(SOURCE_FILE));
    byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
    try (FileChannel fromChannel = FileChannel.open(source, StandardOpenOption.READ,
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
        fromChannel.write(ByteBuffer.wrap(bytes));
    } catch (IOException e) {
        e.printStackTrace();
    }
}

对于 transferTo() 方法而言,目的通道 toChannel 可以是任意的单向字节写通道 WritableByteChannel;而对于 transferFrom() 方法而言,源通道 fromChannel 可以是任意的单向字节读通道 ReadableByteChannel。其中,FileChannel、SocketChannel 和 DatagramChannel 等通道实现了 WritableByteChannel 和 ReadableByteChannel 接口,都是同时支持读写的双向通道。为了方便测试,下面给出基于 FileChannel 完成 channel-to-channel 的数据传输示例。

通过 transferTo() 将 fromChannel 中的数据拷贝到 toChannel

@Test
public void transferTo() throws Exception {
    try (FileChannel fromChannel = new RandomAccessFile(
             getClassPath(SOURCE_FILE), "rw").getChannel();
         FileChannel toChannel = new RandomAccessFile(
             getClassPath(TARGET_FILE), "rw").getChannel()) {
        long position = 0L;
        long offset = fromChannel.size();
        fromChannel.transferTo(position, offset, toChannel);
    }
}

通过 transferFrom() 将 fromChannel 中的数据拷贝到 toChannel

@Test
public void transferFrom() throws Exception {
    try (FileChannel fromChannel = new RandomAccessFile(
             getClassPath(SOURCE_FILE), "rw").getChannel();
         FileChannel toChannel = new RandomAccessFile(
             getClassPath(TARGET_FILE), "rw").getChannel()) {
        long position = 0L;
        long offset = fromChannel.size();
        toChannel.transferFrom(fromChannel, position, offset);
    }
}

下面介绍 transferTo() 和 transferFrom() 方法的底层实现原理,这两个方法也是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现。transferTo() 和 transferFrom() 底层都是基于 sendfile 实现数据传输的,其中 FileChannelImpl.java 定义了 3 个常量,用于标示当前操作系统的内核是否支持 sendfile 以及 sendfile 的相关特性。

private static volatile boolean transferSupported = true;
private static volatile boolean pipeSupported = true;
private static volatile boolean fileSupported = true;
  • transferSupported:用于标记当前的系统内核是否支持 sendfile() 调用,默认为 true。
  • pipeSupported:用于标记当前的系统内核是否支持文件描述符(fd)基于管道(pipe)的 sendfile() 调用,默认为 true。
  • fileSupported:用于标记当前的系统内核是否支持文件描述符(fd)基于文件(file)的 sendfile() 调用,默认为 true。

下面以 transferTo() 的源码实现为例。FileChannelImpl 首先执行 transferToDirectly() 方法,以 sendfile 的零拷贝方式尝试数据拷贝。如果系统内核不支持 sendfile,进一步执行 transferToTrustedChannel() 方法,以 mmap 的零拷贝方式进行内存映射,这种情况下目的通道必须是 FileChannelImpl 或者 SelChImpl 类型。如果以上两步都失败了,则执行 transferToArbitraryChannel() 方法,基于传统的 I/O 方式完成读写,具体步骤是初始化一个临时的 DirectBuffer,将源通道 FileChannel 的数据读取到 DirectBuffer,再写入目的通道 WritableByteChannel 里面。

public long transferTo(long position, long count, WritableByteChannel target)
        throws IOException {
    // 计算文件的大小
    long sz = size();
    // 校验起始位置
    if (position > sz)
        return 0;
    int icount = (int)Math.min(count, Integer.MAX_VALUE);
    // 校验偏移量
    if ((sz - position) < icount)
        icount = (int)(sz - position);

    long n;

    if ((n = transferToDirectly(position, icount, target)) >= 0)
        return n;

    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
        return n;

    return transferToArbitraryChannel(position, icount, target);
}

接下来重点分析一下 transferToDirectly() 方法的实现,也就是 transferTo() 通过 sendfile 实现零拷贝的精髓所在。可以看到,transferToDirectlyInternal() 方法先获取到目的通道 WritableByteChannel 的文件描述符 targetFD,获取同步锁然后执行 transferToDirectlyInternal() 方法。

private long transferToDirectly(long position, int icount, WritableByteChannel target)
        throws IOException {
    // 省略从target获取targetFD的过程
    if (nd.transferToDirectlyNeedsPositionLock()) {
        synchronized (positionLock) {
            long pos = position();
            try {
                return transferToDirectlyInternal(position, icount,
                        target, targetFD);
            } finally {
                position(pos);
            }
        }
    } else {
        return transferToDirectlyInternal(position, icount, target, targetFD);
    }
}

最终由 transferToDirectlyInternal() 调用本地方法 transferTo0() ,尝试以 sendfile 的方式进行数据传输。如果系统内核完全不支持 sendfile,比如 Windows 操作系统,则返回 UNSUPPORTED 并把 transferSupported 标识为 false。如果系统内核不支持 sendfile 的一些特性,比如说低版本的 Linux 内核不支持 DMA gather copy 操作,则返回 UNSUPPORTED_CASE 并把 pipeSupported 或者 fileSupported 标识为 false。

private long transferToDirectlyInternal(long position, int icount,
                                        WritableByteChannel target,
                                        FileDescriptor targetFD) throws IOException {
    assert !nd.transferToDirectlyNeedsPositionLock() ||
            Thread.holdsLock(positionLock);

    long n = -1;
    int ti = -1;
    try {
        begin();
        ti = threads.add();
        if (!isOpen())
            return -1;
        do {
            n = transferTo0(fd, position, icount, targetFD);
        } while ((n == IOStatus.INTERRUPTED) && isOpen());
        if (n == IOStatus.UNSUPPORTED_CASE) {
            if (target instanceof SinkChannelImpl)
                pipeSupported = false;
            if (target instanceof FileChannelImpl)
                fileSupported = false;
            return IOStatus.UNSUPPORTED_CASE;
        }
        if (n == IOStatus.UNSUPPORTED) {
            transferSupported = false;
            return IOStatus.UNSUPPORTED;
        }
        return IOStatus.normalize(n);
    } finally {
        threads.remove(ti);
        end (n > -1);
    }
}

本地方法(native method)transferTo0() 通过 JNI(Java Native Interface)调用底层 C 的函数,这个 native 函数(Java_sun_nio_ch_FileChannelImpl_transferTo0)同样位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c 源文件里面。JNI 函数 Java_sun_nio_ch_FileChannelImpl_transferTo0() 基于条件编译对不同的系统进行预编译,下面是 JDK 基于 Linux 系统内核对 transferTo() 提供的调用封装。

#if defined(__linux__) || defined(__solaris__)
#include <sys/sendfile.h>
#elif defined(_AIX)
#include <sys/socket.h>
#elif defined(_ALLBSD_SOURCE)
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define lseek64 lseek
#define mmap64 mmap
#endif

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jobject srcFDO,
                                            jlong position, jlong count,
                                            jobject dstFDO)
{
    jint srcFD = fdval(env, srcFDO);
    jint dstFD = fdval(env, dstFDO);

#if defined(__linux__)
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    return n;
#elif defined(__solaris__)
    result = sendfilev64(dstFD, &sfv, 1, &numBytes);    
    return result;
#elif defined(__APPLE__)
    result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
    return result;
#endif
}

对 Linux、Solaris 以及 Apple 系统而言,transferTo0() 函数底层会执行 sendfile64 这个系统调用完成零拷贝操作,sendfile64() 函数的原型如下:

#include <sys/sendfile.h>

ssize_t sendfile64(int out_fd, int in_fd, off_t *offset, size_t count);

下面简单介绍一下 sendfile64() 函数各个参数的含义:

  • out_fd:待写入的文件描述符
  • in_fd:待读取的文件描述符
  • offset:指定 in_fd 对应文件流的读取位置,如果为空,则默认从起始位置开始
  • count:指定在文件描述符 in_fd 和 out_fd 之间传输的字节数

在 Linux 2.6.3 之前,out_fd 必须是一个 socket,而从 Linux 2.6.3 以后,out_fd 可以是任何文件。也就是说,sendfile64() 函数不仅可以进行网络文件传输,还可以对本地文件实现零拷贝操作。

学习过程中发现的优秀文章:
https://heapdump.cn/article/3290793
https://zhuanlan.zhihu.com/p/83398714

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/448006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

paddlepaddle 的 CPU 和 GPU

想记录一下一个 bug 改了一上午改到最后发现并没有 bug 的 bug。 总结&#xff1a; 因为下午要跑很久&#xff0c;为了省 GPU 算力&#xff0c;我想上午先用 CPU 把数据处理部分跑出来&#xff08;感觉数据处理部分不像网络训练那样涉及太多计算&#xff0c;所以感觉用 CPU 就…

JavaWeb开发 —— MyBatis动态SQL

目录 一、XML映射文件 1. 介绍 2. MyBatisX插件 二、MyBatis动态SQL 1. if 2. foreach 3. sql & include 一、XML映射文件 1. 介绍 ① XML映射文件的名称与Mapper接口名称一致&#xff0c;并且将XML映射文件和Mapper接口放置在相同包下&#xff08;同包同名…

【Java EE】-网络编程(三) TCP/IP协议详解

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【JavaEE】 主要内容&#xff1a;应用层HTTP协议、DNS域名解析系统、传输层UDP协议&#xff0c;TCP协议。TCP协议的工作机制&#xff1a;确认应答、超时重传、连接管理、滑动窗口…

【Linux】MySQL高可用之读写分离监控实践

一、Mycat-web安装配置 1、Mycat节点安装zookeeper&#xff08;在mycat实现了读写分离上安装&#xff09; ① 解压zookeeper压缩包 tar -zxvf zookeeper-3.4.14.tar.gz -C /opt/② cd到cnf目录下将文件复制 ③ cd到bin目录下启动 ./zkServer.sh start2、Mycat节点安装mycat-we…

跨境卖家不可错过的2023开斋节选品和营销技巧,轻松拓展海外市场

开斋节是穆斯林世界最重要的节日之一&#xff0c;同时也是跨境电商一个非常重要的销售节点。在这个节日期间&#xff0c;跨境卖家可以通过合适的选品和营销策略吸引更多的消费者&#xff0c;提高销售额。本文将探讨2023年跨境卖家在开斋节期间如何做好选品和营销。 一、选品 1…

MySQL到ClickHouse数据同步方案对比

ClickHouse 在执行分析查询时的速度优势很好的弥补了 MySQL 的不足&#xff0c;但是对于很多开发者和DBA来说&#xff0c;如何将MySQL稳定、高效、简单的同步到 ClickHouse 却很困难。本文对比了 NineData、MaterializeMySQL&#xff08;ClickHouse自带&#xff09;、Bifrost 三…

下一代听歌识曲技术——从信号处理到深度学习

音乐丰富我们的生活&#xff1b;音乐传达人类的情感&#xff1b;音乐表达人类的艺术。人类文明的进程中离不开音乐这个载体&#xff0c;音乐也离不开人类的真情创作。在听到好听却没听过的歌曲时&#xff0c;如何快速准确得到该歌曲的歌名成为当务之急。LiveVideoStackCon 2022…

网页学习-小试牛刀

网页学习 一、 网页组成二、HTML认知2.1 结构2.2 常用标签2.3 列表标签2.4 表格标签2.5 表单标签2.6 语义化标签2.7 字符实体 三、CSS认知四、JS认知 一、 网页组成 分为三大部分&#xff1a;HTML、CSS和JavaScript。 HTML&#xff08;Hyper Text Markup Language&#xff0c…

根据端口号查询进程路径

研究背景&#xff1a; 在工作的时候&#xff0c;有时候我们会在服务器上部署很多API接口程式&#xff0c;每个程式都有不同的端口号&#xff0c;便于提供服务。当时间久了&#xff0c;我们需要对接口操作的时候&#xff0c;我们有可能会忘掉接口程式所在的路径&#xff0c;而只…

一文掌握如何使用Java操作文件与IO流

文章目录 1. 认识文件2. 文件的类型3. 操作文件3.1 属性3.2 构造方法3.3 常用方法 4. IO流4.1 字节流4.1.1 InputStream4.1.2 OutputStream4.1.3 flush刷新4.1.4 关闭文件close4.1.5 字节缓冲流 4.2 字符流4.2.1 Reader4.2.2 Writer4.2.3 Scanner4.2.4 字符缓冲流 5. 复制文件5…

C语言入门篇——语句篇

目录 1、空语句 2、表达式语句 3、复合语句 4、控制语句 4.1、C控制语句&#xff1a;循环 4.1.1、while 4.1.2、while里的break和continue 4.2.1、for 4.2.2、for里的break和continue 4.3.1、do while 4.3.2、do while里的break和continue 5、C控制语句&#xff1a…

Flink 实时数仓 (一) --------- 数据采集层

目录 一、数仓分层介绍二、实时需求概览三、统计架构分析四、日志数据采集1. 模拟日志生成器的使用2. 日志采集模块-本地测试3. 日志采集模块-打包单机部署 五、业务数据库数据采集1. MySQL 的准备2. 环境搭建3. 代码实现 六、Nginx 安装七、Maxwell 安装八、Canal 安装 一、数…

STM32 平衡小车之电机驱动

TB6612FNG简介 单片机引脚的电流一般只有几十个毫安&#xff0c;无法驱动电机&#xff0c;因此一般是通过单片机控制电机驱动芯片进而控制电机。TB6612是比较常用的电机驱动芯片之一。 TB6612FNG可以同时控制两个电机&#xff0c;工作电流1.2A&#xff0c;最大电流3.2A。 VM电…

通信方式基础知识

文章目录 前言一、分类方式1、串行通信和并行通信2、同步通信和异步通信3、单工、半双工、全双工通信 前言 南京的梧桐树可以鲨掉我的程度 一、分类方式 1、串行通信和并行通信 串行通信&#xff1a;按位顺序&#xff0c;占用引脚资源较少&#xff0c;速度较慢 并行通信&…

移除链表元素(链表篇)

给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 思路&#xff1a; ①直接使用原来的链表来进行删除操作。 ②设置一个虚拟头结点在进行删除操作 ①直接使用原来的链表…

【数据结构】- (带头结点)循环双向链表 - 详细实现思路及代码

目录 一、概述 二、循环双向链表 三、循环双向链表实现步骤  &#x1f4cc;3.1 C语言定义循环双向链表结点  &#x1f4cc;3.2 循环双向链表初始化  &#x1f4cc;3.3 循环双向链表插入数据  &#x1f4cc;3.4 循环双向链表删除数据  &#x1f4cc;3.5 循环双向链表查找数…

【python中的多线程了解一下?】

基本说明 线程&#xff08;Thread&#xff09;是操作系统进行调度的最小单位&#xff0c;是进程中的一个独立执行单元。线程与进程相比&#xff0c;具有更轻量级、更高效率、更易调度、共享资源等优点。 在传统的单核CPU中&#xff0c;操作系统通过时间片轮转算法将CPU的时间…

各种通讯总线的学习记要

一、在B站板道题看到一个比较好完视频&#xff08;爱上半导体&#xff09; 我觉得将232和485之前先将串口通信&#xff0c;因为它们都是串口通讯的变种。 串口通讯&#xff1a; 串口通讯我们约定好帧格式和波特率&#xff0c;通讯正常起始位为低开始&#xff0c;8位数据位&a…

【ABAQUS文档阅读笔记】关于体单元、壳单元、梁单元 、truss单元的总体认识

我的主页&#xff1a; 技术邻&#xff1a;小铭的ABAQUS学习的技术邻主页博客园 : HF_SO4的主页哔哩哔哩&#xff1a;小铭的ABAQUS学习的个人空间csdn&#xff1a;qgm1702 博客园文章链接&#xff1a; 学习笔记&#xff0c;from abaqus document “getting start with ABAQUS…

【代码随想录】刷题Day4

1.交换链表 24. 两两交换链表中的节点 前后指针实现 1.没有元素或者只有一个元素无意义 2.给出一个前驱prev&#xff0c;以及用来交换的两个节点cur和next 3.我当时是这么想的&#xff0c;如果两个指针一起动&#xff0c;那么就要用cur和next同时判断结束&#xff0c;也许这个…