写在前面
注意:这里的NIO指的是Java nio技术。
源码 。
本文看下NIO相关内容。NIO太重要了,netty,tomcat,jetty等底层使用的都是Java nio,所以很有必要好好了解一下咯,涨薪不涨薪的咱不知道,至少在这个行业寒冬里,为自己不被淘汰增加一些筹码吧(典型的悲观主义者)
!!!
1:BIO就那么多一无是处?
想要知道BIO是否真的是一无是处,我们就需要先来了解BIO的工作方式,如下是一段典型的基于BIO的代码:
class ConnectionPerThreadWithPool implements Runnable {
public void run() {
//线程池
//注意,生产环境不能这么用,具体请参考《java高并发核心编程卷2》
ExecutorService executor = Executors.newFixedThreadPool(100);
try {
//服务器监听socket
ServerSocket serverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
//主线程死循环, 等待新连接到来
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
//接收一个连接后,为socket连接,新建一个专属的处理器对象
Handler handler = new Handler(socket);
//创建新线程来handle
//或者,使用线程池来处理
new Thread(handler).start();
}
} catch (IOException ex) { /* 处理异常 */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
//死循环处理读写事件
boolean ioCompleted = false;
while (!ioCompleted) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
/* 读取数据 */
socket.getInputStream().read(input);
byte[] output = null;
/* 写入结果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*处理异常*/ }
}
}
}
}
ServerSocket阻塞在accept中等待新连接的到来,有一个新连接则创建一个线程负责该连接的数据读写操作。其实,BIO主要的问题也就是在这里了,每个连接都需要一个线程
,即Connection Per Thread模式,每连接一个线程模式。这种线程模式下,如果是连接数不是特别多的情况下问题还是不大的,这个数的理论值一般在1000左右,但是现代应用连接数动辄几万,大的还有几百万,所以BIO肯定是无能为力的。
归根结底,BIO问题还是在过多的连接需要过多的线程上。那么为什么过多的线程就会有问题呢,原因主要有如下的几个方面:
1:线程的创建是一个很重的操作,需要通过系统调用完成(所以如果是小应用考虑使用BIO,也要进行池化)
2:过多的线程会导致频繁的线程上下文切换,各个线程的运行时数据都要进行复制保存等,这个操作可以通过观察top 的sy占用值,如果是达到20%,则说明CPU过多的时间片用在了线程的切换工作上
3:操作系统本身对一个进程可以创建的线程个数也是有限制,不同的操作系统这个值一般都是3000~5000之间的一个值。这个来自操作系统的硬性限制就有些无解了,就算是你想"硬用BIO"恐怕也是行不通了。
4:对于Java来说,每个线程都要分配一定的内存(-Xss参数),一般设置的值都在512byte~1M之间。即一个线程就要占用这么多的内存,比如设置-Xss1M则1000个线程就要占用JVM 1G的内存,所以过多的线程,内存也是无法满足要求的。这也是一个在过多的连接时无法使用BIO的硬性指标。也是无解。
所以,就很有必要隆重的请我们的Java NIO闪亮登场了。Java NIO通过使用IO多路复用模型来满足使用少量的线程来满足大量连接读写请求的需求。
2:Java NIO的核心组件
3个,分别是channel,selector,buffer。
2.1:channel
在BIO中对于读取操作定义了inputstream,对于写操作定义了outpustream,而NIO为了简化read,write的操作,对二者进行了整合,抽象定义了java.nio.channels.Channel
接口。
针对网络操作,文件操作等分别定义了如下的Channel对象:
1.FileChannel 用于文件IO操作
2.DatagramChannel 用于UDP的IO操作
3.SocketChannel 用于TCP的传输操作(客户端类)
4.ServerSocketChannel 用于TCP连接监听操作(服务端类)
2.2:selector
前面已经说过了,JavaNIO底层使用的是IO多路复用的线程模型,而IO多路复用的线程模型是通过一组套接字的状态来判断读写状态的,而selector就是负责对操作系统提供的IO多路复用线程模型实现(select,poll,epoll)进行封装的组件。而套接字对应到Java nio中是channel组件,所以,channel组件需要注册selector进行读写操作:
如图,使用一个线程就使用selector来监听一组channel的数据读写了(核心所在)
。
2.3:buffer
为了能够更好的配合channel进行数据的读写操作,Java NIO定义了buffer,即一个缓存类,负责从channel中读写数据:
这里要注意buffer类是一个抽象类不是接口:
public abstract class Buffer {
}
3:Buffer
3.1:读写模式
Buffer支持两种模式,读模式,写模式,在读模式下可以进行数据读取操作,在写模式下可以进行数据写入操作。当Buffer被首次创建出来时默认处于写模式。
3.2:主要的buffer子类
ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer、MappedByteBuffer。可以方便操作对应的数据类型。
3.3:主要属性
- capacity
缓存区的总容量。 - position
在读写模式下,分别表示当前可以进行读写的位置。每次读一个数据,或者是写一个数据则position加一。 - limit
在默写模式下,分别表示可读可写的最大位置。在写模式下就是capasity的值,在读模式下就是写模式下的position值,即写模式下写的最后一个位置。 - mark
用来临时记录position的位置,通过mark()方法完成该操作:
public final Buffer mark() {
mark = position;
return this;
}
以上描述可能看起来有些云里雾里,不要着急,后续结合buffer的主要方法来看就会很清晰了。
3.4:主要方法
3.4.1:allocate
该方法用来申请缓存区,程序:
@Test
public void allocateTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
}
输出:
capacity: 20
position: 0
limit: 20
Process finished with exit code 0
3.4.2:put
写数据到缓冲区中,程序:
@Test
public void putTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
}
输出:
capacity: 20
position: 2
limit: 20
Process finished with exit code 0
此时position为2,代表下一个可put数据的位置。
3.4.3:flip
转换写模式为读模式,主要是对position,limit的值做一些修改,直接看源码吧:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
测试程序:
@Test
public void flipTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
System.out.println("flip后");
intBuffer.flip();
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
}
输出:
capacity: 20
position: 2
limit: 20
flip后
capacity: 20
position: 0
limit: 2
Process finished with exit code 0
3.4.4:get
读模式下,从缓冲区中读取数据,读取一次position+1,超过limit时读取则会抛出BufferUnderflowException异常:
@Test
public void getTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
System.out.println("flip后");
intBuffer.flip();
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
System.out.println("get一次后");
System.out.println(intBuffer.get());
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
}
3.4.5:rewind
重放方法,可以从头重复读取数据,所以主要就是修改position的值为0,jdk源码如下:
@Test
public void rewindTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
// System.out.println("capacity: " + intBuffer.capacity());
// System.out.println("position: " + intBuffer.position());
// System.out.println("limit: " + intBuffer.limit());
// System.out.println("flip后");
intBuffer.flip();
// System.out.println("capacity: " + intBuffer.capacity());
// System.out.println("position: " + intBuffer.position());
// System.out.println("limit: " + intBuffer.limit());
// System.out.println("get一次后");
System.out.println(intBuffer.get());
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
System.out.println("rewind后");
intBuffer.rewind();
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
System.out.println(intBuffer.get());
}
输出:
1
capacity: 20
position: 1
limit: 2
rewind后
capacity: 20
position: 0
limit: 2
1
Process finished with exit code 0
3.4.6:mark()和reset()
mark()方法临时保存position值到mark中,reset恢复临时保存的值到position中:
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
测试:
@Test
public void markAndResetTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
System.out.println("记录position位置2到mark");
// 记录position位置2到mark
intBuffer.mark();
System.out.println("position变为3");
// position变为3
intBuffer.put(3);
System.out.println("position: " + intBuffer.position());
System.out.println("reset mark恢复position为2");
// reset mark恢复position为2
intBuffer.reset();
System.out.println("position: " + intBuffer.position());
}
运行:
记录position位置2到mark
position变为3
position: 3
reset mark恢复position为2
position: 2
Process finished with exit code 0
3.4.7:clear
从读取模式转换为写入模式,做如下的事情:
(1)会将position清零;
(2)limit设置为capacity最大容量值,可以一直写入,直到缓冲区写满。
jdk源码:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
3.4.8:compact
该方法也是将buffer从读取模式转换为写入模式的方法之一,但不同于clear,该方法不会无脑清空所有数据,而是会将用户还没有读取的数据拷贝到底层数组的开始位置,已经读取过的会删除掉,之后将position设置为下一个可写入数据的位置,jdk源码:
public IntBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
position(remaining());
limit(capacity());
discardMark();
return this;
}
看个示例代码:
@Test
public void compactTest() {
IntBuffer intBuffer = IntBuffer.allocate(20);
intBuffer.put(1);
intBuffer.put(2);
System.out.println("写俩数据后:");
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
intBuffer.flip(); // 转到读取模式
System.out.println("flip后:");
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
// System.out.println(intBuffer.get());
// 读一个 1
System.out.println(intBuffer.get());
intBuffer.compact();
System.out.println("compact后:");
System.out.println("capacity: " + intBuffer.capacity());
System.out.println("position: " + intBuffer.position());
System.out.println("limit: " + intBuffer.limit());
//
intBuffer.flip();
System.out.println(intBuffer.get());
}
运行:
写俩数据后:
capacity: 20
position: 2
limit: 20
flip后:
capacity: 20
position: 0
limit: 2
1
compact后:
capacity: 20
position: 1
limit: 20
2
Process finished with exit code 0
对应的变化过程如下图:
3.4.8:buffer使用各个方法的核心步骤
(1)使用创建子类实例对象的allocate( )方法,创建一个Buffer类的实例对象。
(2)调用put( )方法,将数据写入到缓冲区中。
(3)写入完成后,在开始读取数据前,调用Buffer.flip()方法,将缓冲区转换为读模式。
(4)调用get( )方法,可以从缓冲区中读取数据。
(5)读取完成后,调用Buffer.clear()方法或Buffer.compact()方法,将缓冲区转换为写入模式,可以继续写入。
4:channel
主要有4种Channel(通道)实现:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel。
(1)FileChannel文件通道,用于文件的数据读写;
(2)SocketChannel套接字通道,用于Socket套接字TCP连接的数据读写;
(3)ServerSocketChannel服务器套接字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求,创建一个SocketChannel套接字通道;
(4)DatagramChannel数据报通道,用于UDP协议的数据读写。
4.1:FileChannel
支持文件的读取,写入。仅支持阻塞模式,不支持非阻塞模式。
4.1.1:获取filechannel
程序如下:
@Test
public void createFileChannel() throws Exception {
String srcFile = "d:\\test\\Snipaste_2024-09-02_14-29-46.png";
String destFile = "d:\\test\\Snipaste_2024-09-02_14-29-46.png";
//创建一个文件输入流
FileInputStream fis = new FileInputStream(srcFile);
//获取文件流的通道
FileChannel inChannel = fis.getChannel();
//创建一个文件输出流
FileOutputStream fos = new FileOutputStream(destFile);
//获取文件流的通道
FileChannel outchannel = fos.getChannel();
//也可以通过RandomAccessFile文件随机访问类,获取FileChannel文件通道实例,代码如下:
// 创建RandomAccessFile随机访问对象
RandomAccessFile rFile = new RandomAccessFile("filename.txt", "rw");
//获取文件流的通道(可读可写)
FileChannel channel = rFile.getChannel();
}
4.1.2:读取filechannel
bio中内容一般是读取到byte数组中,而Java nio则是读取到bytebuffer中,并且会返回读取到的数据量。
程序:
@Test
public void readFileChannelTest() throws Exception {
// 文件编码是utf8,需要用utf8解码
Charset charset = Charset.forName("utf-8");
CharsetDecoder decoder = charset.newDecoder();
// 创建RandomAccessFile随机访问对象
RandomAccessFile rFile = new RandomAccessFile("d:\\test\\aaaa.txt", "rw");
//获取文件流的通道(可读可写)
FileChannel fChannel = rFile.getChannel();
ByteBuffer bBuf = ByteBuffer.allocate(32); // 缓存大小设置为32个字节。仅仅是测试用。
CharBuffer cBuf = CharBuffer.allocate(32);
int bytesRead = fChannel.read(bBuf); // 从文件通道读取字节到buffer.
char[] tmp = null; // 临时存放转码后的字符
byte[] remainByte = null;// 存放decode操作后未处理完的字节。decode仅仅转码尽可能多的字节,此次转码不了的字节需要缓存,下次再转
int leftNum = 0; // 未转码的字节数
while (bytesRead != -1) {
bBuf.flip(); // 切换buffer从写模式到读模式
decoder.decode(bBuf, cBuf, true); // 以utf8编码转换ByteBuffer到CharBuffer
cBuf.flip(); // 切换buffer从写模式到读模式
remainByte = null;
leftNum = bBuf.limit() - bBuf.position();
if (leftNum > 0) { // 记录未转换完的字节
remainByte = new byte[leftNum];
bBuf.get(remainByte, 0, leftNum);
}
// 输出已转换的字符
tmp = new char[cBuf.length()];
while (cBuf.hasRemaining()) {
cBuf.get(tmp);
System.out.print(new String(tmp));
}
bBuf.clear(); // 切换buffer从读模式到写模式
cBuf.clear(); // 切换buffer从读模式到写模式
if (remainByte != null) {
bBuf.put(remainByte); // 将未转换完的字节写入bBuf,与下次读取的byte一起转换
}
bytesRead = fChannel.read(bBuf);
}
rFile.close();
}
测试文件:
-Xthejrepath D:\programs\javas\java1.8/jre -Xthetargetclazz D:\test\itstack-demo-jvm-master\tryy-too-simulate-invokexxx-and-xreturn\target\test-classes\org\itstack\demo\test\HelloWorld
-Xthejrepath D:\programs\javas\java1.8/jre -Xthetargetclazz D:\test\itstack-demo-jvm-master\tryy-too-simulate-array-operation\target\test-classes\org\itstack\demo\test\HelloWorld
sync_al为sync_alt
10100000 想要以负数显示,必须转换成补码形式
符号位外全部无脑取反 11011111
+1 11100000
64+32 96 -96
10100001 想要以负数显示,必须转换成补码形式
符号位外全部无脑取反 11011110
+1 11011111 = 64+16+8+4+2+1=95 加上符号就是-95
00000000 00000000 00000000 10100001 = 128+32+1 = 161
-Xthejrepath D:\programs\javas\java1.8/jre -Xthetargetclazz D:\test\itstack-demo-jvm-master\tryy-too-simulate-array-operation\target\test-classes\org\itstack\demo\test\HelloWorld
测试输出:
4.1.3:写入filechannel
@Test
public void writeFileChannelTest() throws Exception {
File file = new File("d:\\test\\jj.txt");
if (!file.exists()) file.createNewFile();
RandomAccessFile randomAccessFile = new RandomAccessFile("d:\\test\\jj.txt", "rw");
FileChannel channel = randomAccessFile.getChannel(); // 获取一个可读写文件通道
ByteBuffer buf = ByteBuffer.allocate(5);
byte[] data = "Hello, Java NIO.".getBytes();
for (int i = 0; i < data.length; ) {
buf.put(data, i, Math.min(data.length - i, buf.limit() - buf.position()));
buf.flip();
// 返回写入数据的数量
i += channel.write(buf);
// buf.compact();
buf.clear(); // 写入的都读完了,所以调用compact和调用clear效果一样
}
channel.force(false);
channel.close();
}
结果:
4.1.4:关闭以及强刷filechannel
//关闭通道
channel.close( );
//强制刷新到磁盘
channel.force(true);
4.2:SocketChannel和ServerSocketChannel
4.4:DatagramChannel
4:selector
选择器,封装操作系统底层监听各个套接字的功能。在Java nio中套接字对应的就是通道,因此通道需要注册到选择中,并指定自己感兴趣的io事件,才能被监听。一般使用一个线程负责使用选择器完成其管理的所有通道的监听的工作:
通道注册到选择器的方法:
// java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
}
其中第一个参数很好理解,就是选择器,第二参数是感兴趣的io事件,在SelectionKey抽象类中定义:
public abstract class SelectionKey {
// -- Operation bits and bit-testing convenience methods --
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
}
如果是多个的话则可以通过|
或运算来设置,比如感兴趣OP_READ和OP_ACCEPT则就是OP_READ | OP_ACCEPT
。
一般使用过程如下:
1:获取选择器(SPI方式)
Selector selector = Selector.open();
2:将通道注册到选择器中
ServerSocketChannelserverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常
serverSocketChannel.bind(new InetSocketAddress(18899));
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); // 通道并非支持所有IO事件,可以在注册之前,可以通过通道的validOps()方法,来获取该通道所有支持的IO事件集合。
3:处理就绪事件(SelectionKey:封装了触发的事件和对应通道,这样方便我们处理事件了)
while (selector.select() > 0) {
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//根据具体的IO事件类型,执行对应的业务操作
if(key.isAcceptable()) {// IO事件:ServerSocketChannel服务器监听通道有新连接
} else if (key.isConnectable()) {// IO事件:传输通道连接成功
} else if (key.isReadable()) { // IO事件:传输通道可读
} else if (key.isWritable()) { // IO事件:传输通道可写
}
//处理完成后,移除选择键
keyIterator.remove();
}
}
selector.select()负责调用操作系统底层轮询channel的状态,一旦返回值大于0,则说明通道有感兴趣的事件发生了,会将发生的事件以及对应的通道封装为SelectionKey,这样我们就可以通过SelectionKey获取对应的事件,以及对应的通道来进行对应的业务逻辑处理了,如下图:
5:实战之实现一个discard server
just go。
写在后面
参考文章列表
Java NIO (图解+秒懂+史上最全) 。