Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

news2025/2/9 8:48:47

基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。

上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • AbstractInvoker#invoke服务提供者调用
    • AbstractInvoker#doInvokeAndReturn执行rpc调用
      • DubboInvoker#doInvoke调用remote层远程通信
        • getCallbackExecutor用于获取回调线程池。
  • ReferenceCountExchangeClient#request发起rpc请求
    • HeaderExchangeClient#request发起rpc请求
  • HeaderExchangeChannel#request异步发起rpc请求
    • Request请求内容
    • DefaultFuture#newFuture创建Future并启动超时检测
      • new DefaultFuture
      • timeoutCheck超时检测
    • AbstractPeer#send发起异步请求
    • AbstractClient#send发起异步请求
  • NettyChannel#send基于netty发起异步请求
  • AbstractInvoker#waitForResultIfSync异步转同步等待
    • AsyncRpcResult#get阻塞获取结果
      • ThreadlessExecutor#waitAndDrain等待返回
    • 阻塞优化
  • Result#recreate获取结果
    • AsyncRpcResult#getAppResponse获取响应结果
    • AppResponse#createDefaultValue创建默认返回值
    • AppResponse#recreate处理结果
  • 总结

AbstractInvoker#invoke服务提供者调用

ListenerInvokerWrapper#invoke方法没有其他操作,因此直接看AbstractInvoker#invoker方法。到这里,调用者变成了服务提供者invoker。

该方法首先调用doInvokeAndReturn方法执行RPC调用并返回异步执行结果,最后调用waitForResultIfSync方法判断如果同步调用,则等待RPC结果。


/**
 * AbstractInvoker的方法
 */
@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    //如果由于注册表的地址刷新导致调用程序被销毁,则允许当前调用继续进行
    if (isDestroyed()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
            + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }

    RpcInvocation invocation = (RpcInvocation) inv;

    //准备RpcInvocation
    prepareInvocation(invocation);

    //执行RPC调用并返回异步执行结果
    AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);

    //如果同步调用,则等待RPC结果
    waitForResultIfSync(asyncResult, invocation);

    return asyncResult;
}

AbstractInvoker#doInvokeAndReturn执行rpc调用

执行RPC调用并返回异步执行结果AsyncRpcResult。

