学习视频:Java网络编程教程——Netty深入浅出
参考笔记:网络编程
Netty
- 前言
- 一、NIO 基础
- 1. NIO三大核心组件
- 1.1 缓冲区 Buffer
- 1.1.1 创建Buffer的方式
- 1.1.2 HeapByteBuffer与DirectByteBuffer
- 1.1.3 Buffer初体验
- 1.1.4 Buffer三个重要参数
- 1.2 通道 Channel
- 1.2.1 FileChannel
- 1.2.2 SocketChannel
- 1.3 选择器 Selector
- 1.3.1 服务器设计演化
- 1.3.2 核心API
- 二、NIO快速入门
- 1. 阻塞与非阻塞
- 阻塞
- 非阻塞
- 多路复用
- 2. Selector
- 创建
- 绑定 Channel 事件
- 监听 Channel 事件
- 处理 accept 事件
前言
现在的互联网环境下,分布式大行其道,而分布式系统的根基在于网络编程,Netty恰恰是Java网络编程领域的王者。如果要致力于开发高性能的服务器、客户端程序,Netty是你必不可少的第一步,本笔记主要记录:
- 使用Netty开发基本网络应用程序
- 彻底理解阻塞、非阻塞区别,并将NIO与Netty的编码相互联系
- 懂得多路复用在服务器开发时的优势,为什么在此基础上还要使用多线程
- Netty 中是如何实现异步的?异步处理的优势是什么?
- Netty 中是如何管理线程的?EventLoop如何运作?
- Netty 中饰如何管理内存的?ByteBuffer特点与分配时机
- 掌握看源码、调试的一些技巧,提升源码阅读能力
该章节简要介绍了NIO相关基础内容,详细介绍了ByteBuffer、FileChannel的使用方式
一、NIO 基础
NIO:java.nio全称Java Non-blocking IO,是指JDK1.4及以上版本里提供的新API(New IO) ,为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络
为什么使用NIO?
在上面的描述中提到,是在JDK1.4以上的版本才提供NIO,那在之前使用的是什么呢?答案很简单,就是BIO(阻塞式IO),也就是我们常用的IO流。
BIO的问题其实不用多说了,因为在使用BIO时,主线程会进入阻塞状态,这就非常影响程序的性能,不能充分利用机器资源。但是这样就会有人提出疑问了,那我使用多线程不就可以了吗?
但是在高并发的情况下,会创建很多线程,线程会占用内存,线程之间的切换也会浪费资源开销。
而NIO只有在连接/通道真正有读写事件发生时(事件驱动),才会进行读写,就大大地减少了系统的开销。不必为每一个连接都创建一个线程,也不必去维护多个线程。
避免了多个线程之间的上下文切换,导致资源的浪费。
1. NIO三大核心组件
NIO核心组件 | 对应的类或接口 | 应用 | 作用 |
---|---|---|---|
缓冲区 | Buffer | 文件IO/网络IO | 存储数据 |
通道 | Channel | 文件IO/网络IO | 传送数据 |
选择器 | Selector | 网络IO | 控制器 |
1.1 缓冲区 Buffer
我们先看以下这张类图,可以看到Buffer有七种类型。
Buffer
是一个内存块。在NIO
中,所有的数据都是用Buffer
处理,有读写两种模式。所以NIO和传统的IO的区别就体现在这里。传统IO是面向Stream
流,NIO而是面向缓冲区(Buffer
)。
一般我们常用的类型是ByteBuffer
,把数据转成字节进行处理。实质上是一个byte[]
数组。
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>{
//存储数据的数组
final byte[] hb;
//构造器方法
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
super(mark, pos, lim, cap);
//初始化数组
this.hb = hb;
this.offset = offset;
}
}
1.1.1 创建Buffer的方式
主要分成两种:JVM堆内内存块Buffer、堆外内存块Buffer。
创建堆内内存块(非直接缓冲区)的方法是:
//创建堆内内存块HeapByteBuffer
ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024);
String msg = "java技术爱好者";
//包装一个byte[]数组获得一个Buffer,实际类型是HeapByteBuffer
ByteBuffer byteBuffer2 = ByteBuffer.wrap(msg.getBytes());
创建堆外内存块(直接缓冲区)的方法:
//创建堆外内存块DirectByteBuffer
ByteBuffer byteBuffer3 = ByteBuffer.allocateDirect(1024);
1.1.2 HeapByteBuffer与DirectByteBuffer
根据类名就可以看出:
HeapByteBuffer
字节缓冲区在JVM堆中,即JVM内部所维护的字节数组,读写效率低,会受到GC影响。DirectByteBuffer
是直接操作操作系统本地代码创建的内存缓冲数组,读写效率高(不需要再把文件内容copy到物理内存中),不会受GC影响,但该内存分配效率低。
DirectByteBuffer
的使用场景:
- Java程序与本地磁盘、socket传输数据
- 存储大文件对象。不会受到堆内存大小的限制。
- 不需要频繁创建,生命周期较长的情况,能重复使用的情况。
HeapByteBuffer
的使用场景:
除了以上的场景外,其他情况还是建议使用HeapByteBuffer
,没有达到一定的量级,实际上使用DirectByteBuffer
是体现不出优势的。
1.1.3 Buffer初体验
接下来,使用ByteBuffer
做一个小例子,熟悉一下:
public static void main(String[] args) throws Exception {
String msg = "java技术爱好者,起飞!";
//创建一个固定大小的buffer(返回的是HeapByteBuffer)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byte[] bytes = msg.getBytes();
//写入数据到Buffer中
byteBuffer.put(bytes);
//切换成读模式,关键一步
byteBuffer.flip();
//创建一个临时数组,用于存储获取到的数据
byte[] tempByte = new byte[bytes.length];
int i = 0;
//如果还有数据,就循环。循环判断条件
while (byteBuffer.hasRemaining()) {
//获取byteBuffer中的数据
byte b = byteBuffer.get();
//放到临时数组中
tempByte[i] = b;
i++;
}
//打印结果
System.out.println(new String(tempByte));//java技术爱好者,起飞!
}
这上面有一个flip()
方法是很重要的。意思是切换到读模式。上面已经提到缓存区是双向的,既可以往缓冲区写入数据,也可以从缓冲区读取数据。但是不能同时进行,需要切换。那么这个切换模式的本质是什么呢?
1.1.4 Buffer三个重要参数
三个重要参数
//位置,默认是从第一个开始
private int position = 0;
//限制,不能读取或者写入的位置索引
private int limit;
//容量,缓冲区所包含的元素的数量
private int capacity;
那么我们以上面的例子,一句一句代码进行分析:
String msg = "java技术爱好者,起飞!";
//创建一个固定大小的buffer(返回的是HeapByteBuffer)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
当创建一个缓冲区时,参数的值是这样的:
当执行到byteBuffer.put(bytes)
,当put()
进入多少数据,position
就会增加多少,参数就会发生变化:
接下来关键一步byteBuffer.flip()
,会发生如下变化:
flip()
方法的源码如下:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
为什么要这样赋值呢?因为下面有一句循环条件判断:
byteBuffer.hasRemaining();
public final boolean hasRemaining() {
//判断position的索引是否小于limit。
//所以可以看出limit的作用就是记录写入数据的位置,那么当读取数据时,就知道读到哪个位置
return position < limit;
}
接下来就是在while
循环中get()
读取数据,读取完之后:
最后当position
等于limit
时,循环判断条件不成立,就跳出循环,读取完毕。
所以可以看出实质上capacity
容量大小是不变的,实际上是通过控制position
和limit
的值来控制读写的数据。
1.2 通道 Channel
首先我们看一下Channel有哪些子类:
常用的Channel有这四种:
- FileChannel:文件数据传输通道
- SocketChannel:TCP网络编程数据传输通道
- ServerSockectChannel:TCP服务端网络编程数据传输通道,监听新TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel
- DatagramChannel:UDP网络编程数据传输通道
Channel本身并不存储数据,只是负责数据的运输。必须要和Buffer一起使用。
Channel 通道类似于 Stream 流,它是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而 stream 要么是输入,要么是输出,channel 比 stream 更为底层。
1.2.1 FileChannel
FileChannel只能工作于阻塞模式下
获取
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或 RandomAccessFile来获取,它们都有 getChannel() 方法
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 获取的 channel 能否读写,取决于构造 RandomAccessFile 时指定的读写模式
FileChannel 的获取方式,下面举个文件复制拷贝的例子进行说明:
首先准备一个"1.txt"放在项目的根目录下,然后编写一个 main 方法:
public static void main(String[] args) throws Exception {
//获取文件输入流
File file = new File("1.txt");
FileInputStream inputStream = new FileInputStream(file);
//从文件输入流获取通道
FileChannel inputStreamChannel = inputStream.getChannel();
//获取文件输出流
FileOutputStream outputStream = new FileOutputStream(new File("2.txt"));
//从文件输出流获取通道
FileChannel outputStreamChannel = outputStream.getChannel();
//创建一个byteBuffer,小文件所以就直接一次读取,不分多次循环了
ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
//把输入流通道的数据读取到缓冲区
inputStreamChannel.read(byteBuffer);
//切换成读模式
byteBuffer.flip();
//把数据从缓冲区写入到输出流通道
outputStreamChannel.write(byteBuffer);
//关闭通道
outputStream.close();
inputStream.close();
outputStreamChannel.close();
inputStreamChannel.close();
}
执行后,我们就获得一个"2.txt"。执行成功。以上的例子,可以用一张示意图表示,是这样的:
读取:
从 channel 读取数据填充 ByteBuffer,返回值表示读了多少字节,-1 表示达到文件末尾
int readBytes = channel.read(byteBuffer);
写入:
正确写入姿势如下:
ByteBuffer bytebuffer = ...;
bytebuffer.put(...); // 存入数据
bytebuffer.flip();
while (bytebuffer.hasRemaining()) {
channel.write(bytebuffer);
}
在 while 中调用 channel.write 是因为 write 方法并不能保证一次性将 byteBuffer 中的内容全部写入 channel
关闭
channel 必须关闭,不过调用了FileInputStream、FileOutputStream 或 RandomAccessFile 的 close() 方法会间接调用 channel 的 close 方法
大小
使用 size() 方法获取文件大小
强制写入
操作系统出于对性能的考虑,会将数据缓存,而不是立即写入磁盘。调用 force(true) 方法将文件内容和原数据(文件的权限等信息)立即写入磁盘
通道间的数据传输
这里主要介绍两个通道与通道之间数据传输的方式,且该方式效率高,底层会利用操作系统的零拷贝进行优化:
- transferTo():把源通道的数据传输到目的通道中,返回实际传输字节大小。
inputChannel.transferTo(0, inputChannel.size(), outputChannel);
- transferFrom():把来自源通道的数据传输到目的通道,返回实际传输字节大小。
outputChannel.transferFrom(inputChannel, 0, inputChannel.size);
这两个方法一次最多仅能传输2GB数据,超过2GB可以使用这种方式:
for (long less = input.size(); less > 0;) less -= input.transferTo(input.size() - less, less, output);
1.2.2 SocketChannel
接下来我们学习获取SocketChannel的方式。
还是一样,我们通过一个例子来快速上手:
public static void main(String[] args) throws Exception {
//获取ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 6666);
//绑定地址,端口号
serverSocketChannel.bind(address);
//创建一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true) {
//获取SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
while (socketChannel.read(byteBuffer) != -1){
//打印结果
System.out.println(new String(byteBuffer.array()));
//清空缓冲区
byteBuffer.clear();
}
}
}
然后运行 main() 方法,我们可以通过 telnet 命令进行连接测试:
通过上面的例子可以知道,通过ServerSocketChannel.open()
方法可以获取服务器的通道,然后绑定一个地址端口号,接着accept()
方法可获得一个SocketChannel
通道,也就是客户端的连接通道。最后配合使用Buffer
进行读写即可。
这就是一个简单的例子,实际上上面的例子是阻塞式的。要做到非阻塞还需要使用选择器Selector
。
1.3 选择器 Selector
Selector
翻译成选择器,有些人也会翻译成多路复用器,实际上指的是同一样东西。
只有网络IO才会使用选择器,文件IO是不需要使用的。
选择器可以说是NIO的核心组件,它可以监听通道的状态,来实现异步非阻塞的IO。换句话说,也就是事件驱动。以此实现单线程管理多个Channel的目的。
1.3.1 服务器设计演化
Selector单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途
多线程版本设计:每个socket都会分配一个thread处理。缺点:1.内存占用高;2.线程频繁上下文切换;3.仅适合连接数少的场景
线程池版本设计:将socket交给线程池处理。缺点:1.阻塞模式下,每个线程仅能处理一个socket连接;2.仅适合短链接场景(socket完成相应业务后立即断开,比如http请求)
Selector 版设计:Selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上(调selector的select()会阻塞直到channel发生了读写就绪事件,事件发生,select()方法就会返回这些事件交给thread来处理)适合连接数特别多,但流量低的场景(low traffic)。
1.3.2 核心API
API方法名 | 作用 |
---|---|
Selector.open() | 打开一个选择器 |
select() | 选择一组键,其相应的通道已为 I/O 操作准备就绪 |
selectedKeys() | 返回此选择器的已选择键集 |
以上的API会在后面的例子用到,先有个印象。
二、NIO快速入门
1. 阻塞与非阻塞
阻塞
阻塞模式下,相关方法会导致线程暂停。此时线程不会占用CPU,但处于闲置
- ServerSocketChannel.accept() 会在没有连接建立时阻塞
- SocketChannel.read() 会在没有数据可读时阻塞
缺点:
单线程下,阻塞方法之间相互影响,几乎不能正常工作,所以需要多线程支持。但在多线程下又有新的问题,体现在以下几方面:
- 32 位 JVM 一个线程 320KB,64 位 JVM 一个线程 1024KB,如果连接数过多会因频繁上下文切换导致性能降低,甚至线程数过多导致占用内存过多引发OOM
- 使用线程池技术来减少线程数,但无法治理根本问题。如果有很多连接建立,但长时间阻塞,还是会导致线程池中线程数过多,因此不适合长连接,只适合短连接
非阻塞
非阻塞模式下,相关方法不会让线程暂停
- ServerSocketChannel.accept() 在没有连接建立时,返回null,继续执行
- SocketChannel.read() 在没有数据可读时,返回0,线程不必阻塞,可以去处理来自其他socketChannel的事件
- 写数据时,线程只等待数据写入 channel 即可,无需等待 channel 通过网络把数据发出
缺点:
在非阻塞模式下,即使没有连接建立和数据可读,线程仍然在不断运行,浪费CPU
数据复制过沉重,线程实际还是处于阻塞状态(AIO改进)
多路复用
单线程可以配合 selector 完成对多个 channel 可读写事件的监控,这称之为多路复用
- 多路复用仅针对网络IO,普通文件IO无法多路复用
- 如果不用 selector 的非阻塞模式,线程大部分时间都在做无用功,而 selector 保证:
- 有可连接事件才去连接
- 有可读事件才去读取
- 有可写事件才去写入(限于网络传输能力,channel未必时时可写,一旦channel可写,会触发selector的可写事件)
2. Selector
优点:
- 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
- 让线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换
创建
Selector selector = Selector.open();
绑定 Channel 事件
也称之为注册事件,绑定的事件 selector 才会关心
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
- channel 必须工作在非阻塞模式
- FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
- 绑定的事件类型可以有:
- connect - 客户端连接成功时触发
- accept - 服务器端成功接受连接时触发
- read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
监听 Channel 事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件
方法1,阻塞直到绑定事件发生
int count = selector.select();
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();
💡 select 何时不阻塞:
- 事件发生时
- 客户端发起连接请求,会触发 accept 事件
- 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
- channel 可写,会触发 write 事件
- 在 linux 下 nio bug 发生时
- 调用 selector.wakeup()
- 调用 selector.close()
- selector 所在线程 interrupt
处理 accept 事件
客户端代码为
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}