文章目录
- 1. 概述
- 2. Reactor 单线程模型
- 2.1 ByteBufferUtil
- 2.2 服务端代码
- 2.3 客户端
- 2.4 运行截图
- 3. Reactor多线程模型
- 3.1 服务端代码
- 3.2 运行截图
- 4. 主从 Reactor多线程模型
- 4.1 服务端代码
- 4.2 运行截图
- 参考文献
1. 概述
在 I/O 多路复用的场景下,当有数据处于就绪状态后,需要一个事件分发器(Event Dispather),它负责将读写事件分发给对应的读写事件处理器(Event Handler)。
Reactor 模型主要分为三种
- Reactor 单线程模型
- Reactor 多线程模型
- 主从 Reactor 多线程模型
Doug Lea 教授的课件 : https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
Java Socket 网络编程实例(阻塞IO、非阻塞IO、多路复用Selector、AIO)
2. Reactor 单线程模型
Reactor 单线程模型,是指所有I/O操作(监听服务端, 接受客户端连接请求;消息的读取、解码、编码、发送)都在同一个NIO线程上面完成
2.1 ByteBufferUtil
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
/**
* ByteBufferUtil类提供了ByteBuffer和String之间转换的便捷方法。
* 这些方法使用UTF-8编码进行转换,确保了数据的正确性和一致性。
*/
public class ByteBufferUtil {
/**
* 从ByteBuffer中读取字符串。
*
* @param byteBuffer 待读取的ByteBuffer,应确保其为读模式。
* @return 从ByteBuffer解码得到的字符串。
* @throws CharacterCodingException 如果解码过程中发生错误。
*/
public static String read(ByteBuffer byteBuffer) throws CharacterCodingException {
// 使用UTF-8解码器将ByteBuffer中的字节解码为CharBuffer。
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);
// 将CharBuffer转换为字符串并返回。
return charBuffer.toString();
}
/**
* 将字符串写入ByteBuffer。
*
* @param string 待写入的字符串。
* @return 编码后的ByteBuffer。
* @throws CharacterCodingException 如果编码过程中发生错误。
*/
public static ByteBuffer read(String string) throws CharacterCodingException {
// 使用UTF-8编码器将字符串编码为ByteBuffer。
return StandardCharsets.UTF_8.encode(string);
}
/**
* 主函数用于演示ByteBuffer和字符串之间的相互转换。
*
* @param args 命令行参数。
* @throws CharacterCodingException 如果编码或解码过程中发生错误。
*/
public static void main(String[] args) throws CharacterCodingException {
// 将字符串"test"编码为ByteBuffer,然后从ByteBuffer解码回字符串并打印。
System.out.println(ByteBufferUtil.read(ByteBufferUtil.read("test")));
}
}
2.2 服务端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.Set;
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
/**
* 初始化Reactor,打开选择器和服务器套接字通道,并注册接受操作。
*
* @param port 服务器监听的端口号。
* @throws IOException 如果打开选择器或服务器套接字通道失败。
*/
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/**
* Reactor的主要运行方法,负责循环监听选择器上的事件,并分派处理。
*/
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
System.out.println("selected:" + selected.size());
for (SelectionKey selectionKey : selected) {
dispatch(selectionKey);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
/**
* 根据选择键分派相应的处理逻辑。
*
* @param k 选择键。
* @throws IOException 如果发生I/O错误。
*/
void dispatch(SelectionKey k) throws IOException {
Run r = (Run) (k.attachment());
if (r != null)
r.run();
}
/**
* 接受者类,负责接受新的客户端连接。
*/
class Acceptor implements Run { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
/**
* 处理者类,负责处理客户端的读写操作。
*/
final class Handler implements Run {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
/**
* 初始化处理者,注册读操作兴趣。
*
* @param sel 选择器。
* @param c 客户端套接字通道。
* @throws IOException 如果注册操作失败。
*/
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
/**
* 检查输入缓冲区是否已完成读取。
*
* @return 如果输入缓冲区还有剩余数据,则返回true;否则返回false。
*/
boolean inputIsComplete() {
return input.hasRemaining();
}
/**
* 检查输出缓冲区是否已完成写入。
*
* @return 如果输出缓冲区没有剩余空间,则返回true;否则返回false。
*/
boolean outputIsComplete() {
return !output.hasRemaining();
}
/**
* 处理输入缓冲区的数据。
*
* @throws CharacterCodingException 如果字符编码转换失败。
*/
void process() throws CharacterCodingException {
// 否则,将缓冲区反转并打印读取的数据
input.flip();
String request = ByteBufferUtil.read(input);
System.out.println(request);
input.clear();
output = ByteBufferUtil.read("你好: " + request);
}
/**
* 执行处理逻辑,包括读取数据、处理数据和准备写操作。
*
* @throws IOException 如果发生I/O错误。
*/
public void run() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
/**
* 发送者类,负责将处理后的数据写回客户端。
*/
class Sender implements Run {
public void run() throws IOException {
socket.write(output);
if (outputIsComplete()) {
new Handler(selector, socket);
}
}
}
}
/**
* 接口Run定义了所有处理逻辑的运行方法。
*/
public interface Run {
public abstract void run() throws IOException;
}
/**
* 程序入口点,创建并启动Reactor线程。
*
* @param args 命令行参数。
* @throws IOException 如果创建Reactor失败。
*/
public static void main(String[] args) throws IOException {
Reactor reactor = new Reactor(6666);
new Thread(reactor).start();
while (true) ;
}
}
2.3 客户端
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
public class SelectorClient {
public static void main(String[] args) throws IOException, InterruptedException {
// 创建Socket通道并连接到服务器
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 6666));
// 初始化输入和输出ByteBuffer
ByteBuffer inputBuffer = ByteBuffer.allocate(512);
ByteBuffer serverOutput = ByteBuffer.allocate(512);
// 循环接收用户输入并发送给服务器
while (true) {
// 使用Scanner获取用户输入
Scanner in = new Scanner(System.in);
String input = in.nextLine();
System.out.println("user input: " + input);
if (StringUtils.isBlank(input)) {
continue;
}
// 清空输入缓冲区,放入用户输入,然后反转准备写入
inputBuffer.clear();
inputBuffer.put(input.getBytes(StandardCharsets.UTF_8));
inputBuffer.flip();
// 将输入数据写入Socket通道
sc.write(inputBuffer);
System.out.println("send to server " + input);
// 循环读取服务器响应
int times = 1;
while (true) {
// 清空服务器响应缓冲区,准备读取数据
serverOutput.clear();
// 从Socket通道读取数据
sc.read(serverOutput);
// 如果没有读取到数据,继续尝试读取
if (!serverOutput.hasRemaining()) {
TimeUnit.SECONDS.sleep(1);
times++;
System.out.println(times);
if (times > 10) {
break;
}
continue;
}
// 反转缓冲区,读取数据并打印
serverOutput.flip();
System.out.println("server response " + ByteBufferUtil.read(serverOutput));
// 读取完成后退出内层循环
break;
}
}
}
}
2.4 运行截图
3. Reactor多线程模型
Reactor多线程模型 和 Reactor单线程模型最大的区别就是有一组NIO线程来处理I/O操作:
- 有一个NIO线程 Acceptor线程,监听服务端, 接受客户端连接请求
- 网络I/O操作,读写(消息的读取、解码、编码、发送)等由一个NIO线程池负责
- 一个NIO线程可以处理N条链路, 一个链路之对应一个NIO线程, 防止出现并发问题
3.1 服务端代码
服务端端代码如下,客户端同上:
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 该类实现了使用线程池处理NIO服务器的逻辑。
*/
public class ReactorWithThreadPool implements Runnable {
/**
* 处理器线程池,用于执行具体的处理任务。
*/
static ThreadPoolExecutor HANDLER_POOL = new ThreadPoolExecutor(2, 4,
10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
final Selector selector;
final ServerSocketChannel serverSocket;
/**
* 创建一个NIO服务器,监听指定端口。
*
* @param port 服务器监听的端口号。
* @throws IOException 如果打开选择器或服务器套接字失败。
*/
public ReactorWithThreadPool(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/**
* 主循环,负责监听选择器上的事件。
*/
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
System.out.println("selected:" + selected.size());
for (SelectionKey selectionKey : selected) {
if (selectionKey.isReadable()) {
System.out.println("selectionKey read");
}
if (selectionKey.isWritable()) {
System.out.println("selectionKey write");
}
dispatch(selectionKey);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
/**
* 分派选择键对应的处理程序。
*
* @param k 需要处理的选择键。
* @throws IOException 如果操作通道失败。
*/
void dispatch(SelectionKey k) throws IOException {
Run r = (Run) (k.attachment());
if (r != null)
r.run();
}
/**
* 接受者处理程序,负责接受新的客户端连接。
*/
class Acceptor implements Run { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
/**
* 处理客户端请求的处理程序。
*/
final class Handler implements Run {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
/**
* 创建一个新的处理程序实例。
*
* @param sel 选择器。
* @param c 客户端套接字通道。
* @throws IOException 如果注册选择键或配置套接字失败。
*/
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
/**
* 检查输入缓冲区是否已完成读取。
*
* @return 如果输入缓冲区还有剩余,则为true;否则为false。
*/
boolean inputIsComplete() {
return input.hasRemaining();
}
/**
* 检查输出缓冲区是否已完成发送。
*
* @return 如果输出缓冲区没有剩余,则为true;否则为false。
*/
boolean outputIsComplete() {
return !output.hasRemaining();
}
/**
* 处理输入数据。
*
* @throws CharacterCodingException 如果字符编码失败。
*/
void process() throws CharacterCodingException {
// 否则,将缓冲区反转并打印读取的数据
input.flip();
String request = ByteBufferUtil.read(input);
System.out.println(request);
input.clear();
output = ByteBufferUtil.read("你好: " + request);
}
/**
* 读取客户端输入,并根据情况启动处理程序或发送器。
*
* @throws IOException 如果读取通道失败。
*/
public void run() throws IOException {
socket.read(input);
if (inputIsComplete()) {
HANDLER_POOL.execute(new Processor());
}
}
/**
* 发送器处理程序,负责向客户端发送数据。
*/
class Sender implements Run {
public void run() throws IOException {
socket.write(output);
if (outputIsComplete()) {
new Handler(selector, socket);
}
}
}
/**
* 处理请求的处理程序,负责处理输入数据并准备输出。
*/
class Processor implements Runnable {
@Override
@SneakyThrows
public void run() {
process();
sk.attach(new Sender());
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
}
/**
* 处理器接口,定义了处理程序应实现的运行方法。
*/
public interface Run {
public abstract void run() throws IOException;
}
/**
* 程序入口点。
*
* @param args 命令行参数。
* @throws IOException 如果启动服务器失败。
*/
public static void main(String[] args) throws IOException {
ReactorWithThreadPool reactor = new ReactorWithThreadPool(6666);
new Thread(reactor).start();
while (true) ;
}
}
3.2 运行截图
4. 主从 Reactor多线程模型
主从 Reactor多线程模型的特点:服务端接受客户端连接,不再是一个单独的NIO线程,而是一个独立的NIO线程池。
Acceptor 接收到客户端TCP连接请求并处理完成后, 将新创建的SocketChannel 注册到 I/O线程池 (sub Reactor)。
Acceptor线程池仅负责客户端的登陆、握手、安全认证, 一旦链路建立成功, 就将链路注册到 I/O线程池 (sub Reactor), I/O线程池 (sub Reactor)负责后续的 I/O操作
4.1 服务端代码
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MultiReactor implements Runnable {
Selector selector = null;
ServerSocketChannel serverSocket;
static ThreadPoolExecutor REACTOR_THREAD_POOL = new ThreadPoolExecutor(2, 16,
10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20),
new ThreadFactoryBuilder().setNameFormat("REACTOR_THREAD_POOL-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 构造函数,初始化多线程反应器。
*
* @param port 服务器监听端口。
* @throws IOException 如果打开selector或服务器SocketChannel时发生错误。
*/
public MultiReactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor(serverSocket));
}
/**
* 构造函数,使用已有的selector。
*
* @param selector 已打开的selector。
*/
public MultiReactor(Selector selector) throws IOException {
this.selector = selector;
}
/**
* 主运行方法,负责监听和分发事件。
*/
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println(Thread.currentThread() + " select start");
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
System.out.println(Thread.currentThread() + " " + "selected:" + selected.size());
for (SelectionKey selectionKey : selected) {
if (selectionKey.isReadable()) {
System.out.println("selectionKey read");
}
if (selectionKey.isWritable()) {
System.out.println("selectionKey write");
}
dispatch(selectionKey);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
/**
* 分发已选择的事件到相应的处理程序。
*
* @param k 选择的关键。
* @throws IOException 如果操作通道时发生错误。
*/
void dispatch(SelectionKey k) throws IOException {
Run r = (Run) (k.attachment());
if (r != null)
r.run();
}
/**
* Acceptor类负责接受新的客户端连接,并将它们分配给子反应器处理。
*/
class Acceptor implements Run {
private final ServerSocketChannel listenSocketChannel;
private final List<MultiReactor> subReactors = new ArrayList<>(ACCEPTOR_POOL_NUM);
private static final int ACCEPTOR_POOL_NUM = 4;
private final ThreadPoolExecutor ACCEPTOR_POOL = new ThreadPoolExecutor(ACCEPTOR_POOL_NUM, ACCEPTOR_POOL_NUM,
10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20),
new ThreadFactoryBuilder().setNameFormat("ACCEPTOR_POOL-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
Acceptor(ServerSocketChannel listenSocketChannel) throws IOException {
this.listenSocketChannel = listenSocketChannel;
for (int i = 0; i < ACCEPTOR_POOL_NUM; i++) {
MultiReactor subReactor = new MultiReactor(Selector.open());
subReactors.add(subReactor);
ACCEPTOR_POOL.execute(subReactor);
}
}
/**
* 接受新的客户端连接,并分配给子反应器处理。
*/
@Override
public void run() {
try {
SocketChannel clientSocketChannel = listenSocketChannel.accept();
// 设置为非阻塞
// 任意选择一个从Reactor,让其监听连接的客户端的READ事件
Optional<MultiReactor> anySubReactor = subReactors.stream().findAny();
if (anySubReactor.isPresent() && clientSocketChannel != null) {
MultiReactor subReactor = anySubReactor.get();
System.out.println(Thread.currentThread() + ": "+ subReactor);
new Handler(subReactor.selector, clientSocketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Handler类负责处理与客户端的通信,包括读取请求和发送响应。
*/
final class Handler implements Run {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
sel.wakeup();
sk = socket.register(sel, SelectionKey.OP_READ);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
}
/**
* 检查输入缓冲区是否已完成读取。
*
* @return 如果输入缓冲区还有剩余,则返回true;否则返回false。
*/
boolean inputIsComplete() {
return input.hasRemaining();
}
/**
* 检查输出缓冲区是否已完成发送。
*
* @return 如果输出缓冲区没有剩余,则返回true;否则返回false。
*/
boolean outputIsComplete() {
return !output.hasRemaining();
}
/**
* 处理输入数据,将其解码并准备生成响应。
*
* @throws CharacterCodingException 如果字符编码发生错误。
*/
void process() throws CharacterCodingException {
// 否则,将缓冲区反转并打印读取的数据
input.flip();
String request = ByteBufferUtil.read(input);
System.out.println(Thread.currentThread() + ": " + request);
input.clear();
System.out.println(input.toString());
System.out.println(output.toString());
output = ByteBufferUtil.read("你好: " + request);
}
/**
* 读取客户端请求,并根据需要启动处理过程。
*
* @throws IOException 如果读取通道时发生错误。
*/
@Override
public void run() throws IOException {
socket.read(input);
if (inputIsComplete()) {
REACTOR_THREAD_POOL.execute(new Processor(sk.selector()));
}
}
/**
* Sender类负责发送响应给客户端。
*/
class Sender implements Run {
private Selector selector;
public Sender(Selector selector) {
this.selector = selector;
}
/**
* 发送输出缓冲区中的数据到客户端。
*
* @throws IOException 如果写入通道时发生错误。
*/
public void run() throws IOException {
System.out.println("start write");
socket.write(output);
if (outputIsComplete()) {
new Handler(this.selector, socket);
}
}
}
/**
* Processor类负责处理请求并准备响应。
*/
class Processor implements Runnable {
private Selector selector;
public Processor(Selector selector) {
this.selector = selector;
}
@Override
@SneakyThrows
public void run() {
process();
sk.attach(new Sender(this.selector));
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
}
/**
* Run接口定义了处理事件的运行时行为。
*/
public interface Run {
void run() throws IOException;
}
/**
* 程序入口点。
*
* @param args 命令行参数。
* @throws IOException 如果初始化反应器时发生错误。
*/
public static void main(String[] args) throws IOException {
MultiReactor reactor = new MultiReactor(6666);
new Thread(reactor).start();
while (true) ;
}
}
4.2 运行截图
参考文献
- Doug Lea 教授的课件 : https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
- Netty权威指南(第2版)李林锋 / 著
- https://juejin.cn/post/7210375522512666679?searchId=20240612213218FE474007F2FADD0130AA