/**
 * AbstractInvoker的方法
 * <p>
 * 执行RPC调用并返回异步执行结果
 *
 * @param invocation
 * @return
 */
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
    AsyncRpcResult asyncResult;
    try {
        /*
         * 执行调用
         */
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    } catch (InvocationTargetException e) {
        //异常处理
        Throwable te = e.getTargetException();
        //asyncResult封装抛出的异常
        if (te != null) {
            // if biz exception
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        } else {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
    } catch (RpcException e) {
        // if biz exception
        if (e.isBiz()) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        //asyncResult封装抛出的异常
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
    //当调用模式为异步时,或者setFutureWhenSync=true时
    if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {
        // set server context 将future设置到线程本地变量中,使用FutureAdapter包装
        RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
    }

    return asyncResult;
}

DubboInvoker#doInvoke调用remote层远程通信

该方法由invoker具体的子类实现,以默认dubbo协议为例子,对应的invoker为DubboInvoker。该方法将会调用remote层进行rpc通信。

  1. 基于轮询的策略选择一个remote层客户端ExchangeClient,用以发起rpc调用,一般来说只有一个client,即共享连接。
  2. 判断如果是单向发送,那么直接发送异步请求,随后返回默认的AsyncRpcResult,不需要获取真正的请求响应结果。
  3. 同步、异步发送处理。Dubbo默认都是走的异步发送,发送完毕之后得到一个CompletableFuture,将CompletableFuture包装为一个AsyncRpcResult返回。
    1. 在外层AbstractInvoker#invoker方法中会判断,如果同步则会调用future.get阻塞等待结果。这就是Dubbo所谓的异步转同步处理。
/**
 * DubboInvoker的方法
 * <p>
 * 执行RPC调用并返回执行结果
 */
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    //调用方法名
    final String methodName = RpcUtils.getMethodName(invocation);
    //设置附加属性 path={serviceInterface}
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    //设置附加属性 version={version}
    inv.setAttachment(VERSION_KEY, version);
    /*
     * 基于轮询的策略选择一个remote层客户端,用以发起rpc调用
     * 一般来说只有一个client,即共享连接
     */
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //是否是单向通讯,即只管发送不需要返回结果
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        //计算rpc调用超时时间,默认3000ms
        int timeout = calculateTimeout(invocation, methodName);
        invocation.setAttachment(TIMEOUT_KEY, timeout);
        /*
         * 单向发送处理
         */
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            //发送单向请求
            currentClient.send(inv, isSent);
            //返回默认的AsyncRpcResult,不需要真正的请求响应结果
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }
        /*
         * 同步、异步发送处理
         * 默认都是走的异步发送
         */
        else {
            //回调线程池
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            //异步发送请求,发送完毕之后得到一个CompletableFuture
            CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            //将CompletableFuture包装为一个AsyncRpcResult
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
getCallbackExecutor用于获取回调线程池。
  1. 如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步。
    1. ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程。通过execute(Runnable)方法提交给该Executor的任务并不会被调度到特定的线程执行。这些任务将会被存储在阻塞队列中,只有当线程调用waitAndDrain()方法时才会执行,执行该任务的线程与调用waitAndDrain的线程完全相同。
  2. 如果是异步调用,则获取对应的多线程的线程池。
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
    //如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步
    //ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程
    if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
        return new ThreadlessExecutor();
    }
    //如果是异步调用,则获取对应的多线程的线程池
    return url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
        .getDefaultExtension()
        .getExecutor(url);
}

ReferenceCountExchangeClient#request发起rpc请求

接下来我们就来到了remote层,将会真正的发起rpc请求。入口方法是ReferenceCountExchangeClient#request方法,内部默认委托HeaderExchangeClient#request发起请求。

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    //默认通过HeaderExchangeClient发起请求
    return client.request(request, timeout, executor);
}

HeaderExchangeClient#request发起rpc请求

HeaderExchangeClient#request方法,内部默认委托HeaderExchangeChannel#request发起请求。

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    //默认通过HeaderExchangeChannel发起请求
    return channel.request(request, timeout, executor);
}

HeaderExchangeChannel#request异步发起rpc请求

该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,并且生成并记录唯一的mId,version,requestBody等信息,创建结果对象DefaultFuture 对象,最终通过NettyClient#send发起异步请求。

/**
 * HeaderExchangeChannel的方法
 * <p>
 * 发起异步请求
 *
 * @param request  请求,如RpcInvocation
 * @param timeout  请求超时时间
 * @param executor 回调执行器
 * @return 未来执行结果
 * @throws RemotingException
 */
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request. 创建Request,生成唯一请求id
    Request req = new Request();
    //设置Dubbo RPC protocol version
    req.setVersion(Version.getProtocolVersion());
    //设置双向请求
    req.setTwoWay(true);
    //设置请求数据,如RpcInvocation
    req.setData(request);
    //创建 DefaultFuture 对象
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        /*
         * 通过NettyClient#send发起异步请求
         */
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

Request请求内容

每次请求都会生成一个Request对象,表示请求的内容。创建Request的时候会生成一个自增的id,表示本次请求的唯一标识。

public Request() {
    //生成新的自增唯一id
    mId = newId();
}
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private static long newId() {
    //通过一个原子变量获取唯一自增id
    // getAndIncrement() 当它增长到MAX_VALUE时,它将增长到MIN_VALUE,并且负数可以用作ID
    return INVOKE_ID.getAndIncrement();
}

DefaultFuture#newFuture创建Future并启动超时检测

