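Analysis of How the MergeInvoker Is Generated
In Dubbo 3.2.0, merge mode aggregates the results returned by the same service across multiple groups. The core MergeInvoker code is the following method of public class RegistryDirectory extends DynamicDirectory: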
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
    List<Invoker<T>> mergedInvokers = new ArrayList<>();
    Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
    for (Invoker<T> invoker : invokers) {
        String group = invoker.getUrl().getGroup("");
        groupMap.computeIfAbsent(group, k -> new ArrayList<>());
        groupMap.get(group).add(invoker);
    }
    if (groupMap.size() == 1) {
        mergedInvokers.addAll(groupMap.values().iterator().next());
    } else if (groupMap.size() > 1) {
        for (List<Invoker<T>> groupList : groupMap.values()) {
            StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
            staticDirectory.buildRouterChain();
            mergedInvokers.add(cluster.join(staticDirectory, false));
        }
    } else {
        mergedInvokers = invokers;
    }
    return mergedInvokers;
}
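The branching in this method can be tried out without a Dubbo runtime. The following is a minimal sketch, not Dubbo code: FakeInvoker and GroupMergeSketch are hypothetical names, and a string label stands in for the invoker that cluster.join(staticDirectory, false) would return for each group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Invoker<T>: it carries only a group name and an address.
final class FakeInvoker {
    final String group;
    final String address;

    FakeInvoker(String group, String address) {
        this.group = group;
        this.address = address;
    }

    @Override
    public String toString() {
        return group + "@" + address;
    }
}

final class GroupMergeSketch {

    // Mirrors the branching of toMergeInvokerList, returning labels instead of invokers.
    static List<String> toMergedList(List<FakeInvoker> invokers) {
        // LinkedHashMap (instead of HashMap) only to keep the demo output order stable.
        Map<String, List<FakeInvoker>> groupMap = new LinkedHashMap<>();
        for (FakeInvoker invoker : invokers) {
            groupMap.computeIfAbsent(invoker.group, k -> new ArrayList<>()).add(invoker);
        }

        List<String> merged = new ArrayList<>();
        if (groupMap.size() == 1) {
            // One group: the invokers are passed through unchanged.
            for (FakeInvoker invoker : groupMap.values().iterator().next()) {
                merged.add(invoker.toString());
            }
        } else {
            // Several groups: one entry per group, standing in for cluster.join(staticDirectory, false).
            // An empty input also lands here and yields an empty list.
            for (Map.Entry<String, List<FakeInvoker>> entry : groupMap.entrySet()) {
                merged.add("cluster(" + entry.getKey() + ", size=" + entry.getValue().size() + ")");
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<FakeInvoker> invokers = Arrays.asList(
                new FakeInvoker("a", "host1"),
                new FakeInvoker("b", "host2"),
                new FakeInvoker("a", "host3"));
        System.out.println(toMergedList(invokers));
    }
}
```

With two groups in the input, the result has one entry per group rather than one per provider, which is why each group later needs its own cluster invoker.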
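When debugging, you will find that the line
    mergedInvokers.add(cluster.join(staticDirectory, false));
cannot be traced in the debugger. The reason is that cluster is an adaptive class generated at runtime and compiled by the JavassistCompiler class.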
The class behind cluster is generated from the following interface:
@SPI("failover")
public interface Cluster {
    String DEFAULT = "failover";

    @Adaptive
    <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException;

    static Cluster getCluster(ScopeModel scopeModel, String name) {
        return getCluster(scopeModel, name, true);
    }

    static Cluster getCluster(ScopeModel scopeModel, String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            name = "failover";
        }
        return (Cluster)ScopeModelUtil.getApplicationModel(scopeModel).getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }
}
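Note the default name "failover" here (the @SPI value, repeated in the DEFAULT constant): when the URL carries no cluster parameter, it determines which invoker is actually created.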
The methods of the generated class are produced by the following method of public class AdaptiveClassCodeGenerator:
/**
 * generate method content
 */
private String generateMethodContent(Method method) {
    Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
    StringBuilder code = new StringBuilder(512);
    if (adaptiveAnnotation == null) {
        return generateUnsupported(method);
    } else {
        int urlTypeIndex = getUrlTypeIndex(method);

        // found parameter in URL type
        if (urlTypeIndex != -1) {
            // Null Point check
            code.append(generateUrlNullCheck(urlTypeIndex));
        } else {
            // did not find parameter in URL type
            code.append(generateUrlAssignmentIndirectly(method));
        }

        String[] value = getMethodAdaptiveValue(adaptiveAnnotation);

        boolean hasInvocation = hasInvocationArgument(method);

        code.append(generateInvocationArgumentNullCheck(method));

        code.append(generateExtNameAssignment(value, hasInvocation));
        // check extName == null?
        code.append(generateExtNameNullCheck(value));

        code.append(generateScopeModelAssignment());
        code.append(generateExtensionAssignment());

        // return statement
        code.append(generateReturnAndInvocation(method));
    }

    return code.toString();
}
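In practice only the join method is adaptive (the other methods simply throw UnsupportedOperationException). The class that is actually generated looks like this: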
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster {
    public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" + url.toString() + ") use keys([cluster])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.cluster.Cluster.class);
        org.apache.dubbo.rpc.cluster.Cluster extension = (org.apache.dubbo.rpc.cluster.Cluster) scopeModel.getExtensionLoader(org.apache.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0, arg1);
    }

    public org.apache.dubbo.rpc.cluster.Cluster getCluster(org.apache.dubbo.rpc.model.ScopeModel arg0, java.lang.String arg1) {
        throw new UnsupportedOperationException("The method public static org.apache.dubbo.rpc.cluster.Cluster org.apache.dubbo.rpc.cluster.Cluster.getCluster(org.apache.dubbo.rpc.model.ScopeModel,java.lang.String) of interface org.apache.dubbo.rpc.cluster.Cluster is not adaptive method!");
    }

    public org.apache.dubbo.rpc.cluster.Cluster getCluster(org.apache.dubbo.rpc.model.ScopeModel arg0, java.lang.String arg1, boolean arg2) {
        throw new UnsupportedOperationException("The method public static org.apache.dubbo.rpc.cluster.Cluster org.apache.dubbo.rpc.cluster.Cluster.getCluster(org.apache.dubbo.rpc.model.ScopeModel,java.lang.String,boolean) of interface org.apache.dubbo.rpc.cluster.Cluster is not adaptive method!");
    }
}
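The essence of the generated join method is: read the cluster parameter from the URL, fall back to "failover", look up the extension by that name, and delegate. The following is a simplified sketch of that dispatch pattern only. MiniCluster, MiniAdaptiveCluster, and the plain Map used in place of a URL are hypothetical stand-ins, not Dubbo types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Cluster interface; the Map plays the role of URL parameters.
interface MiniCluster {
    String join(Map<String, String> urlParams);
}

// Hand-written equivalent of what an adaptive class does: pick an extension by name, then delegate.
final class MiniAdaptiveCluster implements MiniCluster {
    private final Map<String, MiniCluster> extensions = new HashMap<>();
    private final String defaultName;

    MiniAdaptiveCluster(String defaultName) {
        this.defaultName = defaultName;
    }

    void register(String name, MiniCluster extension) {
        extensions.put(name, extension);
    }

    @Override
    public String join(Map<String, String> urlParams) {
        if (urlParams == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Same idea as url.getParameter("cluster", "failover") in the generated class.
        String extName = urlParams.getOrDefault("cluster", defaultName);
        MiniCluster extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.join(urlParams);
    }
}

final class AdaptiveDispatchDemo {
    static MiniAdaptiveCluster newAdaptive() {
        MiniAdaptiveCluster adaptive = new MiniAdaptiveCluster("failover");
        adaptive.register("failover", params -> "FailoverInvoker");
        adaptive.register("failback", params -> "FailbackInvoker");
        return adaptive;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(newAdaptive().join(params));   // no cluster parameter: default is used
        params.put("cluster", "failback");
        System.out.println(newAdaptive().join(params));   // explicit cluster parameter wins
    }
}
```

The real generated class resolves the extension through the ExtensionLoader of the URL's scope model rather than a local map; the sketch only shows why the URL parameter and the default name decide the outcome.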
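Once the generated Cluster$Adaptive class is added to the project, it can be debugged. Stepping through it reveals something unexpected: the wrapper classes ScopeClusterWrapper and MockClusterWrapper are invoked first, and only then FailoverCluster: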
doJoin:33, FailoverCluster (org.apache.dubbo.rpc.cluster.support)
join:59, AbstractCluster (org.apache.dubbo.rpc.cluster.support.wrapper)
join:39, MockClusterWrapper (org.apache.dubbo.rpc.cluster.support.wrapper)
join:40, ScopeClusterWrapper (org.apache.dubbo.rpc.cluster.support.wrapper)
join:17, Cluster$Adaptive (org.apache.dubbo.rpc.cluster)
toMergeInvokerList:337, RegistryDirectory (org.apache.dubbo.registry.integration)
refreshInvoker:298, RegistryDirectory (org.apache.dubbo.registry.integration)
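Read from bottom to top, the stack shows Cluster$Adaptive calling ScopeClusterWrapper, which calls MockClusterWrapper, which finally reaches FailoverCluster. That order is what a decorator chain would produce. The sketch below reproduces only the call order; TracedCluster, WrapperSketch, and CoreClusterSketch are hypothetical names, and the assumption that each wrapper simply delegates to the next one is a simplification (the real wrappers may also decorate the returned invoker).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Cluster; the list records who was called, in order.
interface TracedCluster {
    String join(List<String> trace);
}

// Plays the role of the innermost extension (FailoverCluster in the stack trace).
final class CoreClusterSketch implements TracedCluster {
    @Override
    public String join(List<String> trace) {
        trace.add("FailoverCluster");
        return "invoker";
    }
}

// Plays the role of a wrapper: it records itself, then delegates to the wrapped cluster.
final class WrapperSketch implements TracedCluster {
    private final String name;
    private final TracedCluster delegate;

    WrapperSketch(String name, TracedCluster delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public String join(List<String> trace) {
        trace.add(name);
        return delegate.join(trace);
    }
}

final class WrapperChainDemo {
    static List<String> callOrder() {
        // Outermost wrapper first, matching the order seen in the stack trace.
        TracedCluster chain = new WrapperSketch("ScopeClusterWrapper",
                new WrapperSketch("MockClusterWrapper", new CoreClusterSketch()));
        List<String> trace = new ArrayList<>();
        chain.join(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(callOrder());
    }
}
```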
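The mergedInvokers that are finally produced look as follows:
The mode can be specified through cluster, for example by changing it to failback. In other words, merge calls the service in each relevant group according to the configuration, while how the call is made inside each group is controlled by the cluster and loadbalance settings.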
@DubboReference(group = "*",merger = "true",cluster = "failback")
MergeService mergeService;
The available cluster types are defined in the SPI configuration file named org.apache.dubbo.rpc.cluster.Cluster, whose content is:
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
scope=org.apache.dubbo.rpc.cluster.support.wrapper.ScopeClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
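Each line of this file maps an extension name to an implementation class, which is how a value such as cluster = "failback" ends up selecting FailbackCluster. The following sketch parses that name=class format into a map. It is an illustration of the file format only, not the parsing code Dubbo itself uses; ExtensionFileSketch is a hypothetical name, and treating lines that start with # as comments is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ExtensionFileSketch {

    // Parses "name=fully.qualified.ClassName" lines into an ordered name-to-class map.
    static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String rawLine : content.split("\n")) {
            String line = rawLine.trim();
            // Skip blank lines and (assumed) comment lines.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // no name before '=': ignored in this sketch
            }
            result.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        String content =
                "failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster\n"
              + "failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster\n";
        System.out.println(parse(content).get("failback"));
    }
}
```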
PS:
1. Additional cluster modes can be added through SPI.
2. You can define your own merger method.
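As an illustration of point 2, a merger combines the per-group results into one value. The sketch below uses a local, hypothetical ResultMerger interface so that it compiles on its own; it is shaped after Dubbo's Merger extension point as far as I understand it (a single merge method taking one result per group), but it is not that interface, and registering a real implementation through an SPI file is not shown.

```java
// Hypothetical local interface: merges one result per group into a single result.
interface ResultMerger<T> {
    T merge(T... items);
}

// Example merger: sums the integer results returned by the individual groups.
final class SumMerger implements ResultMerger<Integer> {
    @Override
    public Integer merge(Integer... items) {
        int sum = 0;
        if (items == null) {
            return sum;
        }
        for (Integer item : items) {
            // A group that returned nothing contributes nothing.
            if (item != null) {
                sum += item;
            }
        }
        return sum;
    }
}

final class SumMergerDemo {
    public static void main(String[] args) {
        // Pretend three groups returned 1, 2 and 3 for the same call.
        System.out.println(new SumMerger().merge(1, 2, 3));
    }
}
```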
Further references on merger:
https://www.jianshu.com/p/512e2211f84c
https://blog.csdn.net/qq_36882793/article/details/117366243
https://cn.dubbo.apache.org/en/docs/v2.7/user/examples/group-merger/
https://cn.dubbo.apache.org/en/docs3-v2/java-sdk/reference-manual/spi/description/merger/