一,阻塞IO线程池模型(BIO)
这是传统的网络编程方案所采用的线程模型。
即有一个主循环,socket.accept阻塞等待,当建立连接后,创建新的线程/从线程池中取一个,把该socket连接交由新线程全权处理。
这种方案优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
// 循环监听
while (true) {
// 阻塞监听客户端请求
client = server.accept();
System.out.println(client.getRemoteSocketAddress() + "客户端连接成功!");
// 将该客户端请求通过线程池放入HandlMsg线程中进行处理
executorService.execute(new HandleMsg(client));
}
二,Reactor单线程模型
有了NIO后,可以采用IO多路复用机制了。
这是一个单Reactor单线程模型,时序图见下文,该方案只有一个线程,所有Channel的连接均注册在了该Reactor上,由一个线程全权负责所有的任务。
这种方案实现简单,且不受线程数的限制,但受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用,且适合于CPU资源紧张的应用上。
三,Reactor线程池模型
Reactor负责全部IO任务(包括每个Channel的连接和读写),线程池负责业务逻辑的处理。
虽然该方案可以充分利用CPU资源,但是这个方案比单线程版本多了进出Thread Pool的两次上下文切换。
四,主从Reactor模型(Netty的线程模型)
-
MainReactor负责连接任务,SubReactor负责IO读写、业务计算。
-
MainReactor和每个SubReactor都是单独的线程,可以调整SubReactor的数量适应CPU资源紧张的应用。
-
该方案有一个不太明显的缺点,即Session没有分优先级,所有Session平等对待均分到所有的线程中,这样可能会导致优先级低耗资源的Session堵塞高优先级的Session。( TODO 看下Netty的优化)
五,主从Reactor线程池模型
和主从Reactor模型相比,
只是把业务计算放到线程池里了,IO读写还是在SubReactor线程里。
该模型可以更为灵活的适应大多应用场景,通过:调整SubReactor数量、调整Thread Pool参数等。
注意:
-
如果将IO读写放到线程池里,可能会出现问题:SubReactor选中读就绪事件立马交给线程池,但线程还没来得及read,Channel由于仍然读就绪被select出来重复执行。
-
上图这样把Channel的读写放在SubReactor,那么此SubReactor上不同Channel的读写会阻塞,但可能效率很高也问题不大。
主从Reactor线程池模型代码示例(调试过了,注意细节见注释)
客户端
public class ReactorClient {
public static void main(String[] args) throws IOException, InterruptedException {
for (int i = 0; i < 4; i++) {
new Thread(() -> {
try {
send();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
static void send() throws IOException {
// 阻塞模式读写
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9090));
ByteBuffer writeBuff = ByteBuffer.allocate(20);
/**
* 分配太小时,客户端表现:接收数据不完整,但正常退出;服务端表现:业务读写正常,但业务结束后会收到2次读就绪事件,一次读返-1,关闭channel,一次读就会报java.io.IOException: Connection reset by peer
* TODO 研究下这个原理和如何分配大小
*/
ByteBuffer readBuff = ByteBuffer.allocate(2000);
writeBuff.put(("i am client " + Thread.currentThread().getName()).getBytes());
writeBuff.flip();
new Thread(new Runnable() {
@Override
@SneakyThrows
public void run() {
socketChannel.write(writeBuff);
System.out.println(Thread.currentThread().getName() + " 已发送数据,等待返回");
readBuff.clear();
// 阻塞等服务端消息
socketChannel.read(readBuff);
readBuff.flip();
System.out.println(Thread.currentThread().getName() + " 接受服务端消息:" + new String(readBuff.array()));
// 正常来讲应放入finally
socketChannel.close();
}
}).start();
}
}
服务端
/**
* 主从Reactor多线程模型
*/
public class MainSubReactorMultiThread {
private static final int SUB_COUNT = 4;
public static void main(String[] args) {
MainSubReactorMultiThread.MainReactor mainReactor = new MainSubReactorMultiThread.MainReactor(9090);
mainReactor.run();
}
/**
* 选择就绪的连接事件
*/
public static class MainReactor implements Runnable {
ServerSocketChannel serverSocketChannel;
Selector selector;
public MainReactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 注册了连接事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 并且在selectionKey对象附加了一个Acceptor对象,这是用来处理连接请求的类
selectionKey.attach(new MainSubReactorMultiThread.Acceptor(serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 开始监听");
selector.select();
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 监听到连接件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
// 同线程执行
runnable.run();
}
}
/**
* 选择就绪的读写事件
*/
public static class SubReactor implements Runnable {
Selector subSelector;
int index;
public SubReactor(Selector subSelector, int index) {
this.subSelector = subSelector;
this.index = index;
}
@Override
public void run() {
while (true) {
try {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 开始监听");
int selectNum = subSelector.select();
if (selectNum != 0) {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 监听到就绪事件:" + JSON.toJSONString(subSelector.selectedKeys()));
} else {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 未监听到事件,继续轮训");
continue;
}
Set<SelectionKey> selectionKeys = subSelector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@SneakyThrows
private void dispatcher(SelectionKey selectionKey) {
while (true) {
Runnable runnable = (Runnable) selectionKey.attachment();
if (runnable != null) {
// 同线程执行
runnable.run();
return;
}
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", runnable对象未添加完成,等待10ms");
Thread.sleep(10);
}
/**
* 可能在Acceptor里刚注册channel到selector就被reactor选中执行了,这时注册channel的地方还没执行attach方法,runnable会报NPE,所以要判空
*/
// Runnable runnable = (Runnable) selectionKey.attachment();
// runnable.run();
}
}
/**
* 处理连接
*/
public static class Acceptor implements Runnable {
private static Selector[] subSelector = new Selector[SUB_COUNT];
private ServerSocketChannel serverSocketChannel;
/**
* 单线程不会冲突
*/
private int index = -1;
@SneakyThrows
public Acceptor(ServerSocketChannel serverSocketChannel) {
for (int i = 0; i < SUB_COUNT; i++) {
subSelector[i] = Selector.open();
SubReactor subReactor = new SubReactor(subSelector[i], i);
new Thread(subReactor).start();
}
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
int ind = getNextIndex();
/**
* 本来以为没必要的,但如果不wakeup,会在下一步register阻塞!底层在等待synchronized同步锁
* TODO 研究下原理
*/
subSelector[ind].wakeup();
SelectionKey selectionKey = socketChannel.register(subSelector[ind], SelectionKey.OP_READ);
selectionKey.attach(new MainSubReactorMultiThread.ThreadPollWorkHandler(socketChannel));
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "客户端已连接:" + socketChannel.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
private int getNextIndex() {
if (index++ == SUB_COUNT - 1) {
index = 0;
}
return index;
}
}
/**
* 处理读写
*/
public static class ThreadPollWorkHandler implements Runnable {
private static ExecutorService executorService = Executors.newCachedThreadPool();
private SocketChannel socketChannel;
public ThreadPollWorkHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "开始处理socket读");
/**
* 读数据
*/
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readLength = socketChannel.read(byteBuffer);
if (readLength == -1) {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "客户端已关闭,关闭此通道");
socketChannel.close();
return;
}
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", socket读完成: " + message);
/**
* 线程池处理业务计算
*/
TaskHandler taskHandler = new TaskHandler(socketChannel, message);
Future<String> taskResult = executorService.submit(taskHandler);
/**
* 写数据
*/
ByteBuffer writeBuffer = ByteBuffer.wrap((socketChannel.getRemoteAddress() + ":" + taskResult.get()).getBytes(StandardCharsets.UTF_8));
socketChannel.write(writeBuffer);
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "已返回客户端数据,请求处理最终完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class TaskHandler implements Callable<String> {
private SocketChannel socketChannel;
private String parameter;
public TaskHandler(SocketChannel socketChannel, String parameter) {
this.socketChannel = socketChannel;
this.parameter = parameter;
}
@Override
public String call() throws Exception {
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 开始处理业务计算 参数: " + parameter);
Thread.sleep(1000);
String result = String.format("response(%s) for (%s)", RandomStringUtils.randomAlphanumeric(30), parameter).trim();
System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 业务计算完成 返回: " + result);
return result;
}
}
}