每次请求都会通过该方法创建一个DefaultFuture对象并进行超时检测。

/**
 * 创建一个DefaultFuture并进行超时检测
 *
 * @param channel NettyClient
 * @param request 请求内容Request
 * @param timeout 请求超时时间
 * @return DefaultFuture
 */
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    //创建一个DefaultFuture,存入缓存
    final DefaultFuture future = new DefaultFuture(channel, request, timeout);
    //设置回调执行器
    future.setExecutor(executor);
    // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
    //ThreadlessExecutor需要保存waitingFuture以防响应返回。同步调用的执行器为ThreadlessExecutor
    if (executor instanceof ThreadlessExecutor) {
        ((ThreadlessExecutor) executor).setWaitingFuture(future);
    }
    /*
     * 超时检查
     */
    timeoutCheck(future);
    return future;
}

new DefaultFuture

在DefaultFuture的构造器中,会将将请求id与当前DefaultFuture实例存入,静态map FUTURES内部,将请求id与当前NettyClient实例存入,静态map CHANNELS内部。

请求id和响应id是同一个值,因此当有响应返回时,就能从此缓存中根据id找到对应的DefaultFuture并唤醒阻塞线程。

/**
 * 正在处理的channel
 */
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

/**
 * 正在处理的请求
 */
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
    //保存NettyClient
    this.channel = channel;
    //保存request
    this.request = request;
    //获取mid,即请求唯一id
    this.id = request.getId();
    //获取请求超时时间
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    //将请求id与当前DefaultFuture实例存入,静态map FUTURES内部
    FUTURES.put(id, this);
    //将请求id与当前NettyClient实例存入,静态map CHANNELS内部
    CHANNELS.put(id, channel);
}

timeoutCheck超时检测

创建超时检查任务TimeoutCheckTask,并且添加到dubbo时间轮中。

当超时时间到了,便会执行TimeoutCheckTask中的检查任务。将会通过id获取到对应的DefaultFuture,然后构建一个超时响应Response,通过DefaultFuture#received处理超时响应。

/**
 * 检查future的超时
 */
