NIO学习
一、前言
先来看一下NIO的工作流程图:
- NIO三大核心组件,
channel(通道)
、Buffer(缓冲区)
、selector(选择器)
。 - NIO利用的是多路复用模型,一个线程处理多个IO的读写操作,轮询的查看是否有就绪时间来进行后续的操作。
channel
并不直接拥有数据,他只是一个通道,通道内真实操作数据的是buffer
缓存,所以通道是一个双向的,既可以读也可以写。
二、Buffer缓冲区
缓冲区的作用就是作为运输数据的载体,在通道中进行运输,是在内存中进行的,本质其实就是数组。
JAVA NIO包内声明了常见的几种数据类型的实现。
2.1、核心属性
- position:是读取或者写入下一个元素的索引值,该值随着读取写入而改变,但是不会超过limit。
- limit:读取或者写入的限制值,比如读到该值,就不能继续读了。
- capacity:生命缓冲区数组的大小,该值不可改变。
position <= limit <= capacity
2.2、基本使用
编写一个输出方法:
public static void printf(Buffer buffer) {
System.out.println("limit:" + buffer.limit());
System.out.println("capacity:" + buffer.capacity());
System.out.println("position:" + buffer.position());
}
使用完整示例
public static void main(String[] args) {
System.out.println("《=====================================初始化数据=====================================》");
String s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
// 初始化缓冲区
CharBuffer buffer = CharBuffer.allocate(1024);
buffer.put(s.toCharArray());
printf(buffer);
System.out.println("《=====================================flip=====================================》");
// 转换为读取模式
buffer.flip();
printf(buffer);
while (buffer.hasRemaining()) {
char c = buffer.get();
System.out.print(c);
}
System.out.println();
// 反转,可以重新读取
System.out.println("《=====================================rewind=====================================》");
buffer.rewind();
printf(buffer);
// 判断limit的值是否大于position,大于说明还没读取完
int i = 0;
while (buffer.hasRemaining()) {
if (i == 10) {
// 打上标记
buffer.mark();
}
char c = buffer.get();
System.out.print(c);
i++;
}
System.out.println();
System.out.println("《=====================================reset输出=====================================》");
buffer.reset();
System.out.println(buffer.get());
System.out.println("《=====================================compact=====================================》");
// 转换为写入模式
buffer.compact();
String s1 = "1234";
buffer.put(s1.toCharArray());
printf(buffer);
// 转换为读取模式
buffer.flip();
for (int j = 0; j < buffer.limit(); j++) {
char c = buffer.get();
System.out.print(c);
}
System.out.println();
System.out.println("《=====================================clear一下=====================================》");
// 转换为读取模式
buffer.clear();
String s2 = "5678";
buffer.put(s2.toCharArray());
printf(buffer);
// 转换为读取模式
buffer.flip();
for (int j = 0; j < buffer.limit(); j++) {
char c = buffer.get();
System.out.print(c);
}
System.out.println();
}
public static void printf(Buffer buffer) {
System.out.println("limit:" + buffer.limit());
System.out.println("capacity:" + buffer.capacity());
System.out.println("position:" + buffer.position());
}
上面代码的输出顺序以及解释:
-
初始化数据并插入
-
limit:1024 capacity:1024 position:26
-
allocate()
方法进行初始化buffer数组大小。 -
初始值
limit
与capacity
相等,position
随着插入数据后移。
-
-
flip操作
-
limit:26 capacity:1024 position:0 ABCDEFGHIJKLMNOPQRSTUVWXYZ
-
转换为读取模式,重置
limit
为position
,position
为0。 -
hasRemaining()
方法比较的是position
是否小于limit
,表示是否读取结束。 -
随着
get()
方法读取,position
后移,直到与limit
相等,读取完毕。
-
-
rewind操作
-
limit:26 capacity:1024 position:0 ABCDEFGHIJKLMNOPQRSTUVWXYZ
-
重现读取,把
position
设置为0,mark
标志位设置为-1。
-
-
reset操作
-
K
-
再上面rewind方法后的读取中,再第10个索引上,用
mark
方法打上了标志位。 -
reset
方法就是将标志位设置到position
,从标志位再次开始读取。 -
通常
mark
方法与reset
方法配合使用。
-
-
compact操作
-
limit:1024 capacity:1024 position:19 LMNOPQRSTUVWXYZ1234
-
转换为写操作,继续往缓冲区中写了
1234
字符串。 -
该方法并不会将
position
设置为0,而是将未读取得数据复制到数组起始处,然后接着写入到缓冲区。 -
在上面
reset
方法中,读取到了K
,所以这儿读取从L
开始,接着读取到了本次写入的1234
。
-
-
clear操作
-
limit:1024 capacity:1024 position:4 5678
-
转换为写操作,与上述compact方法区别是会清空缓冲区(并没有清除数据,重新写入得时候会覆盖原来得数据),重设
position
为0,从0开始写入5678
。 -
所以这儿flip转换为读取之后,仅仅读取到了
5678
。
-
总的来说:
- 使用创建子类实例对象的
allocate()
方法,创建一个Buffer
类的实例对象。 - 调用
put()
方法,将数据写入到缓冲区中。 - 写入完成后,在开始读取数据前,调用
Buffer.flip()
方法,将缓冲区转换为读模式。 - 调用
get()
方法,可以从缓冲区中读取数据。 - 读取完成后,调用
Buffer.clear()
方法或Buffer.compact()
方法,将缓冲区转换为写入模式,可以继续写入。
三、channel管道
channel是NIO的一个核心组件,表示一个打开的连接,是连接到支持IO设备的通道,配合buffer来进行数据的传输。
比较重要的通道由以下四个:
FileChannel
: 文件通道,用于文件的读写。ServerSocketChannel
: TCP连接的数据读写,常用作客户端。SocketChannel
: TCP连接的监听程序,常用作服务端。DatagramChannel
: 用于UDP协议的数据读写。
3.1、FileChannel用法
用FileChannel
实现文件的复制。
/**
* 文件通道
*
* @throws IOException
*/
public static void fileChannel() throws IOException {
// 文件输入流
FileInputStream fis = new FileInputStream(fileInputSrcFile);
// 文件输出流
FileOutputStream fos = new FileOutputStream(fileOutSrcFile);
// 开启通道
FileChannel inputChannel = fis.getChannel();
FileChannel outputChannel = fos.getChannel();
// 初始化缓冲区的大小为1m
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
// 输入通道读取文件到缓冲区中
while (inputChannel.read(buffer) > 0) {
// 转换为读取模式
buffer.flip();
outputChannel.write(buffer);
// 清空缓存,重新写入
buffer.clear();
}
inputChannel.close();
outputChannel.close();
fis.close();
fos.close();
}
3.2、SocketChannel和ServerSocketChannel用法
一个是SocketChannel
负责连接的数据传输,另一个是ServerSocketChannel
负责连接的监听。
ServerSocketChannel
仅仅应用于服务器端,而SocketChannel
则同时处于服务器端和客户端,所以,对应于一个连接,两端都有一个负责传输的SocketChannel
传输通道。
两种都有阻塞和非阻塞的模式,通过方法:
channel.configureBlocking(false); // 非阻塞模式
channel.configureBlocking(true); // 阻塞模式
这儿暂时按下不表,后面介绍选择器的时候一起讲解。
3.3、DatagramChannel用法
使用open
方法进行打开创建DatagramChannel
,但是还未进行连接,可使用send()
和receive()
方法收发数据,不过每次都要连接检查。若要使用read
和write
收发数据,则需要用connect
建立连接,连接状态可通过isConnected
方法检查。
send()
和receive()
使用的缓冲区若太小,则会丢弃超出缓冲区大小之外的数据。使用时需要注意。
服务端代码
public class UdpServer {
public static void main(String[] args) {
try {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
// 如果在两台物理计算机中进行实验,则要把localhost改成服务端的IP地址
InetSocketAddress localhost = new InetSocketAddress("localhost", 8888);
InetSocketAddress remoteHost = new InetSocketAddress("localhost", 7777);
channel.bind(localhost);
channel.connect(remoteHost);
channel.isConnected();
Selector selector = Selector.open();
// SelectionKey.OP_WRITE |
channel.register(selector, SelectionKey.OP_READ);
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
channel = (DatagramChannel) key.channel();
if (key.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// channel.receive(byteBuffer);
channel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, byteBuffer.position()));
}
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端代码:
public class UdpClient {
public static void main(String[] args) {
try {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
InetSocketAddress localhost = new InetSocketAddress("localhost", 7777);
InetSocketAddress remoteHost = new InetSocketAddress("localhost", 8888);
channel.bind(localhost);
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_WRITE);
channel.connect(remoteHost);
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
DatagramChannel client = (DatagramChannel) key.channel();
System.out.println(client == channel);
if (key.isWritable()) {
ByteBuffer byteBuffer = ByteBuffer.wrap("我来自客户端!".getBytes());
// client.send(byteBuffer, remoteHost);
client.write(byteBuffer);
client.close();
}
}
System.out.println("client end!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
先启动server再启动client,再服务端会接收到客户端传输的数据并打印到控制台。
我来自客户端!
四、selector选择器
作用: 选择器的使命是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。
4.1、选择器事件
IO事件有以下四种,常量定义在SelectionKey
类中:
OP_READ:
读事件OP_WRITE:
写事件OP_CONNECT:
连接事件OP_ACCEPT:
接收事件
什么是IO事件,这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执行某个IO操作的条件。
- 某个
SocketChannel
传输通道,如果完成了和对端的三次握手过程,则会发生“连接就绪”(OP_CONNECT)的事件。 - 某个
ServerSocketChannel
服务器连接监听通道,在监听到一个新连接的到来时,则会发生“接收就绪”(OP_ACCEPT)的事件。 - 一个
SocketChannel
通道有数据可读,则会发生“读就绪”(OP_READ)事件。 - 一个等待写入数据的
SocketChannel
通道,会发生写就绪(OP_WRITE)事件。
4.2、使用示例
上面在通道用法的时候,已经介绍了选择器的使用。
这儿利用选择器和通道来实现文件的下载
// 服务端代码
public class DownloadFileServer {
public static String fileOutPngSrcFile = "C:\\Users\\Administrator\\Desktop\\origin.pdf";
public static void main(String[] args) throws IOException {
ServerSocketChannel channel = ServerSocketChannel.open();
Selector selector = Selector.open();
// 设置为非阻塞
channel.configureBlocking(false);
// 绑定ip端口
channel.bind(new InetSocketAddress(9999));
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 会发生阻塞
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 有新的连接
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接受连接
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_WRITE);
System.out.println("客户端已连接....");
} else if (key.isWritable()) {
// 写的请求
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer[] bufferArray = new ByteBuffer[2];
ByteBuffer buffer = ByteBuffer.allocate(128);
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
bufferArray[0] = buffer;
bufferArray[1] = buffer1;
FileInputStream fis = new FileInputStream(fileOutPngSrcFile);
FileChannel fileChannel = fis.getChannel();
System.out.println("正在读取文件....");
while (fileChannel.read(bufferArray) > 0) {
for (ByteBuffer byteBuffer : bufferArray) {
byteBuffer.flip();
}
client.write(bufferArray);
for (ByteBuffer byteBuffer : bufferArray) {
byteBuffer.clear();
}
}
// 服务端等待客户端读取
System.out.println("结束写操作");
fis.close();
fileChannel.close();
client.close();
}
}
}
}
}
客户端代码:
public class DownloadFileClient {
public static String fileOutPngSrcFile = "C:\\Users\\Administrator\\Desktop\\download.pdf";
public static void main(String[] args) throws IOException {
SocketChannel channel = SocketChannel.open();
Selector selector = Selector.open();
channel.configureBlocking(false);
// 连接到服务端
channel.connect(new InetSocketAddress(9999));
// 注册连接事件
channel.register(selector, SelectionKey.OP_CONNECT);
boolean finished = true;
while (finished) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 事件已处理完毕,避免重复处理,移除事件。
iterator.remove();
if (key.isConnectable()) {
// 连接事件
SocketChannel client = (SocketChannel) key.channel();
client.configureBlocking(false);
if (client.isConnectionPending()) {
while (!client.finishConnect()) {
}
}
// 连接成功后,注册接收服务器消息的事件
int ops = SelectionKey.OP_READ;
client.register(selector, ops);// 订阅读取事件
System.out.println("连接成功....");
} else if (key.isReadable()) {
// 读取事件,进行复制
SocketChannel client = (SocketChannel) key.channel();
// 这儿用了数组,利用了分散与聚集的写法,当然也可以使用单个缓冲区。
ByteBuffer[] bufferArray = new ByteBuffer[2];
ByteBuffer buffer = ByteBuffer.allocate(128);
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
bufferArray[0] = buffer;
bufferArray[1] = buffer1;
FileOutputStream fos = new FileOutputStream(fileOutPngSrcFile);
FileChannel fileChannel = fos.getChannel();
System.out.println("正在下载文件....");
while (client.read(bufferArray) > 0) {
for (ByteBuffer byteBuffer : bufferArray) {
byteBuffer.flip();
}
fileChannel.write(bufferArray);
for (ByteBuffer byteBuffer : bufferArray) {
byteBuffer.clear();
}
}
fos.close();
fileChannel.close();
finished = false;
}
}
}
}
}
五、简易聊天框
5.1、先看效果
启动服务端:
启动服务中......
聊天室服务已启动!
启动客户端一:
请输入自定义用户名:
猪八戒
您的昵称通过验证 猪八戒
再启动一个客户端二:
请输入自定义用户名:
孙悟空
您的昵称通过验证 孙悟空
这是客户端一会提示,有新的用户上线。
欢迎'孙悟空'上线,当前在线人数2人。用户列表:[孙悟空, 猪八戒]
服务端也会提示客户端建立了连接
+++++客户端:/127.0.0.1:10197,建立连接+++++
+++++客户端:/127.0.0.1:10201,建立连接+++++
这时就可以在控制台聊天了。
客户端一客户端二就可以互相接收到彼此的消息了。
5.2、实现功能点
- 服务端作为服务器,用来监控客户端的情况,如注册,在线人数,谁连接了,谁退出了等。
- 客户端实现聊天,输入用户名的功能。
5.3、代码
服务端:
public class ChatDemoServer {
private final String hostname = "127.0.0.1";
private final Integer port = 7879;
private final String seperator = "[|]"; // 消息分隔符
private final Charset charset = StandardCharsets.UTF_8; // 字符集
private final ByteBuffer buffer = ByteBuffer.allocate(1024); // 缓存
private final Map<String, SocketChannel> onlineUsers = new HashMap<>();// 将用户对应的channel对应起来
private ServerSocketChannel ssc;// 将用户对应的channel对应起来
private Selector selector;// 将用户对应的channel对应起来
public static void main(String[] args) throws IOException {
ChatDemoServer chatDemoServer = new ChatDemoServer();
System.out.println("启动服务中......");
chatDemoServer.startServer();
}
public void startServer() throws IOException {
// 监控
ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(hostname, port));
// 设置为非阻塞模式
ssc.configureBlocking(false);
selector = Selector.open();
// 监听链接
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("聊天室服务已启动!");
while (true) {
// 若无可处理的则阻塞
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> ite = keys.iterator();
while (ite.hasNext()) {
SelectionKey key = ite.next();
ite.remove();
if (key.isAcceptable()) {
// 如果检测到已连接
SocketChannel client = ssc.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println("+++++客户端:" + client.getRemoteAddress() + ",建立连接+++++");
// 链接上之后直接给客户端发消息,提示输入注册用户名
client.write(charset.encode("请输入自定义用户名:"));
} else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();// 通过key取得客户端channel
buffer.clear();
StringBuilder msg = new StringBuilder();
try {
while (client.read(buffer) > 0) {
buffer.flip();
msg.append(charset.decode(buffer));
buffer.clear();
}
} catch (IOException e) {
// 如果client.read(buffer)抛出异常,说明此客户端主动断开连接,需做下面处理
client.close(); // 关闭channel
key.cancel(); // 将channel对应的key置为不可用
onlineUsers.values().remove(client); // 将问题连接从map中删除
System.out.println("-----用户'" + key.attachment() + "'退出连接,当前用户列表:" + onlineUsers.keySet().toString() + "-----");
continue; // 跳出循环
}
if (msg.length() > 0) {
processMsg(msg.toString(), client, key); // 处理消息体
}
}
}
}
}
// 消息处理
public void processMsg(String msg, SocketChannel client, SelectionKey key) throws IOException {
String[] msArray = msg.split(seperator);
if (msArray.length == 1) {
// 注册中
String username = msArray[0];
if (onlineUsers.containsKey(username)) {
client.write(charset.encode("当前用户已存在,请重新输入用户名:"));
} else {
onlineUsers.put(username, client);
key.attach(username); // 给通道定义一个表示符
String welCome = "\t欢迎'" + username + "'上线,当前在线人数" + getOnLineNum() + "人。用户列表:" + onlineUsers.keySet();
client.write(charset.encode("您的昵称通过验证 " + username));
broadCast(welCome, client); // 给客户端广播上线
}
} else {
String message = msArray[0];
String username = msArray[1];
broadCast("【" + username + "】:" + message, client);
}
}
// 广播上线消息
private void broadCast(String msg, SocketChannel currentChannel) throws IOException {
Channel channel;
for (SelectionKey k : selector.keys()) {
channel = k.channel();
if (channel instanceof SocketChannel && currentChannel != channel) {
SocketChannel client = (SocketChannel) channel;
client.write(charset.encode(msg));
}
}
}
// map中的有效数量已被很好的控制,可以从map中获取,也可以用下面的方法取
private int getOnLineNum() {
int count = 0;
Channel channel;
for (SelectionKey k : selector.keys()) {
channel = k.channel();
if (channel instanceof SocketChannel) { // 排除ServerSocketChannel
count++;
}
}
return count;
}
}
- 服务端启动,注册监控连接事件。
- 客户端启动之后服务端执行
accept
事件,给客户端发消息提示注册用户名,并注册了read
事件。 - 客户端输入完用户名之后,执行
read
事件,通过processMsg
方法处理消息,如果已经注册成功,则用|
分割用户名以及消息体,length
为 2,否则就是注册,length
为 1,校验用户名是否重复,然后给客户端通知通过与否。通过则给所有的客户端广播新的用户上线了。
客户端代码:
public class ChatDemoClient {
private final String hostname = "127.0.0.1";
private final Integer port = 7879;
private final String seperator = "|"; // 消息分隔符
private final Charset charset = StandardCharsets.UTF_8; // 字符集
private final ByteBuffer buffer = ByteBuffer.allocate(1024); // 缓存
private SocketChannel client;// 将用户对应的channel对应起来
private boolean flag = true; // 服务端断开,客户端的读事件不会一直发生(与服务端不一样)
private String username = "";
public static void main(String[] args) throws IOException {
ChatDemoClient chatDemoClient = new ChatDemoClient();
// 启动客户端
chatDemoClient.startClient();
}
public void startClient() throws IOException {
// 将用户对应的channel对应起来
Selector selector = Selector.open();
client = SocketChannel.open();
client.configureBlocking(false);
client.connect(new InetSocketAddress(hostname, port));
// 注册连接事件
client.register(selector, SelectionKey.OP_CONNECT);
// 编写输入文字
writeMsgThread();
while (flag) {
try {
selector.select();
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = ite.next();
ite.remove();
SocketChannel channel = (SocketChannel) key.channel();
if (key.isConnectable()) {
// 连接中
if (channel.isConnectionPending()) {
while (!channel.finishConnect()) {
System.out.println("客户端连接中,请等待......");
}
}
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
buffer.clear();
StringBuilder msg = new StringBuilder();
try {
while (channel.read(buffer) > 0) {
buffer.flip();
msg.append(charset.decode(buffer));
buffer.clear();
}
} catch (IOException exception) {
System.out.println(exception.getMessage() + ",客户端'" + key.attachment().toString() + "'读线程退出!!");
stopMainThread();
}
if (msg.toString().contains("您的昵称通过验证")) {
String[] returnStr = msg.toString().split(" ");
username = returnStr[1];
key.attach(username);
}
// 打印消息
System.out.println(msg);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
System.out.println("房间已关闭,即将退出房间......");
}
public void writeMsgThread() {
Scanner scanner = new Scanner(System.in);
Thread thread = new Thread(() -> {
String input = "";
while (flag) {
input = scanner.nextLine();
if ("".equals(input)) {
System.out.println("不允许输入空串!");
continue;
} else if ("".equals(username)) { // 姓名如果没有初始化
// 啥也不干,之后发给服务端验证姓名
} else { // 如果姓名已经初始化,那么说明现在的字符串就是想说的话
input = input + seperator + username;
}
try {
// 写给其他人的信息
client.write(charset.encode(input));
} catch (Exception e) {
System.out.println(e.getMessage() + "客户端主线程退出连接!!");
}
}
});
thread.setDaemon(true);
thread.start();
}
private void stopMainThread() {
flag = false;
}
}
- 客户端启动之后通过
connect
方法与服务端建立连接,注册连接事件。 - 新开启了一个用来控制台输入的线程,用来聊天以及与服务端注册交互,并将线程设置为守护线程。
- 连接成功之后注册读取事件,读取服务端以及其他客户端的消息。