一、BIO的工作原理
传统Io(BIO)的本质就是面向字节流来进行数据传输的
①:当两个进程之间进行相互通信,我们需要建立一个用于传输数据的管道(输入流、输出流),原来我们传输数据面对的直接就是管道里面一个个字节数据的流动(我们弄了一个 byte 数组,来回进行数据传递),所以说原来的 IO 它面对的就是管道里面的一个数据流动,所以我们说原来的 IO 是面向流的。
②:我们说传统的 IO 还有一个特点就是,它是单向的。解释一下就是:如果说我们想把目标地点的数据读取到程序中来,我们需要建立一个管道,这个管道我们称为输入流。相应的,如果如果我们程序中有数据想要写到目标地点去,我们也得再建立一个管道,这个管道我们称为输出流。所以我们说传统的 IO 流是单向的。
二、传统BIO的缺点
BIO属于同步阻塞行IO,在服务器的实现模型为,每一个连接都要对应一个线程。当客户端有连接请求的时候,服务器端需要启动一个新的线程与之对应处理,这个模型有很多缺陷。**当客户端不做出进一步IO请求的时候,服务器端的线程就只能挂着,**不能去处理其他请求。这样会对造成不必要的线程开销。
三、阻塞与同步
同步和异步都是由基于应用程序和操作系统处理IO事件所采用的方式所决定的。
阻塞和非阻塞式指线程在得到调用结果之前是否被挂起,主要针对线程。
四、NIO简介(同步非阻塞)
- Java NIO全称java non-blocking IO, 是指JDK提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,被统称为NIO(即New IO),是同步非阻塞的。
- NIO是一种面向缓冲区的、基于通道的IO操作,NIO有三大核心部分: Channel(通道), Buffer(缓冲区),Selector(选择器)
- java NIO的运行模式是: 客户端发送的链接请求都会被注册到Selector(选择器)上,多路复用器轮询到有I/O请求时才会启动一个线程去服务。
五、NIO三大核心原理
NIO有三大核心部分: Channel(通道), Buffer(缓冲区),Selector(选择器)
Buffer(缓冲区)
缓冲区本质上就是一块内存,数据的读写都是通过Buffer类实现的。缓冲区buffer主要是和通道数据交互,即从通道中读入数据到缓冲区,和从缓冲区中把数据写入到通道中,通过这样完成对数据的传输。
Channel(通道)
java NIO的类似于流,但是又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的(input和output)读写通常是单向的。通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步读写。
Selector选择器
Selector是一个java NIO组件,可以检测一个或多个NIO通道,并确定已经准备好进行读取或者写入。这样,一个单独的线程就可以管理多个Channel,从而管理多个网络连接,提高效率。
- 每个channel都会对应一个Buffer
- 一个线程对应Selector,一个Selector对应多个Channel
- 程序切换到那个channel是由事件决定
- Selector会根据不同的事件,在各个通道上切换
- Buffer就是一个内存块,底层就是一个数组,数据的读取和写入都是通过Buffer来实现的
六、NIO三板斧
七、NIO实现一个群聊系统
逻辑简述
服务器:
1)创建服务器NIO通道,绑定端口并启动服务器
2)开启非阻塞模式
3)创建选择器、并把通道注册到选择器上,关心的事件为新连接
4)循环监听选择器的事件,
5)监听到新连接事件:
5.1) 建立连接、创建客户端通道
5.2)客户端通道设置非阻塞
5.3)客户端注册到选择器上,关心的事件为读
6)监听到读 事件
6.1)获取到发送数据的客户端通道
6.2)把通道数据写入到一个缓冲区中
6.3)打印数据
6.4)发送给其他注册在选择器上的客户端,排除自己
客户器:
1)创建客户端通道,连接服务器 ip和端口
2)创建选择器,注册客户端通道到选择器上,关心的事件为读
3)开启一个线程 循环监听选择器事件
4)监听到读事件后
4.1)从通道中把数据读到缓冲区中
4.2)打印数据
5)主线程循环用scanner 来监听控制台输入
5.1)有输入后 发送给服务器
代码实现
服务器:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
*/
public class GroupChatServer {
private int port = 8888;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public GroupChatServer() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(port));
//创建选择器
selector = Selector.open();
//通道注册到选择器上,关心的事件为 OP_ACCEPT:新连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server is ok");
}
public void listener() throws IOException {
for (; ; ) {
if (selector.select() == 0) {
continue;
}
//监听到时间
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {//新连接事件
newConnection();
}
if (selectionKey.isReadable()) {//客户端消息事件
clientMsg(selectionKey);
}
iterator.remove();
}
}
}
/**
* 客户端消息处理
*/
private void clientMsg(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
try {
//通道数据读取到 byteBuffer缓冲区
socketChannel.read(byteBuffer);
//创建一个数组用于接受 缓冲区的本次写入的数据。
byte[] bytes = new byte[byteBuffer.limit()];
//转换模式 写->读
byteBuffer.flip();
//获取数据到 bytes 中 从位置0开始到limit结束
byteBuffer.get(bytes, 0, byteBuffer.limit());
String msg = socketChannel.getRemoteAddress() + "说:" + new String(bytes, "utf-8");
//倒带这个缓冲区。位置设置为零,标记为-1.这样下次写入数据会从0开始写。但是如果下次的数据比这次少。那么使用 byteBuffer.array方法返回的byte数组数据会包含上一次的部分数据
//例如 上次写入了 11111 倒带后 下次写入了 22 读取出来 却是 22111
byteBuffer.rewind();
System.out.println(msg);
//发送给其他客户端
sendOuterClient(msg, socketChannel);
} catch (Exception e) {
System.out.println(socketChannel.getRemoteAddress() + ":下线了");
socketChannel.close();
}
}
/**
* 发送给其他客户端
*
* @param msg 要发送的消息
* @param socketChannel 要排除的客户端
* @throws IOException
*/
private void sendOuterClient(String msg, SocketChannel socketChannel) throws IOException {
//获取selector上注册的全部通道集合
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
//判断通道是客户端通道(因为服务器的通道也注册在该选择器上),并且排除发送人的通道
if (channel instanceof SocketChannel && !channel.equals(socketChannel)) {
try {
((SocketChannel) channel).write(ByteBuffer.wrap(msg.getBytes()));
} catch (Exception e) {
channel.close();
System.out.println(((SocketChannel) channel).getRemoteAddress() + ":已下线");
}
}
}
}
/**
* 新连接处理方法
* @throws IOException
*/
private void newConnection() throws IOException {
//连接获取SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
//设置非阻塞
socketChannel.configureBlocking(false);
//注册到选择器上,关心的事件是读,并附带一个ByteBuffer对象
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println(socketChannel.getRemoteAddress() + " 上线了");
}
public static void main(String[] args) throws IOException {
GroupChatServer groupChatServer = new GroupChatServer();
//启动监听
groupChatServer.listener();
}
}
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
*/
public class GroupChatClient {
private Selector selector;
private SocketChannel socketChannel;
public GroupChatClient(String host, int port) throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(host, port));
socketChannel.configureBlocking(false);
selector = Selector.open();
//注册事件,关心读事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("我是:" + socketChannel.getLocalAddress());
}
/**
* 读消息
*/
private void read() {
try {
if(selector.select() == 0){
//没有事件,return
return;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isReadable()){//判断是 读 事件
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取数据到 byteBuffer 缓冲区
socketChannel.read(byteBuffer);
//打印数据
System.out.println(new String(byteBuffer.array()));
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送数据
* @param msg 消息
* @throws IOException
*/
private void send(String msg) throws IOException {
socketChannel.write(ByteBuffer.wrap(new String(msg.getBytes(),"utf-8").getBytes()));
}
public static void main(String[] args) throws IOException {
//创建客户端 指定 ip端口
GroupChatClient groupChatClient = new GroupChatClient("127.0.0.1",8888);
//启动一个线程来读取数据
new Thread(()->{
while (true){
groupChatClient.read();
}
}).start();
//Scanner 发送数据
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String s = scanner.nextLine();
//发送数据
groupChatClient.send(s);
}
}
}