private static void timeoutCheck(DefaultFuture future) {
    //创建超时检查任务
    TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
    //添加到dubbo时间轮中
    future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

private static class TimeoutCheckTask implements TimerTask {
    //请求id
    private final Long requestID;

    TimeoutCheckTask(Long requestID) {
        this.requestID = requestID;
    }

    @Override
    public void run(Timeout timeout) {
        //从FUTURES中获取请求id对应的DefaultFuture
        DefaultFuture future = DefaultFuture.getFuture(requestID);
        if (future == null || future.isDone()) {
            return;
        }
        //通过执行器或者直接通知超时
        if (future.getExecutor() != null) {
            future.getExecutor().execute(() -> notifyTimeout(future));
        } else {
            notifyTimeout(future);
        }
    }

    private void notifyTimeout(DefaultFuture future) {
        // create exception response.
        Response timeoutResponse = new Response(future.getId());
        // set timeout status.
        timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
        timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
        // handle response. 处理超时响应
        DefaultFuture.received(future.getChannel(), timeoutResponse, true);
    }
}

AbstractPeer#send发起异步请求

@Override
public void send(Object message) throws RemotingException {
    //AbstractClient#send发起异步请求
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

AbstractClient#send发起异步请求

内部通过NettyChannel#send发起请求。

@Override
public void send(Object message, boolean sent) throws RemotingException {
    //重新连接判断
    if (needReconnect && !isConnected()) {
        connect();
    }
    //获取NettyChannel,这是dubbo中的类
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    //通过NettyChannel#send发起请求
    channel.send(message, sent);
}

NettyChannel#send基于netty发起异步请求

此处的NettyChannel是位于dubbo的netty4包下的类。该方法中,将会通过NioSocketChannel#writeAndFlush方法发送请求,NioSocketChannel是netty包的类,到此,真正开始的发送请求,走到netty里面的逻辑。

NettyChannel内部有一个静态属性CHANNEL_MAP,存储着netty的NioSocketChannel到NettyChannel实例的映射关系,而NettyClient内部也持有netty的NioSocketChannel。

基于netty的NioSocketChannel#writeAndFlush方法发起请求的时候,实际上内部调用ChannelPipeline#writeAndFlush方法进行数据出站过程,包括一系列的ChannelOutboundHandler的链式处理,出站处理器执行顺序与处理器添加顺序相反,Dubbo3.1版本中对于consumer内部的NettyClient,在NettyClient#initBootstrap方法中初始化netty客户端的时候,绑定了如下的handler:

//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    //解码
        .addLast("decoder", adapter.getDecoder())
    //编码
        .addLast("encoder", adapter.getEncoder())
    //心跳检测
        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
    //最后是此前创建的nettyClientHandler
        .addLast("handler", nettyClientHandler);

其中NettyClientHandler内部包括的dubbo handler顺序为:NettyClient、MultiMessageHandler、HeartbeatHandler、AllChannelHandler、DecodeHandler、HeaderExchangeHandler、ExchangeHandlerAdapter(DubboProtocol.requestHandler),但是在发送消息的时候这些handler没有太多的功能。关键是消息编码,这一点我们下面单独说。

/**
 * NettyChannel
 * 通过netty发送消息以及是否等待发送完成。
 *
 * @param message 要发送的消息
 * @param sent    是否ack async-sent,默认false
 * @throws RemotingException 如果等待到超时或被try-catch包围的方法体抛出的任何异常,则抛出RemotingException。
 */
@Override
public void send(Object message, boolean sent) throws RemotingException {
    //channel是否关闭
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        /*
         * 通过NioSocketChannel#writeAndFlush方法发送请求
         * NioSocketChannel是netty包的类,到此真正的发送请求
         */
        ChannelFuture future = channel.writeAndFlush(message);
        //是否ack async-sent,默认false
        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        //获取异常并抛出
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
            + "in timeout(" + timeout + "ms) limit");
    }
}

AbstractInvoker#waitForResultIfSync异步转同步等待

当AbstractInvoker#doInvokeAndReturn执行完毕之后,将会返回一个AsyncRpcResult。随后调用waitForResultIfSync方法,判断如果是同步调用,则线程阻塞等待结果返回。如果是异步调用,则不会阻塞直接返回了,在代码中我们可以通过RpcContext.getContext().getFuture().get() 来获取异步调用的结果,而通过 Dubbo上下文获取的是 FutureAdapter。

waitForResultIfSync内部是调用AsyncRpcResult#get方法完成阻塞等待的。

