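The underlying Netty communication is asynchronous, so how does Dubbo turn that asynchronous layer into the synchronous calls we normally make?
Dubbo supports several remote RPC protocols and network frameworks; this walkthrough uses the default dubbo protocol with Netty as the network framework.
Note: when debugging, set the timeout to a large value so the call does not expire while you step through the code.
Sending the request
1. DubboInvoker
This class matters because the consumer has no concrete implementation of the service interface. The call logic runs through a proxy, and DubboInvoker is what ultimately does the work. Its core method is: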
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
// Cast the Invocation to RpcInvocation
RpcInvocation inv = (RpcInvocation) invocation;
// Get the method name
final String methodName = RpcUtils.getMethodName(invocation);
// Set the path and version as attachments
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
// Get the list of available exchange clients
List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
if (exchangeClients.size() == 1) {
// With a single client, use it directly
currentClient = exchangeClients.get(0);
} else {
// With several clients, pick one by taking the counter modulo the list size
currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
}
// Record the current client's local address in RpcContext
RpcContext.getServiceContext().setLocalAddress(currentClient.getLocalAddress());
try {
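// Check whether this is a one-way call
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// Calculate the timeout
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
if (timeout <= 0) {
// No time left (timeout <= 0): return an async result that already carries a TIMEOUT_TERMINATE exception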
return AsyncRpcResult.newDefaultAsyncResult(
new RpcException(
RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ RpcUtils.getMethodName(invocation) + ", terminate directly."),
invocation);
}
// Set the timeout as an attachment
invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
// Get the payload size configured on the URL
Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
// Create the Request object
Request request = new Request();
if (payload != null) {
request.setPayload(payload);
}
request.setData(inv);
request.setVersion(Version.getProtocolVersion());
if (isOneway) {
// One-way call: send the request without expecting a response
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
request.setTwoWay(false);
currentClient.send(request, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// Two-way call: a response is expected
request.setTwoWay(true);
// Get the callback executor
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
// Store the future in FutureContext for compatibility
if (setFutureWhenSync || ((RpcInvocation) invocation).getInvokeMode() != InvokeMode.SYNC) {
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
}
// Return the async result
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
// Handle a timeout
throw new RpcException(
RpcException.TIMEOUT_EXCEPTION,
"Invoke remote method timeout. method: " + RpcUtils.getMethodName(invocation) + ", provider: "
+ getUrl() + ", cause: " + e.getMessage(),
e);
} catch (RemotingException e) {
// Handle a remoting failure
String remoteExpMsg = "Failed to invoke remote method: " + RpcUtils.getMethodName(invocation)
+ ", provider: " + getUrl() + ", cause: " + e.getMessage();
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof SerializationException) {
throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, remoteExpMsg, e);
} else {
throw new RpcException(RpcException.NETWORK_EXCEPTION, remoteExpMsg, e);
}
}
}
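As an aside on the client selection in doInvoke, the round-robin pick can be sketched on its own. The excerpt does not show the type of `index`, so this sketch assumes a plain `AtomicInteger` and uses `Math.floorMod` so the slot stays non-negative after the counter overflows. `RoundRobinSketch` and `select` are hypothetical names, not Dubbo classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    // Shared counter; the start value is configurable only to demonstrate overflow.
    private final AtomicInteger index;

    RoundRobinSketch() {
        this(0);
    }

    RoundRobinSketch(int start) {
        this.index = new AtomicInteger(start);
    }

    /** Picks the next element in round-robin order. */
    <T> T select(List<T> clients) {
        if (clients.size() == 1) {
            return clients.get(0);
        }
        // floorMod never returns a negative slot, even once the counter wraps to Integer.MIN_VALUE.
        return clients.get(Math.floorMod(index.getAndIncrement(), clients.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> clients = Arrays.asList("client-0", "client-1", "client-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.select(clients)); // client-0, client-1, client-2, client-0
        }
    }
}
```

With a plain `%`, a negative counter would produce a negative index and `get` would throw; whether the real `index` field can go negative depends on its type, which is outside this excerpt.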
In doInvoke, note this line:
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
It calls org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#request(java.lang.Object, int, java.util.concurrent.ExecutorService):
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor)
throws RemotingException {
return channel.request(request, timeout, executor);
}
which delegates to org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService):
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor)
throws RemotingException {
if (closed) {
throw new RemotingException(
this.getLocalAddress(),
null,
"Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req;
if (request instanceof Request) {
req = (Request) request;
} else {
// create request.
req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
}
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
2. DefaultFuture
As shown above, a DefaultFuture is created here, and DefaultFuture extends CompletableFuture<Object>.
Step into the newFuture method called on this line:
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// timeout check
timeoutCheck(future);
return future;
}
Next, step into new DefaultFuture(channel, request, timeout):
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
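The key lines are FUTURES.put(id, this); and CHANNELS.put(id, channel);
They store the newly created future and its channel in static maps keyed by the request id:
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
DefaultFuture is the key to turning the asynchronous call into a synchronous one.
When Netty receives response data, the id carried by the response is used to look up the matching DefaultFuture.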
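The id-based lookup described above can be sketched in isolation. This is a minimal illustration of the pattern, not Dubbo's code: `PendingRequestsSketch`, `register`, and `onResponse` are hypothetical names, and a plain `CompletableFuture` stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // Futures waiting for a response, keyed by request id.
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    /** Caller side: park the future under a fresh id and return the id to put on the wire. */
    static long register(CompletableFuture<Object> future) {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, future);
        return id;
    }

    /** I/O side: find the future by response id and complete it; false means unknown or late. */
    static boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = register(future);
        // Another thread plays the role of the Netty I/O thread delivering the response.
        new Thread(() -> onResponse(id, "pong")).start();
        System.out.println(future.get()); // blocks until the response arrives, then prints pong
    }
}
```

Removing the entry on delivery means a duplicate or late response for the same id finds nothing to complete.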
Now go back to channel.send(req); in org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService).
It enters:
org.apache.dubbo.remoting.transport.AbstractPeer#send
and then org.apache.dubbo.remoting.transport.netty4.NettyChannel#send
@Override
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
Object outputMessage = message;
if (!encodeInIOThread) {
ByteBuf buf = channel.alloc().buffer();
ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
codec.encode(this, buffer, message);
outputMessage = buf;
}
ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!(message instanceof Request)) {
return;
}
ChannelHandler handler = getChannelHandler();
if (future.isSuccess()) {
handler.sent(NettyChannel.this, message);
} else {
Throwable t = future.cause();
if (t == null) {
return;
}
Response response = buildErrorResponse((Request) message, t);
handler.received(NettyChannel.this, response);
}
}
});
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(
this,
"Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to "
+ getRemoteAddress() + ", cause: " + e.getMessage(),
e);
}
if (!success) {
throw new RemotingException(
this,
"Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to "
+ getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
}
}
Then note this line:
handler.sent(NettyChannel.this, message);
The call passes through, in order:
org.apache.dubbo.remoting.transport.AbstractPeer#sent
org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate#sent
org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#sent
org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler#sent
org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate#sent
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#sent
At this point the message has been sent. Next, look at another important method of HeaderExchangeHandler:
@Override
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message
+ " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(TRANSPORT_UNSUPPORTED_MESSAGE, "", "", e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (StringUtils.isNotEmpty(echo)) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
Note this line: handleResponse(channel, (Response) message);
It eventually reaches org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
shutdownExecutorIfNeeded(future);
} else {
logger.warn(
PROTOCOL_TIMEOUT_SERVER,
"",
"",
"The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus()
+ (channel == null
? ""
: ", channel: " + channel.getLocalAddress() + " -> "
+ channel.getRemoteAddress())
+ ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
which then calls org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(
new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else if (res.getStatus() == Response.SERIALIZATION_ERROR) {
this.completeExceptionally(new SerializationException(res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
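Note this line: this.complete(res.getResult());
It relies on the complete() method of CompletableFuture, which completes the future manually with a given value. Once complete() has set the value, every caller waiting on that future receives it.
Notes:
Once complete() succeeds, the future is completed immediately and its dependent stages are triggered. If the future is already completed, calling complete() again has no effect and returns false.
If the future has already completed exceptionally, complete() likewise has no effect. To fail a future that is still pending, call completeExceptionally(Throwable ex).
If several threads call complete() concurrently, only the first call sets the result; the other calls are ignored.
CompletableFuture's get() blocks until a result is available and returns as soon as complete(T value) has been called.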
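These rules can be checked with a small, self-contained sketch that uses only the JDK's `CompletableFuture`. The class and method names (`CompleteSemanticsSketch`, `describe`, `failed`) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CompleteSemanticsSketch {
    /** Only the first completion wins; later attempts return false and change nothing. */
    static String describe() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("first");
        boolean second = future.complete("second");
        boolean third = future.completeExceptionally(new IllegalStateException("late"));
        return first + "," + second + "," + third + "," + future.join();
    }

    /** A future failed with completeExceptionally makes get() throw ExecutionException. */
    static String failed() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        try {
            return future.get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // true,false,false,first
        System.out.println(failed());   // boom
    }
}
```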
That raises the next question: when is received in HeaderExchangeHandler called?
It is called when Netty reads a response from the channel, starting at org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead
and then org.apache.dubbo.remoting.transport.AbstractPeer#received
See the figure below:
The chain finally reaches: org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
Now return to org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
That method is called by org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke, shown here:
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (isDestroyed()) {
logger.warn(
PROTOCOL_FAILED_REQUEST,
"",
"",
"Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion()
+ ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// prepare rpc invocation
prepareInvocation(invocation);
// do invoke rpc invocation and return async result
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
// wait rpc result if sync
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
Here waitForResultIfSync(asyncResult, invocation); is the synchronous, blocking wait.
Inside that method, asyncResult.get(timeout, TimeUnit.MILLISECONDS); enters org.apache.dubbo.rpc.AsyncRpcResult#get(long, java.util.concurrent.TimeUnit)
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
if (executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
try {
while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
long restTime = deadline - System.nanoTime();
if (restTime > 0) {
threadlessExecutor.waitAndDrain(deadline);
} else {
throw new TimeoutException(
"Timeout after " + unit.toMillis(timeout) + "ms waiting for result.");
}
}
} finally {
threadlessExecutor.shutdown();
}
}
long restTime = deadline - System.nanoTime();
if (!responseFuture.isDone() && restTime < 0) {
throw new TimeoutException("Timeout after " + unit.toMillis(timeout) + "ms waiting for result.");
}
return responseFuture.get(restTime, TimeUnit.NANOSECONDS);
}
The last line of the method, responseFuture.get(restTime, TimeUnit.NANOSECONDS), calls java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit),
which blocks until the result arrives or the remaining time runs out.
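The deadline loop in AsyncRpcResult#get can be approximated with a small stand-in. This sketch is not Dubbo's ThreadlessExecutor. It assumes, as the name waitAndDrain suggests but the excerpt does not show, that queued tasks are run on the waiting thread. `DrainingWaitSketch`, `execute`, `await`, and `demo` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DrainingWaitSketch {
    // Tasks handed over by another thread, to be run by the waiting caller.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    void execute(Runnable task) {
        queue.add(task);
    }

    /** Runs queued tasks on the calling thread until the future is done or the deadline passes. */
    <T> T await(CompletableFuture<T> future, long timeoutMs) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!future.isDone()) {
            long rest = deadline - System.nanoTime();
            if (rest <= 0) {
                throw new TimeoutException("Timeout after " + timeoutMs + "ms waiting for result.");
            }
            Runnable task = queue.poll(rest, TimeUnit.NANOSECONDS);
            if (task != null) {
                task.run();
            }
        }
        return future.get();
    }

    /** Returns "pong" when a response task is enqueued, "timeout" when none arrives. */
    static String demo(boolean respond) {
        DrainingWaitSketch waiter = new DrainingWaitSketch();
        CompletableFuture<String> future = new CompletableFuture<>();
        if (respond) {
            new Thread(() -> waiter.execute(() -> future.complete("pong"))).start();
        }
        try {
            return waiter.await(future, 200);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (Exception e) {
            return "error";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

Computing the deadline once and re-deriving the remaining time on each pass keeps the total wait bounded no matter how many tasks are drained.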
Note that org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke returns an AsyncRpcResult. So where is the actual result taken out of the AsyncRpcResult?
In org.apache.dubbo.rpc.proxy.InvocationUtil#invoke, on this line:
invoker.invoke(rpcInvocation).recreate()
Step into recreate(), that is, org.apache.dubbo.rpc.AsyncRpcResult#recreate
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
return RpcContext.getClientAttachment().getFuture();
} else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
return createDefaultValue(invocation).recreate();
}
return getAppResponse().recreate();
}
First look at org.apache.dubbo.rpc.AsyncRpcResult#getAppResponse:
public Result getAppResponse() {
try {
if (responseFuture.isDone()) {
return responseFuture.get();
}
} catch (Exception e) {
// This should not happen in normal request process;
logger.error(
PROXY_ERROR_ASYNC_RESPONSE,
"",
"",
"Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
return createDefaultValue(invocation);
}
responseFuture.get() returns the value passed to complete(res.getResult()) in org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived,
that is, an AppResponse.
Then return to getAppResponse().recreate(); and step into org.apache.dubbo.rpc.AppResponse#recreate
@Override
public Object recreate() throws Throwable {
if (exception != null) {
// fix issue#619
try {
Object stackTrace = exception.getStackTrace();
if (stackTrace == null) {
exception.setStackTrace(new StackTraceElement[0]);
}
} catch (Exception e) {
// ignore
}
if (Dubbo2CompactUtils.isEnabled()
&& Dubbo2RpcExceptionUtils.isRpcExceptionClassLoaded()
&& (exception instanceof RpcException)
&& !Dubbo2RpcExceptionUtils.getRpcExceptionClass().isAssignableFrom(exception.getClass())) {
RpcException recreated = Dubbo2RpcExceptionUtils.newRpcException(
((RpcException) exception).getCode(), exception.getMessage(), exception.getCause());
if (recreated != null) {
recreated.setStackTrace(exception.getStackTrace());
throw recreated;
}
}
throw exception;
}
return result;
}
This yields the result the caller wanted.
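The core idea of recreate(), a holder that either returns the remote value or rethrows the remote exception, can be reduced to a few lines. This is a simplified sketch with hypothetical names (`ResultHolderSketch`, `ok`, `failed`, `describe`) and omits the Dubbo 2 compatibility handling shown above.

```java
final class ResultHolderSketch {
    private final Object value;
    private final Throwable exception;

    private ResultHolderSketch(Object value, Throwable exception) {
        this.value = value;
        this.exception = exception;
    }

    static ResultHolderSketch ok(Object value) {
        return new ResultHolderSketch(value, null);
    }

    static ResultHolderSketch failed(Throwable exception) {
        return new ResultHolderSketch(null, exception);
    }

    /** Returns the value, or rethrows the exception recorded for the call. */
    Object recreate() throws Throwable {
        if (exception != null) {
            throw exception;
        }
        return value;
    }

    /** Helper for the demo: shows which of the two outcomes the caller observes. */
    static String describe(ResultHolderSketch holder) {
        try {
            return "value=" + holder.recreate();
        } catch (Throwable t) {
            return "thrown=" + t.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(ok("hello")));                                          // value=hello
        System.out.println(describe(failed(new IllegalStateException("provider failed")))); // thrown=provider failed
    }
}
```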
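Finally, a supplementary look at the task that checks whether a request has timed out.
3. Timer task
The timer is held in an org.apache.dubbo.common.resource.GlobalResourceInitializer.
Look at the DefaultFuture class: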
private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
() -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS),
DefaultFuture::destroy);
Look at the HashedWheelTimer constructor, org.apache.dubbo.common.timer.HashedWheelTimer#HashedWheelTimer(java.util.concurrent.ThreadFactory, long, java.util.concurrent.TimeUnit, int, long)
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
Note this line: workerThread = threadFactory.newThread(worker);
where worker is: Worker worker = new Worker();
Next, look at org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck in DefaultFuture:
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
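The idea behind timeoutCheck, scheduling a task at creation time that fails the future unless a response arrives first, can be sketched with the JDK alone. This sketch deliberately swaps the HashedWheelTimer for a `ScheduledExecutorService`, and every name in it (`TimeoutCheckSketch`, `newFuture`, `demo`) is hypothetical. It does not reproduce what Dubbo's TimeoutCheckTask does internally, which this excerpt does not show.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutCheckSketch {
    // Single daemon thread standing in for the shared timeout timer.
    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "sketch-future-timeout");
        t.setDaemon(true);
        return t;
    });

    /** Creates a future that fails with TimeoutException unless completed within timeoutMs. */
    static CompletableFuture<String> newFuture(long timeoutMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        ScheduledFuture<?> check = TIMER.schedule(() -> {
            future.completeExceptionally(new TimeoutException("no response within " + timeoutMs + "ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // Once the future is done, cancel the pending check so it does not linger.
        future.whenComplete((value, error) -> check.cancel(false));
        return future;
    }

    /** Returns "pong" when a response arrives in time, otherwise the failure's class name. */
    static String demo(boolean respond) {
        CompletableFuture<String> future = newFuture(100);
        if (respond) {
            future.complete("pong");
        }
        try {
            return future.get();
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // pong
        System.out.println(demo(false)); // TimeoutException
    }
}
```

Cancelling the check on completion mirrors the t.cancel() call in DefaultFuture#received.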
The newTimeout method of HashedWheelTimer, called from timeoutCheck:
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
Then look at the start() method that newTimeout calls:
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
The key call is workerThread.start();
Worker is a Runnable, so starting the thread runs its run() method:
org.apache.dubbo.common.timer.HashedWheelTimer.Worker#run
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
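Two details of the wheel are easy to verify in isolation: the constructor comment says ticksPerWheel is normalized to a power of two, and run() picks the bucket with tick & mask, where mask = wheel.length - 1. The sketch below illustrates only that arithmetic. `WheelIndexSketch`, `normalize`, and `slot` are hypothetical names, and `normalize` is an assumed implementation of the rounding, not a copy of createWheel.

```java
final class WheelIndexSketch {
    /** Rounds ticksPerWheel up to the next power of two (assumed behaviour of the normalization). */
    static int normalize(int ticksPerWheel) {
        int size = 1;
        while (size < ticksPerWheel) {
            size <<= 1;
        }
        return size;
    }

    /** Bucket index for a tick; with a power-of-two length, tick & mask equals tick % length. */
    static int slot(long tick, int wheelLength) {
        int mask = wheelLength - 1;
        return (int) (tick & mask);
    }

    public static void main(String[] args) {
        System.out.println(normalize(100));  // 128
        System.out.println(normalize(512));  // 512
        System.out.println(slot(513, 512));  // 1
        System.out.println(slot(130, 128));  // 2
    }
}
```

The power-of-two length is what makes the cheap bitwise AND a valid replacement for the modulo as the tick counter wraps around the wheel.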