1 线程模型概述
Dubbo默认的底层网络通信使用的是Netty。
服务提供方NettyServer使用两级线程池,其EventLoopGroup(boss)主要用来接收客户端的连接请求,并把完成TCP三次握手的连接分发给EventLoopGroup(worker)来处理。boss和worker线程组称为I/O线程。之后连接可以直接被I/O线程处理,或者派发给业务线程池进行处理。
根据请求的消息是被I/O线程处理还是被业务线程池处理,Dubbo提供了以下几种线程模型(或称线程调度模型)。其中 all 模型是默认的线程模型。
1.1 all(AllDispatcher)
所有消息都被派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。
1.2 direct(DirectDispatcher)
所有消息都不被派发到业务线程池,全部在I/O线程上执行。
1.3 message(MessageOnlyDispatcher)
只有请求、响应消息被派发到业务线程池,其他消息直接在I/O线程上执行。
1.4 execution(ExecutionDispatcher)
只有请求消息被派发到业务线程池,其他消息直接在I/O线程上执行。
1.5 connection(ConnectionOrderedDispatcher)
除连接、断开事件以外,其他消息都被派发到业务线程池。在I/O线程上将连接、断开事件放入队列,有序地逐个执行。
2 源码分析
在Dubbo中,线程模型的扩展接口为Dispatcher,其提供的扩展实现都实现了该接口。
Dispatcher只有一个dispatch方法:它接收原始的ChannelHandler和URL,返回一个包装后的ChannelHandler,由这个包装后的ChannelHandler决定哪些消息被派发给业务线程池,源码如下所示。
/**
* ChannelHandlerWrapper (SPI, Singleton, ThreadSafe)
* <p>
* Extension point that wraps a {@link ChannelHandler}. The default extension named in the
* {@code @SPI} annotation below is {@code AllDispatcher.NAME}.
*/
@SPI(value = AllDispatcher.NAME, scope = ExtensionScope.FRAMEWORK)
public interface Dispatcher {
/**
* Dispatch the message to the thread pool by returning a channel handler built from
* the given handler and URL.
*
* @param handler the channel handler to wrap
* @param url the URL handed to the wrapping handler
* @return channel handler
*/
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
// The last two keys ("dispather", "channel.handler") are reserved for compatibility with the old configuration.
// NOTE(review): "dispather" looks like a legacy spelling kept on purpose - do not correct it without confirming.
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
2.1 all(AllDispatcher)
所有消息都被派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。
AllDispatcher的源码如下所示,其核心实现为AllChannelHandler类。
/**
 * Dispatcher whose {@link #NAME} is {@code "all"}; it is the default value named in the
 * {@code @SPI} annotation on {@code Dispatcher}. Wraps the given handler in an
 * {@code AllChannelHandler}.
 */
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    /**
     * @param handler the channel handler to wrap
     * @param url     the URL passed to the wrapping handler
     * @return a new {@code AllChannelHandler} around {@code handler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new AllChannelHandler(handler, url);
        return wrapped;
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
// 连接完成事件,交给业务线程池处理
@Override
public void connected(Channel channel) throws RemotingException {
// 业务线程池
ExecutorService executor = getSharedExecutorService();
try {
// 执行连接事件
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
// 连接断开事件,交给业务线程池处理
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
// 请求响应事件,交给业务线程池处理
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
// 异常处理事件,交给业务线程池处理
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
其中AllChannelHandler类实现了ChannelHandler,ChannelHandler的主要方法如下所示。
/**
* Callback interface for channel events: connected, disconnected, message sent,
* message received and exception caught. Every callback is declared to throw
* {@code RemotingException}.
*/
@SPI(scope = ExtensionScope.FRAMEWORK)
public interface ChannelHandler {
/**
* Called on channel connected.
*
* @param channel the channel that was connected
* @throws RemotingException if handling the event fails
*/
void connected(Channel channel) throws RemotingException;
/**
* Called on channel disconnected.
*
* @param channel the channel that was disconnected
* @throws RemotingException if handling the event fails
*/
void disconnected(Channel channel) throws RemotingException;
/**
* Called on message sent.
*
* @param channel the channel the message was sent on
* @param message the message that was sent
* @throws RemotingException if handling the event fails
*/
void sent(Channel channel, Object message) throws RemotingException;
/**
* Called on message received.
*
* @param channel the channel the message was received on
* @param message the message that was received
* @throws RemotingException if handling the event fails
*/
void received(Channel channel, Object message) throws RemotingException;
/**
* Called on exception caught.
*
* @param channel the channel the exception relates to
* @param exception the exception that was caught
* @throws RemotingException if handling the event fails
*/
void caught(Channel channel, Throwable exception) throws RemotingException;
}
2.2 direct(DirectDispatcher)
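所有消息都不被派发到业务线程池,全部在I/O线程上执行。需要注意的是,从下面的源码可以看到:只有当getPreferredExecutorService返回的executor是ThreadlessExecutor时,消息才会提交给该executor;否则直接在当前线程上调用handler.received。DirectDispatcher的源码如下所示。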
/**
 * Dispatcher whose {@link #NAME} is {@code "direct"}; wraps the given handler in a
 * {@code DirectChannelHandler}.
 */
public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    /**
     * @param handler the channel handler to wrap
     * @param url     the URL passed to the wrapping handler
     * @return a new {@code DirectChannelHandler} around {@code handler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new DirectChannelHandler(handler, url);
        return wrapped;
    }
}
public class DirectChannelHandler extends WrappedChannelHandler {
public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
if (executor instanceof ThreadlessExecutor) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
handler.received(channel, message);
}
}
}
2.3 message(MessageOnlyDispatcher)
只有请求、响应消息被派发到业务线程池,其他消息直接在I/O线程上执行。MessageOnlyDispatcher的源码如下所示。
/**
 * Dispatcher whose {@link #NAME} is {@code "message"}: only message receive uses the
 * thread pool. Wraps the given handler in a {@code MessageOnlyChannelHandler}.
 */
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    /**
     * @param handler the channel handler to wrap
     * @param url     the URL passed to the wrapping handler
     * @return a new {@code MessageOnlyChannelHandler} around {@code handler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new MessageOnlyChannelHandler(handler, url);
        return wrapped;
    }
}
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
2.4 execution(ExecutionDispatcher)
只有请求消息被派发到业务线程池,其他消息直接在I/O线程上执行。ExecutionDispatcher的源码如下所示。
/**
 * Dispatcher whose {@link #NAME} is {@code "execution"}; wraps the given handler in an
 * {@code ExecutionChannelHandler}.
 */
public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    /**
     * @param handler the channel handler to wrap
     * @param url     the URL passed to the wrapping handler
     * @return a new {@code ExecutionChannelHandler} around {@code handler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new ExecutionChannelHandler(handler, url);
        return wrapped;
    }
}
/**
 * Channel handler used by the "execution" dispatcher. Only {@code received} is overridden:
 * {@code Request} messages are submitted to an executor, while every other message is
 * handled on the calling thread unless the preferred executor is a {@code ThreadlessExecutor}.
 */
public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     * Handles a received message.
     * <ul>
     *   <li>{@code Request}: submitted to the preferred executor as a RECEIVED event.</li>
     *   <li>Anything else with a {@code ThreadlessExecutor}: submitted to that executor.</li>
     *   <li>Anything else: passed directly to the wrapped handler on the calling thread.</li>
     * </ul>
     * When submitting a request fails with a {@code RejectedExecutionException},
     * {@code sendFeedback} is called and the method returns normally, the same way the
     * all / message / connection handlers behave. Any other failure on the request path
     * is rethrown wrapped in an {@code ExecutionException}.
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    sendFeedback(channel, (Request) message, t);
                    // The rejection has been reported through sendFeedback; stop here instead of also
                    // throwing, consistent with the other dispatching handlers.
                    return;
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else if (executor instanceof ThreadlessExecutor) {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } else {
            handler.received(channel, message);
        }
    }
}
2.5 connection(ConnectionOrderedDispatcher)
除连接、断开事件以外,其他消息都被派发到业务线程池。在I/O线程上将连接、断开事件放入队列,有序地逐个执行。ConnectionOrderedDispatcher的源码如下所示。
/**
 * Dispatcher whose {@link #NAME} is {@code "connection"}: connect and disconnect events
 * keep their order. Wraps the given handler in a {@code ConnectionOrderedChannelHandler}.
 */
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    /**
     * @param handler the channel handler to wrap
     * @param url     the URL passed to the wrapping handler
     * @return a new {@code ConnectionOrderedChannelHandler} around {@code handler}
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        final ChannelHandler wrapped = new ConnectionOrderedChannelHandler(handler, url);
        return wrapped;
    }
}
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
// I/O线程的线程池
protected final ThreadPoolExecutor connectionExecutor;
private final int queueWarningLimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
// 连接建立事件由I/O线程处理
@Override
public void connected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
// 连接断开事件由I/O线程处理
@Override
public void disconnected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
}
}
// 请求、响应事件由业务线程处理
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (message instanceof Request && t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
// 异常处理事件由业务线程处理
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queueWarningLimit) {
logger.warn(TRANSPORT_CONNECTION_LIMIT_EXCEED, "", "", "connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit);
}
}
}