/**
 * AbstractInvoker的方法
 * <p>
 * 阻塞等待获取结果
 *
 * @param asyncResult 异步执行结果
 * @param invocation  方法调用抽象
 */
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
    //如果不是同步调用,则直接返回
    if (InvokeMode.SYNC != invocation.getInvokeMode()) {
        return;
    }
    try {
        /*
         * NOTICE!
         * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
         * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
         * 必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待
         * 因为CompletableFuture#get()方法被证明有严重的性能下降
         */
        //获取超时时间,默认3000ms
        Object timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
        //阻塞等待给定的时间
        if (timeout instanceof Integer) {
            asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);
        } else {
            //阻塞Integer.MAX_VALUE毫秒
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
            invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable rootCause = e.getCause();
        if (rootCause instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (rootCause instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else {
            throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (java.util.concurrent.TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
            invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
}

AsyncRpcResult#get阻塞获取结果

判断如果执行器属于ThreadlessExecutor,那么调用waitAndDrain方法阻塞当前线程直到响应返回或者超时,在新版本默认都是ThreadlessExecutor,否则通过responseFuture#get进行阻塞等待。

如果使用responseFuture#get,因为responseFuture的类型是CompletableFuture,这是JDK提供的Future实现类,所以他的get方法的阻塞实现就和Dubbo无关了,那么JDK怎么实现的呢?实际上很简单,如果CompletableFuture的内部result还为null,说明还没有结果,那么会对调用get方法的线程通过LockSupport.park阻塞并等待结果,这里可以参考我此前写的关于FutureTask的源码的文章。

/**
 * AsyncRpcResult的方法
 * <p>
 * 阻塞等待获取结果
 *
 * @param timeout 超时时间
 * @param unit    单位
 */
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    //如果执行器属于ThreadlessExecutor,表示同步调用
    if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        //调用waitAndDrain方法阻塞当前线程直到响应返回或者超时
        threadlessExecutor.waitAndDrain();
    }
    //通过responseFuture阻塞等待
    return responseFuture.get(timeout, unit);
}

ThreadlessExecutor#waitAndDrain等待返回

ThreadlessExecutor是Dubbo2.7.5新增的同步调用时使用的一个执行器,内部没有任何多余的线程。这实际上是一个线程模型的优化,具体的优化见这篇文章:https://blog.csdn.net/weixin_39860915/article/details/103917841,简单的说对于响应结果的序列化的这步操作由业务线程自己来执行,不再需要额外的消费端的线程池线程来执行了。

相比于老的线程池模型,由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。因为消费端的线程池策略默认是使用cached ,线程池会为每次的消费请求创建一个线程,那么当消费者应用并发较大或者提供者响应的时间较长时,就会出现消费者线程很多的情况。

  1. waitAndDrain方法将会等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
  2. 在另一边,当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列。此时等待将会被唤醒。
  3. 获取到任务之后业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future,随后返回。

所以说,ThreadlessExecutor是通过阻塞队列来实现同步等待的。这也是Dubbo同步调用实现,因为Dubbo底层通信使用的是Netty,Netty的调用都是异步调用,每次发送请求之后都会直接返回而不会等待结果,所以Dubbo所有的调用其实也都是异步调用,因此才需要这么个“异步转同步”的操作,即:发送请求之后,业务线程调用阻塞队列的take方法阻塞,后面如果收到服务端发过来的响应结果之后,将响应数据放到阻塞队列里面,这样当前阻塞的线程就被唤醒了,这样就实现了同步调用。

那么,收到的响应结果是怎么和此前发起的某个请求匹配上的呢?实际上就是通过唯一的请求id来匹配的,响应结果中也携带有发起的请求id。

/**
 * 等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
 */
public void waitAndDrain() throws InterruptedException {
    /**
     * Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,
     * once the response (the task) reached and being executed waitAndDrain will return, the whole request process
     * then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.
     *
     * There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of
     * 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call
     * of it is totally sequential.
     */
    //如果已经完成,则本次请求则直接返回
    if (isFinished()) {
        return;
    }

    Runnable runnable;
    try {
        //当前线程等待直到任务队列中有一个任务,执行该任务
        runnable = queue.take();
    } catch (InterruptedException e) {
        setWaiting(false);
        throw e;
    }

    synchronized (lock) {
        setWaiting(false);
        //当前线程执行该任务
        runnable.run();
    }
    //继续执行全部任务
    runnable = queue.poll();
    while (runnable != null) {
        runnable.run();
        runnable = queue.poll();
    }
    // mark the status of ThreadlessExecutor as finished.
    setFinished(true);
}

阻塞优化

AbstractInvoker#waitForResultIfSync方法中,当超时时间不是Integer类型的时候,将会调用,asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)方法,将可能会阻塞Integer.MAX_VALUE毫秒,这种情况就类似于CompletableFuture#get不带有超时时间的方法了。那么为什么还要用CompletableFuture#get(timeout, unit)方法呢?

在方法中能够看到这样的注释:必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待。因为CompletableFuture#get()方法被证明有严重的性能下降。


Result#recreate获取结果

在获取到Reuslt之后,将会调用recreate方法处理返回值获取最终结果。3.0.0中引入了AsyncRpcResult来替换RpcResult, RpcResult被AppResponse替换,因此目前无论同步还是异步的Reuslt都统一使用AsyncRpcResult。

  1. 请求如果是FUTURE模式,那么直接返回Future,不会阻塞。
  2. 请求如果是异步模式,那么直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞。
  3. 请求如果是同步模式,那么在获取响应结果之后才会返回。
/**
 * AsyncRpcResult的方法
 */
@Override
public Object recreate() throws Throwable {
    RpcInvocation rpcInvocation = (RpcInvocation) invocation;
    //如果是FUTURE模式,那么直接返回Future,不会阻塞
    if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
        return RpcContext.getClientAttachment().getFuture();
    }
    //如果是异步
    else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
        //直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞
        return createDefaultValue(invocation).recreate();
    }
    //同步调用,获取响应结果之后才会返回
    return getAppResponse().recreate();
}

