BIO(同步阻塞)
利用网络连接传输数据为例:
服务端单线程
服务端只有一个主线程处理客户端的连接和读写处理,此时如果有第二个客户端欲连接并发送消息服务端是接收不到的。
因为读写和等待accept连接都是阻塞的。
sever端代码:
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。");
//阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
handler(clientSocket);
}
client端代码:
Socket socket = new Socket("127.0.0.1", 9000);
//向服务端发送数据
socket.getOutputStream().write("HelloServer".getBytes());
socket.getOutputStream().flush();
System.out.println("向服务端发送数据结束");
byte[] bytes = new byte[1024];
//接收服务端回传的数据
socket.getInputStream().read(bytes);
System.out.println("接收到服务端的数据:" + new String(bytes));
socket.close();
服务端多线程
服务端使用主线程接受客户端的accept连接并建立连接关系,利用其他线程处理读写操作,实现连接和处理任务分离。使得主线程可以在while中不断处理客户端的连接请求,而利用多个子线程处理多个客户端的读写处理。
改写服务端代码:
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。");
//阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
缺点:高并发场景下每一个客户端都对应生成一个线程一对一处理,并且对于只连接没有读写操作的客户端会持续阻塞线程资源,比如C10K问题,太耗费资源
NIO(非阻塞)
没有selector的笨蛋式轮询server
服务端的channel等待客户端的accept非阻塞,读read也是非阻塞。采用两个while循环单线程不断地去看看有没有客户端连接请求,将这些socketChannel加入一个集合中,不断地从该集合遍历socketChannel是否有读写操作,read也是非阻塞。所以没有读的需求就继续遍历下一个socketChannel。如果中途有新的client加入或者有read需求,那就下一次轮询的时候会处理。
// 保存客户端连接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel,与BIO的serverSocket类似
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务启动成功");
while (true) {
// 非阻塞模式accept方法不会阻塞,否则会阻塞
// NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) { // 如果有客户端进行连接
System.out.println("连接成功");
// 设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
// 保存客户端连接在List中
channelList.add(socketChannel);
}
// 遍历连接进行数据读取
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,否则会阻塞
int len = sc.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开,把socket从集合中去掉
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
缺点:此时是对所有的client无差别进行轮询,只要是连接了的client,就会对其进行内层while遍历看是否能read。和BIO的多线程问题有点相似之处。
就是对那些不经常发消息的客户端 给了不应该有的平等的对待~hhh。想想100万个client里其实99万都是僵尸,只有1万会说话,那我的服务端不应该每次轮询都要轮询这100万个client!
正确的操作应该是实现事件驱动的轮询处理,对那些不会动的僵尸们,采用冷漠忽视的态度!
那应该如何做呢?
NIO其实已经实现了!那就是IO多路复用器,也就是selector。
事件驱动的server
由于有了selector,我们只需要将serverSocket注册到selector上面,对于server Socket身上的收发事件(包括了accept),selector就能监听到。
server端创建serverSocket代码和上面的笨蛋式代码一致:
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
但是紧接着创建一个selector,并注册server Socket,并关注其身上的accept事件。一旦channel注册成功,会返回一个绑定的key(下面代码中的selectionKey)并将其存进selector中的selectedKeys集合中:
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
同样是通过while不断地使用selector监听各个channel,selector.select会阻塞!直到有事件触发才会继续往下走。
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
当有一个client试图连接server时,server的serverSocketChannel会触发事件响应,此时selector.select会监听到事件响应,程序继续进行,从selector的selectorKeys中遍历key:
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
巴拉巴拉。。。。。
}
此时触发的是accept类型事件,进入该处理分支,并通过key得到绑定的channel,即socketServerChannel,并accept客户端的连接,建立起该客户端的socketChannel,并将其注册到selector关注读事件:
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
然后程序遍历结束,回到最外层while,再次阻塞在selector.select,等待下一次连接或者读写事件。此时连接的client发送消息给server,打破阻塞,进入内层selectedKeys遍历,这次走的是read事件响应,从socketChannel读出消息数据。
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
} else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
selector继续阻塞直到下一次事件触发…
流程如下:
底层实现原理和源码
JDK1.4之前都是通过Linux内核函数select或者poll去轮询所有channel查看哪些有事件,JDK1.4之后引入selector,底层实现采用了Linux的epoll函数,实现了将有事件的channel主动放入就绪事件列表。
selector底层使用epoll_create函数创建了一个epoll对象:
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
对应的将channel注册到selector中底层是将channel的文件描述符添加进epoll的一个包装数组pollWrapper(底层并没有真正和epoll fd绑定):
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
然后selector.select对应底层源码为:
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
其中pollWrapper包装数组会进行轮询poll:
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
其中updateRegistrations函数会使用epoll_ctl对每一个channel fd绑定给epoll fd并监听事件:
epollCtl(epfd, opcode, fd, events);
如果有事件响应,其实是操作系统的硬中断感知并把channel fd放进ready list。
上述的epollWait底层是epoll_wait函数,会从ready list(操作系统维护)里面去看有没有就绪事件如果有就放入selectedKeys去。
总结:
epoll_create创建epoll对象,epoll_ctl实现真正注册,epoll_wait实现监听并将硬中断事件的channel fd放入ready list
对比select,poll和epoll
redis线程模型
基于epoll的NIO线程模型实现。
netty
简化NIO,进一步对其封装为异步非阻塞。不在AIO上封装是因为Linux底层还是用epoll模型实现AIO但是异步没有优化好。
AIO(异步)
NIO2.0(对NIO进行封装,不需要轮询ready list处理事件,而是响应式编程采用回调函数直接主动处理),NIO的select,accept和read等等都是主线程自己做的,AIO不是,AIO的accept和read等都是采用了回调函数,并且是不同的线程处理
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
System.out.println("2--"+Thread.currentThread().getName());
// 再此接收客户端连接,如果不写这行代码后面的客户端连接连不上服务端
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
System.out.println("3--"+Thread.currentThread().getName());
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
System.out.println("1--"+Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
有三个线程异步非阻塞处理。