阻塞
理论
- 阻塞模式下,相关方法(accept、read、write)都会导致线程暂停。
- ServerSocketChannel.accept 会在没有连接建立时让线程暂停。
- SocketChannel.read 会在没有数据可读时让线程暂停。
- 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置。
- 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持。
- 多线程下,有新的问题,体现在以下方面:
- 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低。
- 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接。
Code
Server
package netty.netProgram.block;
import lombok.extern.slf4j.Slf4j;
import netty.utils.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
/**
* 阻塞
*/
@Slf4j
public class Server {
public static void main(String[] args) {
try {
// 创建buffer用于接收从客户端获取的数据
ByteBuffer buffer = ByteBuffer.allocate(10);
// 创建服务端channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定连接的端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
// 阻塞等待连接事件
log.debug("connecting...");
SocketChannel sc = serverSocketChannel.accept();
log.debug("connected sc : {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 接收客户端发送的数据
log.debug("receive data from client...");
int read = channel.read(buffer);
if(read == -1){
continue;
}
log.debug("receive data len is: {}", read);
// 切换为读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client
package netty.netProgram.block;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
* 阻塞Client代码编写
*/
@Slf4j
public class Client {
public static void main(String[] args) {
try {
// 创建client socketChannel
SocketChannel sc = SocketChannel.open();
// 连接到localhost 8080端口
sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8080));
log.debug("client connected");
while (true){
// 死循环,防止代码退出
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
非阻塞
理论
- 非阻塞模式下,相关方法都会不会让线程暂停。
- 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行。
- SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept。
- 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去。
- 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
- 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)
Code
Server
package netty.netProgram.nonblock;
import lombok.extern.slf4j.Slf4j;
import netty.utils.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
/**
* nonBlock Server code
*/
@Slf4j
public class Server {
public static void main(String[] args) {
try {
ByteBuffer buffer = ByteBuffer.allocate(16);
// 创建服务端ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置为非阻塞
ssc.configureBlocking(false);
// 绑定端口
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
while (true){
// 建立连接
// log.debug("server connecting...");
SocketChannel sc = ssc.accept();
if(sc != null){
// 设置sc为非阻塞
sc.configureBlocking(false);
log.debug("server connected, sc : {}", sc);
channels.add(sc);
}
for (SocketChannel channel : channels) {
int read = channel.read(buffer);
if(read <= 0){
continue;
}
buffer.flip();
ByteBufferUtil.debugAll(buffer);
buffer.clear();
log.debug("receive data from client end, sc : {}", channel);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client
client端代码同阻塞代码。
多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
- 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用。普通文件IO不支持非阻塞模式,无法实现多路复用。
- 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证:
- 有可连接事件时才去连接。
- 有可读事件才去读取。
- 有可写事件才去写入。
- 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
Selector使用步骤
- 创建Selector:Selector selector = Selector.open();
- 绑定Channel事件,也称为注册事件,绑定的事件Selector才会关心。
- channel 必须工作在非阻塞模式。
- FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用。
- 绑定的事件类型可以有:connect、read、write、connect。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
- 监听channel事件:
-
// 获取发生事件的channel数量 int count = selector.select(); if(count <= 0){ continue; } log.debug("current connected nums : {}", count); // 获取所有发生连接的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectionKeys.iterator();
-
Select处理Accept事件
Code
Server:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* selector处理accept事件Server
*/
@Slf4j
public class ServerAccept {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
// 创建selector
Selector selector = Selector.open();
// 绑定ssc到selector并指定当发生accept事件时触发
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true){
// 获取发生事件的channel数量
int count = selector.select();
if(count <= 0){
continue;
}
log.debug("current connected nums : {}", count);
// 获取所有发生连接的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
// 判断事件类型
if(key.isAcceptable()){
ServerSocketChannel sSC = (ServerSocketChannel) key.channel();
SocketChannel sc = sSC.accept();
log.debug("sc is :{}", sc);
}
// 及时移除处理好的事件
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* selector处理accept事件Server
*/
@Slf4j
public class Client {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
log.debug("client is connecting...");
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8080));
log.debug("client connected");
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.put("Hello Server i am client...".getBytes());
buffer.flip();
socketChannel.write(buffer);
while (true){}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Attention
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发
Select处理Read事件
Code
Server:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import netty.utils.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* selector Accept And Read Server
*/
@Slf4j
public class ServerAcceptRead {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
// 注册ssc到selector上,并且注明事件类型为ACCEPT事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true){
int count = selector.select();
if(count <= 0){
continue;
}
log.debug("current connect num : {}", count);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
// 判断事件类型是accept还是read
if(key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = serverSocketChannel.accept();
log.debug("ssc is: {}, sc is:{}", serverSocketChannel, sc);
sc.configureBlocking(false);
// 将sc注册到selector上,并设置其类型为Read
sc.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1){
// 此时client已经关闭连接
// key.cancel()取消注册在selector上的channel,并从keys集合中删除key后续不在监听事件
key.cancel();
sc.close();
}else{
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* selector处理accept事件Server
*/
@Slf4j
public class Client {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
log.debug("client is connecting...");
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8080));
log.debug("client connected");
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.put("Hello Server i am client...".getBytes());
buffer.flip();
socketChannel.write(buffer);
while (true){}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Attention
Select处理Connect事件
处理Write事件时需要考虑到buffer数据量过大时,不能一次性将其全部写入到channel中,需要注册write事件,并且需要关联buffer,以便下次继续写入。
- 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
- 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
- selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
- 如果不取消,会每次可写均会触发 write 事件
Code
Server:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import netty.utils.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
@Slf4j
public class ServerAcceptReadWrite {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
// 注册ssc到selector上,并且注明事件类型为ACCEPT事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true){
int count = selector.select();
if(count <= 0){
continue;
}
log.debug("current connect num : {}", count);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
// 判断事件类型是accept还是read
if(key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = serverSocketChannel.accept();
log.debug("ssc is: {}, sc is:{}", serverSocketChannel, sc);
sc.configureBlocking(false);
// 将sc注册到selector上,并设置其类型为Read
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
// 向client写入数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
log.debug("actual write bytes:{}", write);
if(buffer.hasRemaining()){
// 如果此时没有将buffer中的所有数据写入到channel,scKey在原有Read事件的基础上添加Write事件
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 同时需要将buffer作为附件关联到scKey上
scKey.attach(buffer);
}
}else if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1){
// 此时client已经关闭连接
// key.cancel()取消注册在selector上的channel,并从keys集合中删除key后续不在监听事件
key.cancel();
sc.close();
}else{
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}else if(key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
int write = channel.write(buffer);
log.debug("hit write, current write bytes:{}", write);
if(!buffer.hasRemaining()){
// 只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,
// 因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client:
package netty.netProgram.selector;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* selector处理accept事件Server
*/
@Slf4j
public class Client {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
log.debug("client is connecting...");
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8080));
log.debug("client connected");
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.put("Hello Server i am client...".getBytes());
buffer.flip();
socketChannel.write(buffer);
while (true){}
} catch (IOException e) {
e.printStackTrace();
}
}
}