AsyncRpcResult#getAppResponse获取响应结果

阻塞式的获取结果。当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了。

public Result getAppResponse() {
    try {
        //阻塞式的获取结果
        //当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了
        if (responseFuture.isDone()) {
            return responseFuture.get();
        }
    } catch (Exception e) {
        // This should not happen in normal request process;
        logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
        throw new RpcException(e);
    }
    //创建默认返回值的AppResponse
    return createDefaultValue(invocation);
}

AppResponse#createDefaultValue创建默认返回值

异步调用的情况,将会创建默认返回值返回。

获取调用的方法,如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null。

private static Result createDefaultValue(Invocation invocation) {
    //获取调用的方法
    ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);
    //如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null
    return method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}

AppResponse#recreate处理结果

最终结果的处理,如果有异常则抛出,否则返回结果。

/**
 * AppResponse的方法
 */
@Override
public Object recreate() throws Throwable {
    //如果有异常,则处理并抛出
    if (exception != null) {
        // fix issue#619
        try {
            Object stackTrace = exception.getStackTrace();
            if (stackTrace == null) {
                exception.setStackTrace(new StackTraceElement[0]);
            }
        } catch (Exception e) {
            // ignore
        }
        throw exception;
    }
    //返回真实结果
    return result;
}

总结

现在我们来总结一下Dubbo3.1版本中Dubbo Consumer发起服务调用请求的总体过程。

  1. 调用某个Dubbo接口的方法,实际上是调用之前服务引入时生成的代理类对象,最终所有方法被转发到InvokerInvocationHandler#invoke方法中,这是请求的通用入口。
  2. 随后利用服务消费者Invoker从服务引入是生成的一批服务提供者Invoker列表中经过路由过滤、负载均衡机制选择一个服务提供者Invoker,随后利用服务提供者Invoker内部封装的NettyClient序列化消息并发起远程rpc请求
  3. 如果调用失败,则在服务消费者ClusterInvoker中使用容错策略,例如失败重试等等,发起调用的时候每次请求都会生成一个唯一id,消费者端会缓存本次请求的id与Future的映射关系,默认每次请求都是异步调用,对于同步调用使用异步转同步机制的阻塞等待直到返回响应。响应id和请求id一致,此时就可以找到对应的Future设置响应结果。

实际上还有很多详细的知识点没说出来,例如消费者MigrationInvoker最开始实际上会进行决策使用接口级还是应用级Invoker。

本次我们学习了Dubbo Consumer发起服务调用请求的过程源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2295173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Rocky Linux9安装Zabbix7.0(精简版)

Linux 系统版本 Rocky Linux release 9.3 (Blue Onyx) 注意&#xff1a;zabbix 7以上版本不支持CentOS 7系统&#xff0c;需要CentOS 8以上&#xff0c; 本教程支持CentOS9及Rocky Linux 9 在Rocky Linux release 9.3测试通过 Linux环境准备 关闭防火墙和selinux #关闭防…

