文章目录
- IO模型
- 传统阻塞的IO模型--BIO
- Client端案例
- Server端案例
- NIO(Java non-blocking IO)非阻塞IO
- NIO的三大组件 Channel Selector Buffer
- Buffer(缓冲区)
- Channel(通道)
- Channe的分类,与Buffer的关系
- Selector(选择器)
- SelectionKey(选择键)
- 案例NIOClient
- 案例NIOServer
- NIO、BIO、AIO比较
IO模型
就是用什么样的通道或者通信模式和架构进行数据的传输和接收,很大程度上决定了程序通信的性能,包括BIO(阻塞IO)、NIO(同步非阻塞IO)、AIO(异步非阻塞IO)
传统阻塞的IO模型–BIO
同步并阻塞,服务器实现模式为一个客户端创建一个线程,即用户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销(如图所示)
Client端案例
package com.yly.netty.bio.tcp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
/**
* @author :yangliyuan
* @date :9:52 2022/10/14
*/
public class TcpClient {
/**
* 结束连接标识
*/
public static final String FINISH_MESSAGE = "bye";
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
try {
System.out.println("启动客户端。。。");
// 设置获取流超时时间
socket.setSoTimeout(3000);
//连接目标服务器
socket.connect(new InetSocketAddress(InetAddress.getLocalHost(), 10001), 3000);
System.out.println("连接服务器成功!请输入要发送的消息,按ENTER发送!");
// 发送消息
sendMessage(socket);
} catch (SocketException | UnknownHostException e) {
e.printStackTrace();
} finally {
socket.close();
}
}
/**
* 发送消息
*
* @param client socket客户端
* @throws IOException io异常
*/
private static void sendMessage(Socket client) throws IOException {
// 构建键盘输入流
InputStream in = System.in;
BufferedReader input = new BufferedReader(new InputStreamReader(in));
// 得到socket输入流,并转换成打印流
OutputStream outputStream = client.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
// 获取服务器socket输入流
InputStream socketIn = client.getInputStream();
BufferedReader socketBufferedReader = new BufferedReader(new InputStreamReader(socketIn));
boolean flag = true;
do {
// 键盘读取一行
String str = input.readLine();
// 发送数据到服务器
socketPrintStream.println(str);
// 读取服务器数据并打印
String echo = socketBufferedReader.readLine();
if (FINISH_MESSAGE.equalsIgnoreCase(echo)) {
client.close();
flag = false;
System.out.println("服务器要求断开连接,bye! bye! ");
} else {
System.out.println("回传:" + echo);
}
} while (flag);
// 资源释放
socketPrintStream.close();
socketBufferedReader.close();
}
}
Server端案例
package com.yly.netty.bio.tcp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import lombok.extern.slf4j.Slf4j;
/**
* @author :yangliyuan
* @date :9:51 2022/10/14
*/
@Slf4j
public class TcpService {
private static boolean flag = true;
public static void main(String[] args) {
startSocketServer();
}
/**
* 开启SocketServer
*/
private static void startSocketServer() {
try (ServerSocket serverSocket = new ServerSocket(10001)) {
// 等待客户端连接
while (flag) {
Socket accept = serverSocket.accept();
// 每一个客户端连接就会新创建线程处理
ClientHandler clientHandler = new ClientHandler(accept);
clientHandler.start();
}
log.info("服务端已准备就绪!");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 关闭SocketServer
*/
public static void closeServer() {
flag = false;
}
/**
* 客户端消息处理器
*/
private static class ClientHandler extends Thread {
/**
* 结束标识字符串
*/
private static final String FINISH_MESSAGE = "bye";
private final Socket socket;
private boolean flag;
public ClientHandler(Socket socket) {
this.socket = socket;
this.flag = true;
}
@Override
public void run() {
super.run();
log.info("客户端已连接:{}:{}", socket.getInetAddress(), socket.getPort());
try {
// 获取socket输入流
InputStream socketInputStream = socket.getInputStream();
BufferedReader clientBufferReader = new BufferedReader(new InputStreamReader(socketInputStream));
// 获取socket输出流
OutputStream outputStream = socket.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
do {
// 阻塞读取一行
final String echo = clientBufferReader.readLine();
// 客户端要求断开连接
if (FINISH_MESSAGE.equalsIgnoreCase(echo)) {
log.info("客户端请求断开连接,{}", socket.getInetAddress());
this.flag = false;
socket.close();
socketInputStream.close();
outputStream.close();
continue;
}
log.info("客户端消息:{}", echo);
// 服务端回传消息给客户端
socketPrintStream.println(echo.length());
} while (flag);
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
}
NIO(Java non-blocking IO)非阻塞IO
- NIO是面向缓冲区的,基于通道的IO操作。数据总是从通道写到缓冲区或者从缓冲区写入通道
- Java NIO可以让你异步的使用IO
- 同步非阻塞的
NIO的三大组件 Channel Selector Buffer
Buffer(缓冲区)
一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。
-
缓冲区的基本类型
- ByteBuffer(字节缓冲区–常用缓冲区)
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- ByteBuffer(字节缓冲区–常用缓冲区)
- 获取缓冲区常用方法allocate(),例如 ByteBuffer.allocate() 获取缓冲区
- 缓冲区常用方法
- put() 存
- get() 取
- clear() 清空缓冲区并返回对缓冲区的引用
- flip() 为将缓冲区的界限设置为当前位置, 并将当前位置重置为 0
- hasRemaining() 判断缓冲区中是否还有元素
- 缓冲区四个核心属性
- capacity: 容量,表示缓冲区中最大存储数据的容量。一旦声明不能更改。
- limit: 界限,表示缓冲区中可以操作数据的大小。(limit 后的数据不能进行读写)
- position: 位置,表示缓冲区中正在操作数据的位置。
- mark: 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置
Channel(通道)
由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的流,只不过 Channel 本身不能存储数据,数据由Buffer存储,而Channel 只能与 Buffer 进行交互。
Channe的分类,与Buffer的关系
Selector(选择器)
为了避免传统阻塞IO线程一个客户端连接救护开启一个线程而可能导致线程浪费的情况,nio从创造了Selector(选择器),Selector 能够检测多个注册的服务端通道上是否有事件发生,当监测到有时间发生时,便可以获取事件集且针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。(当然也可以使用线程池处理事件,减少线程创建和销毁的开销)
SelectionKey(选择键)
SelectionKey是java.nio.channels包下的一个类,主要用于绑定selector和channel之间的关联(注册关系)
- 四个常量代表不同的事件
SelectionKey.OP_ACCEPT=1<<4 新网络请求,值为16
SelectionKey.OP_CONNECT=1<<3 连接已建立,值为8
SelectionKey.OP_WRITE=1<<2 写操作,值为4
SelectionKey.OP_READ= 1<<0 读操作,值为1
- 对应的常用的判断方法
SelectionKey.isAcceptable(): 是否是连接继续事件
SelectionKey.isConnectable(): 是否是连接就绪事件
SelectionKey.isReadable(): 是否是读就绪事件
SelectionKey.isWritable(): 是否是写就绪事件
案例NIOClient
package com.yly.netty.nio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
/**
* @author :yangliyuan
* @date :10:54 2022/10/20
*/
@Slf4j
public class NioClient {
/**
* NIO服务器端口
*/
private static final Integer NIO_PORT = 10014;
public static void main(String[] args) throws IOException {
// 获取SocketChannel网络通道
SocketChannel client = SocketChannel.open();
// 设置客户端为非阻塞
client.configureBlocking(false);
// 连接到服务端
if (!client.connect(new InetSocketAddress(InetAddress.getLocalHost(), NIO_PORT))) {
while (!client.finishConnect()) {
// 非阻塞连接,可以做其他事情
log.info("客户端正在请求连接中...");
}
}
// 构建键盘输入流
InputStream inputStream = System.in;
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
while (true) {
// 获取键盘输入的一行
String str = bufferedReader.readLine();
log.info(str);
// 按需指定大小发送数据
client.write(ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)));
}
}
}
案例NIOServer
package com.yly.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
/**
* @author :yangliyuan
* @date :10:08 2022/10/20
*/
@Slf4j
public class NioServer {
private static boolean flag = true;
public static void main(String[] args) {
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 绑定端口号为10013
serverSocketChannel.socket().bind(new InetSocketAddress(10014));
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 开启Selector
Selector selector = Selector.open();
// serverSocketChannel注册到selector 关心事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// selector 循环监听事件
while (flag) {
if (selector.select(1000) <= 0) {
continue;
}
// 有客户端连接 获取关注事件的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断是哪种事件
// 如果是连接请求
if (selectionKey.isAcceptable()) {
acceptEvent(serverSocketChannel, selector);
}
// 如果是读就绪事件
if (selectionKey.isReadable()) {
readEvent(selectionKey);
}
// 删除当前selectionKey,不删除迭代器会会保留当前selectionKey
iterator.remove();
}
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
/**
* 连接请求事件
*
* @param serverSocketChannel 服务套接字通道
* @param selector 选择去
*/
private static void acceptEvent(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
// 生成socketChannel
SocketChannel client = serverSocketChannel.accept();
//设置为非阻塞客户端
client.configureBlocking(false);
// 注册并绑定buffer
client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
log.info("接收到客户端连接:{}", client.getRemoteAddress());
}
/**
* 读取数据事件
* @param selectionKey 选择健
* @throws IOException io异常
*/
private static void readEvent(SelectionKey selectionKey) throws IOException {
// 通过selectionKey获取socketChannel
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 通过selectionKey获取Buffer
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
channel.read(buffer);
String str = new String(buffer.array());
log.info("{}:{}", channel.getRemoteAddress(), str.trim());
}
}