一、Dubbo线程模型
首先明确一个基本概念:IO 线程和业务线程的区别
IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据
打交道的事件。
业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider 上写的代码所执行的线
程环境。
Dubbo 默认采用的是长链接的方式,即默认情况下一个consumer 和一个provider 之间只会建立
一条链接,这种情况下IO 线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;
有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数:
1. shareconnections:表示可共享的socket连接个数
2. connections:表示不共享的socket连接个数
服务A的shareconnections或者connections为2时,服务A的消费者会向服务A的提供者建⽴两个socket连接:
业务线程就是处理IO 线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯
粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要
比IO 线程池的配置大很多。
IO 线程部分,Netty 服务提供方NettyServer 又使用了两级线程池,master 主要用来接受客户
端的链接请求,并把接受的请求分发给worker 来处理。整个过程如下图:
IO 线程与业务线程的交互如下:
IO 线程的派发策略:
默认是all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
direct:worker 线程接收到事件后,由worker 执行到底。
message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO 线程上执行
execution:只有请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在IO线程上执行
connection:在IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
业务线程池设置:
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
coresize:200
maxsize:200
队列:SynchronousQueue
回绝策略:AbortPolicyWithReport - 打印线程信息jstack,之后抛出异常
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
配置示例:
<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="100"/>
在整个消费者调用过程中,各个线程池都比较重要,其中比较有特色的就是AllChannelHandler,它完成了IO线程转向用户线程的任务转移,比较关键。
二、派发策略(All)源码解析
消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接
private ExchangeClient initClient(URL url) {
ExchangeClient client;
try {
// Replace InstanceAddressURL with ServiceConfigURL.
url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getParameters());
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略,dubbo默认的策略为allDispatcher策略。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
除了默认的AllDispatcher,还有DirectDispatcher,MessageOnlyDispatcher等。
当消费端接收到远程服务端的响应之后,按照Netty的处理流程,消息会在channel绑定的handler上传递,netty底层会调用handler#received。可以看到connected,disconnected,received,caught等方法都是在新的线程池ExecutorService中执行,executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
由于AllChannelHandler方法是在前面handler的基础上包装了一层,所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler,从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
三、AllDispatcher策略异常超时问题
Dubbo有一个经典问题,就是当配置了消息派发策略为AllDispatcher时,当服务端线程池满了之后,当消费端再次发送请求,就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler,前面已经说了AllDispatcher策略就是所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
问题出现原因:
那么当服务端线程池打满之后,此时又再次来了一个请求,此时依然会提交给线程池执行,那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常,此时就会进入到received的catch方法中去,然后就又再次抛出ExecutionException异常。
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
那么抛出的异常就又会被netty捕获,进而继续执行nettyHandler的caught方法,可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的,业务线程池所有线程都堵住了,所以也不能将异常消息返回给客户端,然后客户端消费者只能傻傻等到超时。
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
解决办法可以设置dispatcher为message,只有请求和响应交给业务线程池处理,其他的在IO线程处理,配置如下:
<dubbo:protocol name="dubbo" dispatcher="message" />
后面dubbo也修复了这个问题,再received方法的catch中新加了一部分逻辑,注释的大致意思也就是说:修复当线程池满了之后异常信息无法被发送给消费端的问题(当线程池满了,拒绝执行任务,会引起消费端等待超时),所以代码中判断了下当抛出异常为RejectedExecutionException时,就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行,而是直接用IO线程在通过channel将消息及时反馈给消费者,消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full.
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}