基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。
上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
文章目录
- AbstractInvoker#invoke服务提供者调用
- AbstractInvoker#doInvokeAndReturn执行rpc调用
- DubboInvoker#doInvoke调用remote层远程通信
- getCallbackExecutor用于获取回调线程池。
- ReferenceCountExchangeClient#request发起rpc请求
- HeaderExchangeClient#request发起rpc请求
- HeaderExchangeChannel#request异步发起rpc请求
- Request请求内容
- DefaultFuture#newFuture创建Future并启动超时检测
- new DefaultFuture
- timeoutCheck超时检测
- AbstractPeer#send发起异步请求
- AbstractClient#send发起异步请求
- NettyChannel#send基于netty发起异步请求
- AbstractInvoker#waitForResultIfSync异步转同步等待
- AsyncRpcResult#get阻塞获取结果
- ThreadlessExecutor#waitAndDrain等待返回
- 阻塞优化
- Result#recreate获取结果
- AsyncRpcResult#getAppResponse获取响应结果
- AppResponse#createDefaultValue创建默认返回值
- AppResponse#recreate处理结果
- 总结
AbstractInvoker#invoke服务提供者调用
ListenerInvokerWrapper#invoke方法没有其他操作,因此直接看AbstractInvoker#invoker方法。到这里,调用者变成了服务提供者invoker。
该方法首先调用doInvokeAndReturn方法执行RPC调用并返回异步执行结果,最后调用waitForResultIfSync方法判断如果同步调用,则等待RPC结果。
/**
* AbstractInvoker的方法
*/
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
//如果由于注册表的地址刷新导致调用程序被销毁,则允许当前调用继续进行
if (isDestroyed()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
//准备RpcInvocation
prepareInvocation(invocation);
//执行RPC调用并返回异步执行结果
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
//如果同步调用,则等待RPC结果
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
AbstractInvoker#doInvokeAndReturn执行rpc调用
执行RPC调用并返回异步执行结果AsyncRpcResult。
/**
* AbstractInvoker的方法
* <p>
* 执行RPC调用并返回异步执行结果
*
* @param invocation
* @return
*/
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
AsyncRpcResult asyncResult;
try {
/*
* 执行调用
*/
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) {
//异常处理
Throwable te = e.getTargetException();
//asyncResult封装抛出的异常
if (te != null) {
// if biz exception
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
} else {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
} catch (RpcException e) {
// if biz exception
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
//asyncResult封装抛出的异常
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
//当调用模式为异步时,或者setFutureWhenSync=true时
if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {
// set server context 将future设置到线程本地变量中,使用FutureAdapter包装
RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
}
return asyncResult;
}
DubboInvoker#doInvoke调用remote层远程通信
该方法由invoker具体的子类实现,以默认dubbo协议为例子,对应的invoker为DubboInvoker。该方法将会调用remote层进行rpc通信。
- 基于轮询的策略选择一个remote层客户端ExchangeClient,用以发起rpc调用,一般来说只有一个client,即共享连接。
- 判断如果是单向发送,那么直接发送异步请求,随后返回默认的AsyncRpcResult,不需要获取真正的请求响应结果。
- 同步、异步发送处理。Dubbo默认都是走的异步发送,发送完毕之后得到一个CompletableFuture,将CompletableFuture包装为一个AsyncRpcResult返回。
- 在外层AbstractInvoker#invoker方法中会判断,如果同步则会调用future.get阻塞等待结果。这就是Dubbo所谓的异步转同步处理。
/**
* DubboInvoker的方法
* <p>
* 执行RPC调用并返回执行结果
*/
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
//调用方法名
final String methodName = RpcUtils.getMethodName(invocation);
//设置附加属性 path={serviceInterface}
inv.setAttachment(PATH_KEY, getUrl().getPath());
//设置附加属性 version={version}
inv.setAttachment(VERSION_KEY, version);
/*
* 基于轮询的策略选择一个remote层客户端,用以发起rpc调用
* 一般来说只有一个client,即共享连接
*/
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//是否是单向通讯,即只管发送不需要返回结果
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//计算rpc调用超时时间,默认3000ms
int timeout = calculateTimeout(invocation, methodName);
invocation.setAttachment(TIMEOUT_KEY, timeout);
/*
* 单向发送处理
*/
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
//发送单向请求
currentClient.send(inv, isSent);
//返回默认的AsyncRpcResult,不需要真正的请求响应结果
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
/*
* 同步、异步发送处理
* 默认都是走的异步发送
*/
else {
//回调线程池
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
//异步发送请求,发送完毕之后得到一个CompletableFuture
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
//将CompletableFuture包装为一个AsyncRpcResult
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
getCallbackExecutor用于获取回调线程池。
- 如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步。
- ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程。通过execute(Runnable)方法提交给该Executor的任务并不会被调度到特定的线程执行。这些任务将会被存储在阻塞队列中,只有当线程调用waitAndDrain()方法时才会执行,执行该任务的线程与调用waitAndDrain的线程完全相同。
- 如果是异步调用,则获取对应的多线程的线程池。
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
//如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步
//ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor();
}
//如果是异步调用,则获取对应的多线程的线程池
return url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension()
.getExecutor(url);
}
ReferenceCountExchangeClient#request发起rpc请求
接下来我们就来到了remote层,将会真正的发起rpc请求。入口方法是ReferenceCountExchangeClient#request方法,内部默认委托HeaderExchangeClient#request发起请求。
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
//默认通过HeaderExchangeClient发起请求
return client.request(request, timeout, executor);
}
HeaderExchangeClient#request发起rpc请求
HeaderExchangeClient#request方法,内部默认委托HeaderExchangeChannel#request发起请求。
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
//默认通过HeaderExchangeChannel发起请求
return channel.request(request, timeout, executor);
}
HeaderExchangeChannel#request异步发起rpc请求
该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,并且生成并记录唯一的mId,version,requestBody等信息,创建结果对象DefaultFuture 对象,最终通过NettyClient#send发起异步请求。
/**
* HeaderExchangeChannel的方法
* <p>
* 发起异步请求
*
* @param request 请求,如RpcInvocation
* @param timeout 请求超时时间
* @param executor 回调执行器
* @return 未来执行结果
* @throws RemotingException
*/
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request. 创建Request,生成唯一请求id
Request req = new Request();
//设置Dubbo RPC protocol version
req.setVersion(Version.getProtocolVersion());
//设置双向请求
req.setTwoWay(true);
//设置请求数据,如RpcInvocation
req.setData(request);
//创建 DefaultFuture 对象
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
/*
* 通过NettyClient#send发起异步请求
*/
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
Request请求内容
每次请求都会生成一个Request对象,表示请求的内容。创建Request的时候会生成一个自增的id,表示本次请求的唯一标识。
public Request() {
//生成新的自增唯一id
mId = newId();
}
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private static long newId() {
//通过一个原子变量获取唯一自增id
// getAndIncrement() 当它增长到MAX_VALUE时,它将增长到MIN_VALUE,并且负数可以用作ID
return INVOKE_ID.getAndIncrement();
}
DefaultFuture#newFuture创建Future并启动超时检测
每次请求都会通过该方法创建一个DefaultFuture对象并进行超时检测。
/**
* 创建一个DefaultFuture并进行超时检测
*
* @param channel NettyClient
* @param request 请求内容Request
* @param timeout 请求超时时间
* @return DefaultFuture
*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
//创建一个DefaultFuture,存入缓存
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
//设置回调执行器
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
//ThreadlessExecutor需要保存waitingFuture以防响应返回。同步调用的执行器为ThreadlessExecutor
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
/*
* 超时检查
*/
timeoutCheck(future);
return future;
}
new DefaultFuture
在DefaultFuture的构造器中,会将将请求id与当前DefaultFuture实例存入,静态map FUTURES内部,将请求id与当前NettyClient实例存入,静态map CHANNELS内部。
请求id和响应id是同一个值,因此当有响应返回时,就能从此缓存中根据id找到对应的DefaultFuture并唤醒阻塞线程。
/**
* 正在处理的channel
*/
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
/**
* 正在处理的请求
*/
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
//保存NettyClient
this.channel = channel;
//保存request
this.request = request;
//获取mid,即请求唯一id
this.id = request.getId();
//获取请求超时时间
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
//将请求id与当前DefaultFuture实例存入,静态map FUTURES内部
FUTURES.put(id, this);
//将请求id与当前NettyClient实例存入,静态map CHANNELS内部
CHANNELS.put(id, channel);
}
timeoutCheck超时检测
创建超时检查任务TimeoutCheckTask,并且添加到dubbo时间轮中。
当超时时间到了,便会执行TimeoutCheckTask中的检查任务。将会通过id获取到对应的DefaultFuture,然后构建一个超时响应Response,通过DefaultFuture#received处理超时响应。
/**
* 检查future的超时
*/
private static void timeoutCheck(DefaultFuture future) {
//创建超时检查任务
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
//添加到dubbo时间轮中
future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
private static class TimeoutCheckTask implements TimerTask {
//请求id
private final Long requestID;
TimeoutCheckTask(Long requestID) {
this.requestID = requestID;
}
@Override
public void run(Timeout timeout) {
//从FUTURES中获取请求id对应的DefaultFuture
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
return;
}
//通过执行器或者直接通知超时
if (future.getExecutor() != null) {
future.getExecutor().execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
}
private void notifyTimeout(DefaultFuture future) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response. 处理超时响应
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
}
AbstractPeer#send发起异步请求
@Override
public void send(Object message) throws RemotingException {
//AbstractClient#send发起异步请求
send(message, url.getParameter(Constants.SENT_KEY, false));
}
AbstractClient#send发起异步请求
内部通过NettyChannel#send发起请求。
@Override
public void send(Object message, boolean sent) throws RemotingException {
//重新连接判断
if (needReconnect && !isConnected()) {
connect();
}
//获取NettyChannel,这是dubbo中的类
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
//通过NettyChannel#send发起请求
channel.send(message, sent);
}
NettyChannel#send基于netty发起异步请求
此处的NettyChannel是位于dubbo的netty4包下的类。该方法中,将会通过NioSocketChannel#writeAndFlush方法发送请求,NioSocketChannel是netty包的类,到此,真正开始的发送请求,走到netty里面的逻辑。
NettyChannel内部有一个静态属性CHANNEL_MAP,存储着netty的NioSocketChannel到NettyChannel实例的映射关系,而NettyClient内部也持有netty的NioSocketChannel。
基于netty的NioSocketChannel#writeAndFlush方法发起请求的时候,实际上内部调用ChannelPipeline#writeAndFlush方法进行数据出站过程,包括一系列的ChannelOutboundHandler的链式处理,出站处理器执行顺序与处理器添加顺序相反,Dubbo3.1版本中对于consumer内部的NettyClient,在NettyClient#initBootstrap方法中初始化netty客户端的时候,绑定了如下的handler:
//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
//解码
.addLast("decoder", adapter.getDecoder())
//编码
.addLast("encoder", adapter.getEncoder())
//心跳检测
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
//最后是此前创建的nettyClientHandler
.addLast("handler", nettyClientHandler);
其中NettyClientHandler内部包括的dubbo handler顺序为:NettyClient、MultiMessageHandler、HeartbeatHandler、AllChannelHandler、DecodeHandler、HeaderExchangeHandler、ExchangeHandlerAdapter(DubboProtocol.requestHandler),但是在发送消息的时候这些handler没有太多的功能。关键是消息编码,这一点我们下面单独说。
/**
* NettyChannel
* 通过netty发送消息以及是否等待发送完成。
*
* @param message 要发送的消息
* @param sent 是否ack async-sent,默认false
* @throws RemotingException 如果等待到超时或被try-catch包围的方法体抛出的任何异常,则抛出RemotingException。
*/
@Override
public void send(Object message, boolean sent) throws RemotingException {
//channel是否关闭
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
/*
* 通过NioSocketChannel#writeAndFlush方法发送请求
* NioSocketChannel是netty包的类,到此真正的发送请求
*/
ChannelFuture future = channel.writeAndFlush(message);
//是否ack async-sent,默认false
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
//获取异常并抛出
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
AbstractInvoker#waitForResultIfSync异步转同步等待
当AbstractInvoker#doInvokeAndReturn执行完毕之后,将会返回一个AsyncRpcResult。随后调用waitForResultIfSync方法,判断如果是同步调用,则线程阻塞等待结果返回。如果是异步调用,则不会阻塞直接返回了,在代码中我们可以通过RpcContext.getContext().getFuture().get() 来获取异步调用的结果,而通过 Dubbo上下文获取的是 FutureAdapter。
waitForResultIfSync内部是调用AsyncRpcResult#get方法完成阻塞等待的。
/**
* AbstractInvoker的方法
* <p>
* 阻塞等待获取结果
*
* @param asyncResult 异步执行结果
* @param invocation 方法调用抽象
*/
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
//如果不是同步调用,则直接返回
if (InvokeMode.SYNC != invocation.getInvokeMode()) {
return;
}
try {
/*
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
* 必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待
* 因为CompletableFuture#get()方法被证明有严重的性能下降
*/
//获取超时时间,默认3000ms
Object timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
//阻塞等待给定的时间
if (timeout instanceof Integer) {
asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);
} else {
//阻塞Integer.MAX_VALUE毫秒
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable rootCause = e.getCause();
if (rootCause instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (rootCause instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else {
throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (java.util.concurrent.TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
}
AsyncRpcResult#get阻塞获取结果
判断如果执行器属于ThreadlessExecutor,那么调用waitAndDrain方法阻塞当前线程直到响应返回或者超时,在新版本默认都是ThreadlessExecutor,否则通过responseFuture#get进行阻塞等待。
如果使用responseFuture#get,因为responseFuture的类型是CompletableFuture,这是JDK提供的Future实现类,所以他的get方法的阻塞实现就和Dubbo无关了,那么JDK怎么实现的呢?实际上很简单,如果CompletableFuture的内部result还为null,说明还没有结果,那么会对调用get方法的线程通过LockSupport.park阻塞并等待结果,这里可以参考我此前写的关于FutureTask的源码的文章。
/**
* AsyncRpcResult的方法
* <p>
* 阻塞等待获取结果
*
* @param timeout 超时时间
* @param unit 单位
*/
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
//如果执行器属于ThreadlessExecutor,表示同步调用
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
//调用waitAndDrain方法阻塞当前线程直到响应返回或者超时
threadlessExecutor.waitAndDrain();
}
//通过responseFuture阻塞等待
return responseFuture.get(timeout, unit);
}
ThreadlessExecutor#waitAndDrain等待返回
ThreadlessExecutor是Dubbo2.7.5新增的同步调用时使用的一个执行器,内部没有任何多余的线程。这实际上是一个线程模型的优化,具体的优化见这篇文章:https://blog.csdn.net/weixin_39860915/article/details/103917841,简单的说对于响应结果的序列化的这步操作由业务线程自己来执行,不再需要额外的消费端的线程池线程来执行了。
相比于老的线程池模型,由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。因为消费端的线程池策略默认是使用cached ,线程池会为每次的消费请求创建一个线程,那么当消费者应用并发较大或者提供者响应的时间较长时,就会出现消费者线程很多的情况。
- waitAndDrain方法将会等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
- 在另一边,当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列。此时等待将会被唤醒。
- 获取到任务之后业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future,随后返回。
所以说,ThreadlessExecutor是通过阻塞队列来实现同步等待的。这也是Dubbo同步调用实现,因为Dubbo底层通信使用的是Netty,Netty的调用都是异步调用,每次发送请求之后都会直接返回而不会等待结果,所以Dubbo所有的调用其实也都是异步调用,因此才需要这么个“异步转同步”的操作,即:发送请求之后,业务线程调用阻塞队列的take方法阻塞,后面如果收到服务端发过来的响应结果之后,将响应数据放到阻塞队列里面,这样当前阻塞的线程就被唤醒了,这样就实现了同步调用。
那么,收到的响应结果是怎么和此前发起的某个请求匹配上的呢?实际上就是通过唯一的请求id来匹配的,响应结果中也携带有发起的请求id。
/**
* 等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
*/
public void waitAndDrain() throws InterruptedException {
/**
* Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,
* once the response (the task) reached and being executed waitAndDrain will return, the whole request process
* then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.
*
* There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of
* 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call
* of it is totally sequential.
*/
//如果已经完成,则本次请求则直接返回
if (isFinished()) {
return;
}
Runnable runnable;
try {
//当前线程等待直到任务队列中有一个任务,执行该任务
runnable = queue.take();
} catch (InterruptedException e) {
setWaiting(false);
throw e;
}
synchronized (lock) {
setWaiting(false);
//当前线程执行该任务
runnable.run();
}
//继续执行全部任务
runnable = queue.poll();
while (runnable != null) {
runnable.run();
runnable = queue.poll();
}
// mark the status of ThreadlessExecutor as finished.
setFinished(true);
}
阻塞优化
AbstractInvoker#waitForResultIfSync方法中,当超时时间不是Integer类型的时候,将会调用,asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)方法,将可能会阻塞Integer.MAX_VALUE毫秒,这种情况就类似于CompletableFuture#get不带有超时时间的方法了。那么为什么还要用CompletableFuture#get(timeout, unit)方法呢?
在方法中能够看到这样的注释:必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待。因为CompletableFuture#get()方法被证明有严重的性能下降。
Result#recreate获取结果
在获取到Reuslt之后,将会调用recreate方法处理返回值获取最终结果。3.0.0中引入了AsyncRpcResult来替换RpcResult, RpcResult被AppResponse替换,因此目前无论同步还是异步的Reuslt都统一使用AsyncRpcResult。
- 请求如果是FUTURE模式,那么直接返回Future,不会阻塞。
- 请求如果是异步模式,那么直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞。
- 请求如果是同步模式,那么在获取响应结果之后才会返回。
/**
* AsyncRpcResult的方法
*/
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
//如果是FUTURE模式,那么直接返回Future,不会阻塞
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
return RpcContext.getClientAttachment().getFuture();
}
//如果是异步
else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
//直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞
return createDefaultValue(invocation).recreate();
}
//同步调用,获取响应结果之后才会返回
return getAppResponse().recreate();
}
AsyncRpcResult#getAppResponse获取响应结果
阻塞式的获取结果。当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了。
public Result getAppResponse() {
try {
//阻塞式的获取结果
//当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了
if (responseFuture.isDone()) {
return responseFuture.get();
}
} catch (Exception e) {
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
//创建默认返回值的AppResponse
return createDefaultValue(invocation);
}
AppResponse#createDefaultValue创建默认返回值
异步调用的情况,将会创建默认返回值返回。
获取调用的方法,如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null。
private static Result createDefaultValue(Invocation invocation) {
//获取调用的方法
ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);
//如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null
return method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}
AppResponse#recreate处理结果
最终结果的处理,如果有异常则抛出,否则返回结果。
/**
* AppResponse的方法
*/
@Override
public Object recreate() throws Throwable {
//如果有异常,则处理并抛出
if (exception != null) {
// fix issue#619
try {
Object stackTrace = exception.getStackTrace();
if (stackTrace == null) {
exception.setStackTrace(new StackTraceElement[0]);
}
} catch (Exception e) {
// ignore
}
throw exception;
}
//返回真实结果
return result;
}
总结
现在我们来总结一下Dubbo3.1版本中Dubbo Consumer发起服务调用请求的总体过程。
- 调用某个Dubbo接口的方法,实际上是调用之前服务引入时生成的代理类对象,最终所有方法被转发到InvokerInvocationHandler#invoke方法中,这是请求的通用入口。
- 随后利用服务消费者Invoker从服务引入是生成的一批服务提供者Invoker列表中经过路由过滤、负载均衡机制选择一个服务提供者Invoker,随后利用服务提供者Invoker内部封装的NettyClient序列化消息并发起远程rpc请求。
- 如果调用失败,则在服务消费者ClusterInvoker中使用容错策略,例如失败重试等等,发起调用的时候每次请求都会生成一个唯一id,消费者端会缓存本次请求的id与Future的映射关系,默认每次请求都是异步调用,对于同步调用使用异步转同步机制的阻塞等待直到返回响应。响应id和请求id一致,此时就可以找到对应的Future设置响应结果。
实际上还有很多详细的知识点没说出来,例如消费者MigrationInvoker最开始实际上会进行决策使用接口级还是应用级Invoker。
本次我们学习了Dubbo Consumer发起服务调用请求的过程源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。