网络分析工具—WireShark的安装及使用

Wireshark 是一个广泛使用的网络协议分析工具&#xff0c;常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议&#xff0c;能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析&#xff1a; …

C++开发(软件开发)常见面试题

目录 1、C里指针和数组的区别 2、C中空指针请使用nullptr不要使用NULL 3、http/https区别和头部结构&#xff1f; 4、有了mac地址为什么还要ip地址&#xff1f;ip地址的作用 5、有了路由器为什么还要交换机&#xff1f; 6、面向对象三大特性 7、友元函数 8、大端小端 …

WEB攻防-文件下载文件读取文件删除目录遍历目录穿越

目录 一、文件下载漏洞 1.1 文件下载案例&#xff08;黑盒角度&#xff09; 1.2 文件读取案例&#xff08;黑盒角度&#xff09; 二、文件删除 三、目录遍历与目录穿越 四、审计分析-文件下载漏洞-XHCMS 五、审计分析-文件读取漏洞-MetInfo-函数搜索 六、审计分析-…

MySQL数据库(七)SQL 优化

一 插入数据 采用方法 1 批量插入 2 手动提交事务 3 主键顺序插入 4* 使用load插入指令数据 二 主键优化 1 数据组织方式 在InnoDB存储引擎中&#xff0c;表中的数据都是根据主键顺序组织存放的&#xff0c;这种存储方式的表称为索引组织表 2 页分裂 页可以为空也可…

使用EVE-NG实现单臂路由

一、基础知识 1.三层vlan vlan在三层环境中通常用作网关vlan配上ip网关内部接口ip 2.vlan创建步骤 创建vlan将接口划分到不同的vlan给vlan配置ip地址 二、项目案例 1、项目拓扑 2、项目实现 PC1配置 配置PC1IP地址为192.168.1.10/24网关地址为192.168.1.1 ip 192.168.1…

本地部署DeepSeek(Mac版本,带图形化操作界面)

一、下载安装&#xff1a;Ollama 官网下载&#xff1a;Download Ollama on macOS 二、安装Ollama 1、直接解压zip压缩包&#xff0c;解压出来就是应用程序 2、直接将Ollama拖到应用程序中即可 3、启动终端命令验证 # 输入 ollama 代表已经安装成功。 4、下载模型 点击模型…

Linux LED 实验

一、Linux 下 LED 灯驱动原理 其实跟裸机实验很相似&#xff0c;只不过要编写符合 Linux 的驱动框架。 1. 地址映射 MMU全称 Memory Manage Unit&#xff0c;即内存存储单元。 MMU主要功能为&#xff1a; 1&#xff09;完成虚拟空间到物理空间的映射&#xff1b; 2&#x…

【Redis】redis 存储的列表如何分页和检索

博主介绍&#xff1a;✌全网粉丝22W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

2025.2.6 数模AI智能体大更新,更专业的比赛辅导,同提示词效果优于gpt-o1/o3mini、deepseek-r1满血

本次更新重新梳理了回复逻辑规则&#xff0c;无任何工作流&#xff0c;一共3.2k字细节描述。具体效果可以看视频&#xff0c;同时也比对了gpt-o1、gpt-o3mini、deepseek-r1-67BI&#xff0c;从数学建模题目解答上来看&#xff0c;目前我的数模AI智能体具有明显优势。 AI智能体优…

cursor指令工具

Cursor 工具使用指南与实例 工具概览 Cursor 提供了一系列强大的工具来帮助开发者提高工作效率。本指南将通过具体实例来展示这些工具的使用方法。 1. 目录文件操作 1.1 查看目录内容 (list_dir) 使用 list_dir 命令可以查看指定目录下的文件结构: 示例: list_dir log…

【玩转全栈】----Django模板语法、请求与响应

