NIO : new Input/Output,,在java1.4中引入的一套新的IO操作API,,,旨在替代传统的IO(即BIO:Blocking IO),,,nio提供了更高效的 文件和网络IO的 操作,,
NIO中分为阻塞模式(Blocking)和非阻塞模式(Non-blocking),,通过configureBlocking(boolean)
方法设置,
-
阻塞模式:
- I/O 操作阻塞线程:
read()
:如果没有数据可读,调用会一直阻塞等待数据
write()
: 如果网络缓冲区已经满了,会一直阻塞,直到缓冲区有位置 ,
accept()
: 会一直等待客户端连接
这些操作都需要一个线程去维持,如果是高并发项目,线程池会打满,,
与传统的BIO(blocking IO)类似,只是 底层实现更高效
- I/O 操作阻塞线程:
-
非阻塞模式 (多路复用)
一般会和Selector
一起使用,Selector
是NIO中的一个关键组件,,可以监听多个通道触发的事件
事件的类型:
- accept : 客户端发起连接请求时触发
- connect : 连接建立触发的事件
- read : 可读事件,读数据的时候触发,,或者在 客户端主动断开连接,或者客户端异常断开连接触发
- write : 写入事件,在需要写出数据并且缓冲区有写入位置的时候触发
Selector去建立和channel的关联,,并且监听你想关注的事件,,,当事件被触发之后,selector.select()
就会往下运行,如果没有事件发生,就会阻塞在那里,,
如果有事件发生,可以通过 selector.selectedKeys()
获取到所有的事件SelectionKey
,遍历并处理这些事件,,
这个selector.selectedKeys()
获取到的事件,,并不会主动移除,,需要在处理完这个事件之后,手动移除,,否则在下一次遍历事件的时候,还会再遍历一次
遇到的问题:
- 客户端向服务端发送了大量的数据,,read()事件,去读数据的ByteBuffer大小是有限制的,,就可能会产生
黏包
(多个数据黏到一起,需要拆解数据)和半包
(一个数据只发了一部分,需要根据另一部分组装数据),,,如果一个数据很大,设置的ByteBuffer读不完,就需要ByteBuffer扩容,,
每一个channel都需要一个自己的buffer,,这样数据才不会乱,,就可以将buffer设置在附件中:
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
System.out.println("有客户端连接了"+sc);
sc.configureBlocking(false);
// 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件
SelectionKey selectionKey = sc.register(selector, 0, buffer);
selectionKey.interestOps(SelectionKey.OP_READ);
当这个buffer不够用,需要扩容,扩容完了之后使用attch()
放入新的附件,,attchment()获取附件
public class Server {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
System.out.println("server start ");
while (true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
System.out.println("有客户端连接了"+sc);
sc.configureBlocking(false);
// 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件
SelectionKey selectionKey = sc.register(selector, 0, buffer);
selectionKey.interestOps(SelectionKey.OP_READ);
}else if (key.isReadable()){
try {
//
SocketChannel channel = (SocketChannel) key.channel();
// 拿到附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
// -1 表示客户端断开
int read = channel.read(buffer);
if (read == -1){
key.cancel();
}else{
boolean isExtend = split(buffer);
if (isExtend){
// 需要扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
buffer.flip();
// 将旧的buffer中数据,,同步到新的buffer中
newBuffer.put(buffer);
// 替换新的附件
key.attach(newBuffer);
}
// // 这个buffer读到最后,还是没有提取出来,,
// if (buffer.position() == buffer.limit()){
// // 需要扩容
// ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// buffer.flip();
// // 将旧的buffer中数据,,同步到新的buffer中
// newBuffer.put(buffer);
//
// // 替换新的附件
// key.attach(newBuffer);
// }
}
// String s = Charset.defaultCharset().decode(buffer).toString();
// System.out.println("s = " + s);
} catch (IOException e) {
key.cancel();
throw new RuntimeException(e);
}
}
}
}
}
private static boolean split(ByteBuffer source){
// debugAll(source);
boolean flag = false;
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i)== '\n'){
int pointPosition = source.position();
int len = i+1 - pointPosition;
ByteBuffer buffer = ByteBuffer.allocate(len);
for (int j = 0; j < len; j++) {
byte b = source.get();
buffer.put(b);
}
flag = true;
System.out.println("buffer1 = " + Charset.defaultCharset().decode(buffer));
debugAll(buffer);
buffer.flip();
System.out.println("buffer2 = " + Charset.defaultCharset().decode(buffer));
// System.out.println(Charset.defaultCharset().decode(buffer));
}
}
source.compact();
return !flag;
}
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
// sc.configureBlocking(false);
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress localAddress = sc.getLocalAddress();
sc.write(Charset.defaultCharset().encode("hello\n123131server\n"));
new Scanner(System.in).next();
System.in.read();
}
}
- 客户端正常关闭会触发read事件,导致服务端无限循环去处理这个read事件
判断 如果read返回-1,表示没读到数据,,客户端已经关闭,,使用cancel()
取消事件
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(2);
// 关闭客户端会触发读事件 ,,这个read会进入selectkey
try {
SocketChannel channel = (SocketChannel) key.channel();
// 返回读到的字节数,,,如果返回-1 : 表示正常断开
int read = channel.read(buffer);
if (read == -1){
key.cancel();
}else {
buffer.flip();
// debugAll(buffer);
System.out.println(Charset.defaultCharset().decode(buffer));
}
} catch (IOException e) {
// 异常断开
key.cancel();
throw new RuntimeException(e);
}
}
- 如果服务器发送很大的数据,,网络缓冲区一次性读不下,,就需要注册一个write事件进去,让Selector监测一旦网络缓冲区有位置了就去执行write事件
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
System.out.println("server start");
while(true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){
// key.channel()
// serverSocketChannel只有一个,,就是创建的那个
SocketChannel sc = ssc.accept();
System.out.println("有客户端连接了"+sc);
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
// 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 这个write并不能保证,一次性吧数据都写给客户端 ==> 返回值表示一次写了多少字节
// while(buffer.hasRemaining()){
// // 网络的缓冲区是有限制的,,写不进去了,返回就是0 ===》 这样不符合非阻塞的思想,,,只要内容没发完,就一直在循环这里卡着,,虽然能将大量的数据发送给客户端,但是效率不搞
// // 发送缓冲区是有限制的 ==》 不要一直卡在这里
// int write = sc.write(buffer);
// System.out.println("write = " + write);
// }
if (buffer.hasRemaining()) {
// 是否有剩余内容
// 注册写事件 ===> 必须把之前的interest加上去,,不然会把之前的事件覆盖掉
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
// 把buffer关联到selectionKey
scKey.attach(buffer);
}
}else if(key.isWritable()){
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int write = sc.write(buffer);
System.out.println("write = " + write);
// if (write < buffer.)
// 写完了,,清除buffer,不用再关注可写事件
if (!buffer.hasRemaining()){
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
// 接收数据
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println("count = " + count);
// 重置指针
buffer.clear();
}
}
}