Dubbo 3.2.0 中 filter chain 的核心类是 DefaultFilterChainBuilder。
Builder
public class DefaultFilterChainBuilder implements FilterChainBuilder {
的buildInvokerChain函数
对于consumer refer
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group)
的参数如上图,如果是provider,group值不同。
Dubbo 通过 group 识别出带有 @Activate 注解的 filter,并按照 order 进行排序。
/**
* TraceFilter
*/
@Activate(group = CommonConstants.PROVIDER)
public class TraceFilter implements Filter {
@Activate(group = CONSUMER, order = Integer.MIN_VALUE)
public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {
@Activate(group = CONSUMER,onClass = "org.apache.dubbo.metrics.collector.DefaultMetricsCollector")
public class MetricsClusterFilter implements ClusterFilter, BaseFilter.Listener, ScopeModelAware {
@Activate(group = CONSUMER, order = -1, onClass = "io.micrometer.observation.NoopObservationRegistry")
public class ObservationSenderFilter implements ClusterFilter, BaseFilter.Listener, ScopeModelAware {
consumer 默认的filter如下
FutureFilter 负责处理异步 future 调用的回调:根据调用结果触发 return 回调或 throw 回调。
/**
 * Routes a received result to one of two callbacks.
 *
 * <p>If {@code result.hasException()} is true, {@code fireThrowCallback} is invoked with the
 * result's exception; otherwise {@code fireReturnCallback} is invoked with the result's value.
 *
 * @param result     the invocation result to inspect
 * @param invoker    the invoker passed through to the callback helper
 * @param invocation the invocation passed through to the callback helper
 */
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
// An exception carried inside the result is treated as the failure path.
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
/**
 * Forwards an error to {@code fireThrowCallback}, the same helper that {@code onResponse}
 * uses when the result carries an exception.
 *
 * @param t          the throwable handed to the callback helper
 * @param invoker    the invoker passed through to the callback helper
 * @param invocation the invocation passed through to the callback helper
 */
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
fireThrowCallback(invoker, invocation, t);
}
后面 3 个 filter 都与可观测性(observability)相关。
调用过程
stack信息
buildClusterInvokerChain:86, DefaultFilterChainBuilder (org.apache.dubbo.rpc.cluster.filter)
<init>:87, AbstractCluster$ClusterFilterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
buildClusterInterceptors:47, AbstractCluster (org.apache.dubbo.rpc.cluster.support.wrapper)
join:61, AbstractCluster (org.apache.dubbo.rpc.cluster.support.wrapper)
join:39, MockClusterWrapper (org.apache.dubbo.rpc.cluster.support.wrapper)
join:40, ScopeClusterWrapper (org.apache.dubbo.rpc.cluster.support.wrapper)
doCreateInvoker:583, RegistryProtocol
类MockClusterWrapper的
/**
 * Joins the directory through the wrapped {@code cluster} and wraps the resulting invoker
 * in a {@code MockClusterInvoker}.
 *
 * @param directory        the directory handed both to the wrapped cluster and to the
 *                         {@code MockClusterInvoker}
 * @param buildFilterChain forwarded unchanged to the wrapped cluster's {@code join}
 * @return a {@code MockClusterInvoker} around the invoker produced by the wrapped cluster
 * @throws RpcException declared by the signature; this body has no {@code throw} statement
 */
@Override
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory, buildFilterChain));
}
调用public abstract class AbstractCluster 的
/**
 * Creates the cluster invoker via {@code doJoin(directory)} and, when requested, passes it
 * through {@code buildClusterInterceptors} before returning it.
 *
 * @param directory        the directory passed to {@code doJoin}
 * @param buildFilterChain when {@code true}, the joined invoker is wrapped by
 *                         {@code buildClusterInterceptors}; when {@code false}, the raw
 *                         {@code doJoin} result is returned
 * @return the (optionally wrapped) invoker
 * @throws RpcException declared by the signature; this body has no {@code throw} statement
 */
@Override
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
if (buildFilterChain) {
return buildClusterInterceptors(doJoin(directory));
} else {
return doJoin(directory);
}
}
然后调用到
/**
 * Builds {@code filterInvoker} by passing the given invoker through every activated
 * {@code FilterChainBuilder} extension.
 *
 * <p>The builders are looked up from the application model resolved from the invoker URL's
 * scope model. If none are found, {@code filterInvoker} is the given invoker itself.
 *
 * @param invoker the cluster invoker to wrap
 */
public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
List<FilterChainBuilder> builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
if (CollectionUtils.isEmpty(builders)) {
// No builder available: use the invoker unwrapped.
filterInvoker = invoker;
} else {
ClusterInvoker<T> tmpInvoker = invoker;
// Each builder receives the output of the previous one, so the wrappers nest;
// all builders are called with REFERENCE_FILTER_KEY and the CONSUMER group.
for (FilterChainBuilder builder : builders) {
tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
filterInvoker = tmpInvoker;
}
}
可以看到,用户可以通过扩展 FilterChainBuilder 来自定义 filter 的组装方式;在上面的循环中会调用到 DefaultFilterChainBuilder 的 buildClusterInvokerChain。
/**
 * Wraps {@code originalInvoker} in a chain of {@code ClusterFilter} nodes.
 *
 * <p>The filters are the activated {@code ClusterFilter} extensions for the invoker's URL,
 * looked up with the given {@code key} and {@code group}. Which extension loader(s) are
 * consulted depends on how many module models {@code getModuleModelsFromUrl} returns.
 *
 * @param originalInvoker the invoker to wrap; also passed to every chain node
 * @param key             passed to {@code getActivateExtension} along with the URL
 * @param group           passed to {@code getActivateExtension} along with the URL
 * @return a {@code ClusterCallbackRegistrationInvoker} around the chain when at least one
 *         filter is active; otherwise {@code originalInvoker} unchanged
 */
@Override
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
ClusterInvoker<T> last = originalInvoker;
URL url = originalInvoker.getUrl();
List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
List<ClusterFilter> filters;
if (moduleModels != null && moduleModels.size() == 1) {
// Exactly one module model: use its extension loader directly.
filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
} else if (moduleModels != null && moduleModels.size() > 1) {
// Several module models: collect the activated filters of each one, then hand the
// combined list and the modules' extension directors to sortingAndDeduplication.
// NOTE(review): that helper is not shown here; its name suggests it orders the list
// and removes duplicates - confirm against its implementation.
filters = new ArrayList<>();
List<ExtensionDirector> directors = new ArrayList<>();
for (ModuleModel moduleModel : moduleModels) {
List<ClusterFilter> tempFilters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModel).getActivateExtension(url, key, group);
filters.addAll(tempFilters);
directors.add(moduleModel.getExtensionDirector());
}
filters = sortingAndDeduplication(filters, directors);
} else {
// moduleModels is null or empty: resolve the loader with a null scope model.
filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, null).getActivateExtension(url, key, group);
}
if (!CollectionUtils.isEmpty(filters)) {
// Walk the list backwards so that each new node receives the previously built node as
// its "next"; the node for filters.get(0) is created last and becomes the chain head.
for (int i = filters.size() - 1; i >= 0; i--) {
final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
}
return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
}
// No active filters: return the invoker untouched.
return last;
}