目录 一、引言 二、模板语法 三、传参 1、视图函数到模板文件 2、模板文件到视图函数 四、引入静态文件 五、请求与响应 ?1、请求 2、响应 六、综合小案例 1、源码展示 2、注意事项以及部分解释 3、展示 一、引言 像之前那个页面&#xff0c;太过简陋&#xff0c;而且一个完整…

C++,设计模式,【单例模式】

文章目录 一、模式定义与核心价值二、模式结构解析三、关键实现技术演进1. 基础版(非线程安全)2. 线程安全版(双重检查锁)3. 现代C++实现(C++11起)四、实战案例:全局日志管理器五、模式优缺点深度分析✅ 核心优势⚠️ 潜在缺陷六、典型应用场景七、高级实现技巧1. 模板化…

基于yolov11的阿尔兹海默症严重程度检测系统python源码+onnx模型+评估指标曲线+精美GUI界面

【算法介绍】 基于YOLOv11的阿尔兹海默症严重程度检测系统是一种创新的医疗辅助工具&#xff0c;旨在通过先进的计算机视觉技术提高阿尔兹海默症的早期诊断和病情监测效率。阿尔兹海默症是一种渐进性的神经退行性疾病&#xff0c;通常表现为认知障碍、记忆丧失和语言障碍等症状…

设计模式-生产者消费者模型

阻塞队列&#xff1a; 在介绍生产消费者模型之前&#xff0c;我们先认识一下阻塞队列。 阻塞队列是一种支持阻塞操作的队列&#xff0c;常用于生产者消费者模型&#xff0c;它提供了线程安全的队列操作&#xff0c;并且在队列为空或满时&#xff0c;能够阻塞等待&#xff0c;…

RabbitMQ介绍以及基本使用

文章目录 一、什么是消息队列&#xff1f; 二、消息队列的作用&#xff08;优点&#xff09; 1、解耦 2、流量削峰 3、异步 4、顺序性 三、RabbitMQ基本结构 四、RabbitMQ队列模式 1、简单队列模式 2、工作队列模式 3、发布/订阅模式 4、路由模式 5、主题模式 6、…

嵌入式硬件篇---OpenMV的硬件流和软件流

文章目录 前言一、硬件流控制&#xff08;Hardware Flow Control&#xff09;1. 基本原理RTSCTS 2. OpenMV中的实现• 硬件要求• 代码配置• 工作流程 二、软件流控制&#xff08;Software Flow Control&#xff09;1. 基本原理XONXOFF 2. OpenMV中的实现• 代码配置• 工作流…

【AIGC提示词系统】基于 DeepSeek R1 + ClaudeAI 易经占卜系统

上篇因为是VIP&#xff0c;这篇来一个免费的 提示词在最下方&#xff0c;喜欢的点个关注吧 引言 在人工智能与传统文化交融的今天&#xff0c;如何让AI系统能够传递传统易经文化的智慧&#xff0c;同时保持易经本身的神秘感和权威性&#xff0c;是一个极具挑战性的课题。本文将…

OpenAI 实战进阶教程 - 第十节 : 结合第三方工具的向量数据库Pinecone

面向读者群体 本节课程主要面向有一定编程基础和数据处理经验的计算机从业人员&#xff0c;如后端开发工程师、数据工程师以及对 AI 应用有浓厚兴趣的技术人员。即使你之前没使用过向量数据库&#xff0c;也可以通过本节的实操内容快速上手&#xff0c;为企业或个人项目构建强…

深入Linux系列之进程地址空间

深入Linux系列之进程地址空间 1.引入 那么在之前的学习中&#xff0c;我们知道我们创建一个子进程的话&#xff0c;我们可以在代码层面调用fork函数来创建我们的子进程&#xff0c;那么fork函数的返回值根据我们当前所处进程的上下文是返回不同的值&#xff0c;它在父进程中返…