Dubbo 客户端的使用
在 Dubbo 应用中,往类成员注解 @DubboReference,服务启动后便可以调用到远端:
@Component
public class InvokeDemoFacade {
@Autowired
@DubboReference
private DemoFacade demoFacade;
public String hello(String name){
// 经过网络调用到服务端的 DemoFacade
return demoFacade.sayHello(name);
}
}
在 Dubbo 通信流程 - 客户端代理对象的创建 中讲到,Dubbo 会为注解了 @DubboReference 的 Bean 创建代理对象并注册到 Spring 容器中。对类成员进行依赖注入时,Spring 会调用工厂对象 ReferenceBean 的 getObject 方法获取 Bean,该方法返回一个懒加载的代理对象。
所以,当调用一个注解了 @DubboReference 对象的方法时,调用的实际是其代理对象的方法:
public class LazyTargetInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (target == null) {
target = lazyTargetSource.getTarget();
}
...
try {
return method.invoke(target, args);
} catch (InvocationTargetException exception) {
Throwable targetException = exception.getTargetException();
if (targetException != null) {
throw targetException;
}
}
...
}
}
Dubbo 客户端的调用流程
在 Dubbo 通信流程 - 客户端代理对象的创建 中分析到,上述 target 是 InvokerInvocationHandler 类型,在这里再回忆一下:
// ReferenceBean.class
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
synchronized (LockUtils.getSingletonMutex(applicationContext)) {
if (referenceConfig == null) {
referenceBeanManager.initReferenceBean(this);
applicationContext
.getBean(
DubboConfigApplicationListener.class.getName(),
DubboConfigApplicationListener.class)
.init();
logger.warn(
CONFIG_DUBBO_BEAN_INITIALIZER,
"",
"",
"ReferenceBean is not ready yet, please make sure to "
+ "call reference interface method after dubbo is started.");
}
}
}
// get reference proxy
// Subclasses should synchronize on the given Object if they perform any sort of extended singleton creation
// phase.
// In particular, subclasses should not have their own mutexes involved in singleton creation, to avoid the
// potential for deadlocks in lazy-init situations.
// The redundant type cast is to be compatible with earlier than spring-4.2
if (referenceConfig.configInitialized()) {
return referenceConfig.get();
}
synchronized (LockUtils.getSingletonMutex(applicationContext)) {
return referenceConfig.get();
}
}
private class DubboReferenceLazyInitTargetSource implements LazyTargetSource {
@Override
public Object getTarget() throws Exception {
return getCallProxy();
}
}
}
InvokerInvocationHandler 中,使用了 invoker + rpcInvocation 传入 InvocationUtil 工具类:
public class InvokerInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
// RpcInvocation 封装了这次请求的相关信息
RpcInvocation rpcInvocation = new RpcInvocation(
serviceModel,
method.getName(),
invoker.getInterface().getName(),
protocolServiceKey,
method.getParameterTypes(),
args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
return InvocationUtil.invoke(invoker, rpcInvocation);
}
}
上述 invoker 是 InvokerInvocationHandler 的成员变量,其实际类型是 MigrationInvoker。MigrationInvoker 是流量切换的核心组件,它的核心作用是在服务迁移期间,对服务调用流量进行灵活的控制和管理,实现新旧服务提供者之间的平稳切换,避免因服务迁移给业务带来影响。
MigrationInvoker
流量切换:根据预设的规则,逐步将服务调用的流量从旧的服务提供者转移到新的服务提供者。
服务兼容:在迁移过程中,同时支持对旧服务和新服务的调用,确保业务的连续性。
数据对比:在流量切换过程中,可以对新旧服务的调用结果进行对比,以验证新服务的正确性。
MigrationInvoker 的 invoker 是 ScopeClusterInvoker,ScopeClusterInvoker 是 Dubbo 框架里的一个关键组件,其核心作用是依据服务调用的范围(Scope)对服务调用进行集群管理,实现对不同范围的服务提供者进行有效的调用和容错处理。
ScopeClusterInvoker
支持多范围的服务调用:在复杂的分布式系统里,服务可能会存在于不同的范围中,例如不同的数据中心、不同的业务单元等。ScopeClusterInvoker 可以按照预先设定的范围,对服务提供者进行分组管理。当服务消费者发起调用时,它能够精准地在特定范围的服务提供者中挑选合适的节点进行调用。
实现服务调用的隔离:借助对不同范围的服务提供者进行隔离调用,ScopeClusterInvoker 可以防止某个范围的服务故障影响到其他范围的服务。例如,当一个数据中心出现故障时,服务调用会被限制在其他正常的数据中心内进行,这样就能保证服务的可用性和稳定性。
动态调整调用范围:在系统运行过程中,ScopeClusterInvoker 能够根据实际情况动态地调整服务调用的范围。例如,当某个范围的服务提供者性能下降时,可以减少对该范围的调用,将更多的流量导向其他性能较好的范围。这种动态调整的能力可以提高系统的整体性能和资源利用率。
容错和负载均衡:ScopeClusterInvoker 集成了 Dubbo 的容错和负载均衡机制。在每个范围内,它会依据配置的负载均衡策略(如随机、轮询、最少活跃调用数等)选择合适的服务提供者进行调用。同时,当调用失败时,它会按照预设的容错策略(如失败重试、失败快速返回等)进行处理,确保服务调用的可靠性。
MigrationInvoker 的 invoker 是 MockClusterInvoker,MockClusterInvoker 是一个集群调用器,它会在服务调用时拦截调用请求,并根据配置的 Mock 策略来决定是调用实际的服务提供者还是返回 Mock 结果。
经过一系列的 Invoker 后,调用来到 AbstractCluster,AbstractCluster 调用 FilterChainBuilder 的 invoke 方法,进入过滤器链,依次经过 FutureFilter、MonitorFilter 等过滤器后,进入 FailoverClusterInvoker,FailoverClusterInvoker 在服务调用失败时,通过重试机制选择其他可用的服务提供者进行调用,以此保证服务调用的可靠性。
继续 Debug,来到 DubboInvoker:
///
// org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
// 按照dubbo协议发起调用实现类
///
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取发送数据的客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 看看是单程发送不需要等待响应,还是发送完了后需要等待响应
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取超时时间
int timeout = calculateTimeout(invocation, methodName);
invocation.setAttachment(TIMEOUT_KEY, timeout);
if (isOneway) {
// 单程发送,不需要等待响应
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 发送完了之后需要等待响应
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 操作 currentClient 发送了一个 request 请求,
// 然后接收了一个 CompletableFuture 对象,说明这里存在异步操作
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
从代码流程看,拿到了一个交换数据的客户端类,然后走两个发送数据的分支,一条分支逻辑单程调用不需要响应,一条有响应,两条分支最终都返回了一个异步结果对象。
ReferenceCountExchangeClient.request -> HeaderExchangeChannel.request -> AbstractPeer.send -> AbstractClient.send -> NettyChannel.send
在 HeaderExchangeChannel 中 new 了一个 DefaultFuture,便调用 AbstractPeer 的 send 方法,进入异步的发送流程。