1 多线程版本的Reactor模式演进
Reactor和Handler挤在一个单线程中会造成非常严重的性能缺陷,可以使用多线程来对基础的Reactor模式进行改造。
多线程Reactor的演进分为两个方面:
1、升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。
2、升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。
总体来说,多线程版本的Reactor模式大致如下:
(1)将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听收到阻塞。
(2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。
2 多线程版本Reactor的实战案例
在上一篇中:Java 高并发编程——Reactor模式(单线程)的基础上完成多线程反应器的升级。多线程反应器的实战案例设计如下:
1、引入多个选择器。
2、设计一个新的子反应器(SubReactor)类,子反应器负责查询一个选择器。
3、开发多个处理线程,一个处理器负责执行一个子反应器。
为了提升效率,可以让SubReactor的数量和选择器的数量一致,避免多个线程负责一个选择器,导致需要进行线程同步,引起效率低下。
多线程版本反应器MultiThreadEchoServerReactor的逻辑模型如下:
多线程版本反应器MultiThreadEchoServerReactor的代码如下:
public class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocketChannel;
AtomicInteger next = new AtomicInteger(0);
Selector bollSelector = null;
Reactor bossReactor = null;
//selectors集合,引入多个selector选择器
Selector[] workSelectors = new Selector[2];
//引入多个子反应器
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException{
//初始化多个selector选择器
bollSelector = Selector.open();//用于监听新连接
workSelectors[0] = Selector.open();//用于监听read,write事件
workSelectors[1] = Selector.open();//用于监听read,write事件
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
serverSocketChannel.socket().bind(address);
serverSocketChannel.configureBlocking(false);
//bossSelector 负责监听新连接事件,将serverSocketChannel注册到bossSelector
SelectionKey sk = serverSocketChannel.register(bollSelector,SelectionKey.OP_ACCEPT);
//绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//bossSelector 反应器,处理新连接的bossSelector
bossReactor = new Reactor(bollSelector);
//第一个子反应器,一个子反应器负责一个worker选择器
Reactor workReactor1 = new Reactor(workSelectors[0]);
Reactor workReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{workReactor1,workReactor2};
}
public MultiThreadEchoServerReactor( Selector bollSelector,ServerSocketChannel serverSocketChannel,) {
this.serverSocketChannel = serverSocketChannel;
this.bollSelector = bollSelector;
}
public void startServer(){
new Thread(bossReactor).start();
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反应器
class Reactor implements Runnable{
private Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while(!Thread.interrupted()){
selector.select(1000);
Set<SelectionKey> selectededKeys = selector.selectedKeys();
if(null == selectededKeys | selectededKeys.size() == 0){
continue;
}
Iterator<SelectionKey> it = selectededKeys.iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
dispatch(sk);
}
selectededKeys.clear();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
void dispatch(SelectionKey selectionKey){
Runnable handler = (Runnable) selectionKey.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if(handler != null){
handler.run();
}
}
//Handler:新连接处理器
class AcceptorHandler implements Runnable{
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
Logger.info("接收到一个新的连接");
if(socketChannel != null){
int index = next.get();
Logger.info("选择器的编号:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector,socketChannel);
}
}catch (IOException e){
e.printStackTrace();
}
if(next.incrementAndGet() == workSelectors.length){
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startServer();
}
}
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒 查询线程,使得OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
public void run() {
//异步任务,在独立的线程池中执行
//提交数据传输任务到线程池
//使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
//数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
3 Reactor模式的优缺点
在总结Reactor模式的优缺点之前,先看看Reactor模式和其他模式的对比。
(1)Reactor模式与生产者消费者模式对比
二者的相似之处:在一定程度上,Reactor模式优点类似生产者消费者模式。在生产消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉去事件来处理。
二者的不同之处:Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查到IO事件之后,反应器会根据不同的IO选择器将其分发给对应的Handler来处理。
(2)Reactor模式与观察者模式对比
二者的相似之处:在Reactor模式中,当查询到IO时间后,服务处理程序使用单路/多路分发策略,同步分发这些IO事件。观察者模式也被称为发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听同一个主题(topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。
二者的不同之处:在Reactor模式中,Handler实例和IO事件的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;在观察者模式中,同一个时刻、同一个主题可以被订阅过的多个观察者处理。
最后,总结一下Reactor模式的优缺点。作为高性能的IO模式,Reactor模式的优点如下:
1、响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。
2、编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。
3、可扩展,可以方便地通过反应器线程的个数来充分利用CPU资源。
Reactor模式的缺点如下:
1、Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效。
2、在同一个Handler业务线程中,如果出现了一个长时间的数据读写,就会影响着反应器中其他通道的IO处理。例如,在大文件传输时,IO操作就会影响其他客户端的响应时间。对于这种操作,还需要进一步对Reactor模式进行改进。