上一篇博客《Dubbo源码深度解析(五)》主要讲:当服务消费方发起请求时,服务提供方是通过Netty服务接受请求并处理的,涉及到Netty相关使用及部分原理的讲解,以及最后又是如何将Invoker对象的执行结果返回给服务消费方的等。同时也讲了服务消费方的调用链路,这块还是比较复杂的。难点在于:要对Dubbo的SPI机制很熟悉,才知道具体是哪个接口实现类,并且还涉及到接口的包装类,以及对真正干活的Invoker的层层包装;如果对Dubbo理解和使用不够深入,可以借助debug,但如果没有真的理解Dubbo的SPI机制,是无法搞清楚为什么最终得到的Invoker对象会经过那么多层包装,而哪些包装的类又是怎么来的;还有就是Dubbo中很多地方,都是动态生成字节码对象,如果对字节码对象中接口的实现方法不熟悉的话,也很难继续看下去。
这篇博客,将从服务消费方开始讲:被@DubboReference修饰的属性,是如何得到实现类并进行属性注入的,以及服务消费方最终是如何调用到服务提供方的。这里我就默认各位都认真看过了我前面写的五篇Dubbo源码解析博客。因此,我不会再像之前一样,讲得那么仔细了。
这里可以考虑几个问题:
① 对被@DubboReference修饰的属性进行依赖注入,但服务消费方本地并没有定义接口实现类,因此,最终注入的对象,一定是代理对象;
② 既然是属性注入,多半是类似@Autowired注解,Dubbo一定会提供一个BeanPostProcessor,专门处理@DubboReference注解。这跟服务提供方处理@DubboService注解还不一样,因为它是创建Bean对象并注册到Spring容器中,因此需要借助于BeanFactoryPostProcessor或者ImportBeanDefinitionRegistrar(需借助@Import注解)来处理,因为这两种接口的执行时机是在BeanPostProcessor之前,准确地讲是生成BeanDefinition阶段(如包扫描、处理配置类),因为Spring最终是根据BeanDefinition来生成Bean的。根据我前面讲的内容可以知道,服务提供方就是通过ServiceAnnotationBeanPostProcessor来处理的(BeanFactoryPostProcessor的实现类)。
③ 服务消费方是如何跟服务提供方进行通讯的?毫无疑问也是使用的Netty,这里肯定也涉及到注册中心,因为服务消费方肯定是从注册中心拉取对应服务提供方的相关信息,如IP/PORT等,同时还要考虑服务提供方配置的变更,因此,服务消费方肯定会定期拉取服务消费方的信息并更新到自己本地。如果服务提供方有多个节点,那是不是还要考虑负载均衡呢?因此负载均衡这块肯定是在服务消费方发起调用的时候实现的,参考之前服务提供方之前的FilterNode链,可能服务消费方也会有类似的处理,当然这都只是猜测,具体可以看源码,来验证自己的猜测。
开始今天的内容。关于上文说的处理@DubboReference注解,其实在之前的博客中有提到,就是在@EnableDubboConfig注解中做了处理,代码如下:
DubboBeanUtils#registerCommonBeans()方法中,就往Spring容器中注册了一个后置处理器,即ReferenceAnnotationBeanPostProcessor,看名字也能知道,肯定是它处理的@DubboReference注解,注解看看这个后置处理器,先看看它的类继承结构,结果如下:
可以知道,ReferenceAnnotationBeanPostProcessor实现了InstantiationAwareBeanPostProcessor接口,看看该接口,代码如下:
如果对Spring源码熟悉的话,就知道AutowiredAnnotationBeanPostProcessor实现了该接口,并且实现了postProcessProperties()方法,在该方法中处理被@Autowired注解修饰的类的成员属性或者成员方法,并实现属性的依赖注入。ReferenceAnnotationBeanPostProcessor也类似,但它实现的是postProcessPropertyValues()方法,但是根据Spring源码可知,这也没问题(建议通过实现postProcessProperties()方法实现依赖注入,因为postProcessPropertyValues()是一个过期的方法,后续Spring版本会弃用),Spring核心代码如下:
看看 ReferenceAnnotationBeanPostProcessor的无参构造,代码如下:
再看看 ReferenceAnnotationBeanPostProcessor#postProcessPropertyValues()方法,代码如下:
看看AbstractAnnotationBeanPostProcessor#findInjectionMetadata()方法,代码如下:
以字段被@DubboReference注解修饰为例 ,看看AbstractAnnotationBeanPostProcessor#findFieldAnnotationMetadata()方法,代码如下:
再回到ReferenceAnnotationBeanPostProcessor#postProcessPropertyValues()方法,该方法中,调用的是AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata#inject()方法,代码如下:
再看看AbstractAnnotationBeanPostProcessor#getInjectedObject()方法,看看是如何获取带注入的对象的,代码如下:
继续看ReferenceAnnotationBeanPostProcessor#doGetInjectedBean()方法,代码如下:
看ReferenceAnnotationBeanPostProcessor#buildReferenceBeanIfAbsent()方法,代码如下:
AnnotatedInterfaceConfigBeanBuilder#build()方法,代码如下:
再看看AnnotatedInterfaceConfigBeanBuilder#configureBean()方法,代码如下:
需要要提一下的是,此时RegistryConfig对象(注册中心)没有设置给ReferenceBean对象,因为代码中会判断@是否给@DubboReference注解设置了registry属性(即注册中心名),具体代码如下:
再看看ReferenceBeanBuilder#postConfigureBean()方法,代码如下:
既然是初始化方法,当然需要看看afterPropertiesSet()方法,代码如下:
ReferenceAnnotationBeanPostProcessor#doGetInjectedBean()方法,再看ReferenceAnnotationBeanPostProcessor#registerReferenceBean()方法,代码如下:
最后调用ReferenceBean.get()方法,代码如下:
看看ReferenceBean#init()方法,代码如下:
重点看ReferenceBean#createProxy()方法,代码如下:
再看看ConfigValidationUtils#loadRegistries()方法,代码如下:
根据前面将服务提供方,也能知道这里的REF_PROTOCOL属性实际上是Protocol$Adaptive对象,并且由于url中的协议是"registry",因此最终得到调用的是RegistryProtocol#refer()方法,代码如下:
看看Cluster#getCluster()方法,代码如下:
再看看org.apache.dubbo.rpc.cluster.Cluster文件的内容,结果如下:
因此最终得到的Cluster为MockClusterWrapper对象,而MockClusterWrapper对象的cluster属性为FailoverCluster对象,断点验证,结果如下:
重点看RegistryProtocol#doRefer()方法,代码如下:
调用RegistryProtocol#getMigrationInvoker()方法,实际上是调用RegistryProtocol子类的getMigrationInvoker()方法,即:
再看RegistryProtocol#interceptInvoker()方法,代码如下:
看看MigrationRuleListener类,发现rawRule是在其无参构造中赋值的,并且结果为"INIT":
再看MigrationRuleHandler#doMigrate()方法,代码如下:
回到MigrationRuleHandler#doMigrate()方法,最终调用的是这里,结果如下:
先看MigrationInvoker#refreshServiceDiscoveryInvoker()方法,代码如下:
因此调用registryFactory.getRegistry()方法为,得到的Registry对象是ListenerRegistryWrapper对象,而ListenerRegistryWrapper对象的registry属性是ServiceDiscoveryRegistry,断点验证:
继续看RegistryProtocol#doCreateInvoker()方法,代码如下:
先看看ServiceDiscoveryRegistryDirectory#buildRouterChain()方法,代码如下:
重点看看是如何获取RouterFactory对象的集合的,ExtensionLoader#getActivateExtension()方法的代码如下:
再看看org.apache.dubbo.rpc.cluster.RouterFactory中的内容,结果为:
最终遍历RouterFactory的集合,调用RouterFactory#getRouter()方法,得到Router对象,结果为:
回到RegistryProtocol#doCreateInvoker()方法,ServiceDiscoveryRegistryDirectory#subscribe()方法,代码如下:
看看ServiceDiscoveryRegistry#createServiceDiscovery()方法,代码如下:
可知serviceDiscovery属性实际上是EventPublishingServiceDiscovery对象,而EventPublishingServiceDiscovery对象的serviceDiscovery属性是NacosServiceDiscovery对象。回到ServiceDiscoveryRegistry#subscribe()方法,代码如下:
回到MigrationInvoker#migrateToServiceDiscoveryInvoker()方法,到这里为止,发现MigrationInvoker#refreshServiceDiscoveryInvoker()方法,其实没做什么,那重点应该是MigrationInvoker#refreshInterfaceInvoker()方法,代码如下:
先看看父类的subscribe()方法,看了些啥,代码如下:
再看NacosRegistry#doSubscribe()方法,代码如下:
获取到服务名的列表之后,再调用NacosRegistry#doSubscribe()方法,订阅服务,代码如下:
看看NacosRegistry#notifySubscriber()方法,代码如下:
重点看看RegistryDirectory#toInvokers()方法,传入的URL对象当然是服务提供方的地址信息,代码如下:
重点看protocol.refer()方法是如何生成Invoker对象的,实际上DynamicDirectory的protocol属性也是通过Setter方法设置的,与前面也讲过,生成DynamicDirectory对象的时候会进行属性的注入,因此最终protocol实际上是Protocol$Adaptive对象,也就是调用Protocol$Adaptive#refer()方法,代码如下:
由前面可知,在Protocol$Adaptive#refer()方法中,由于传入的URL的协议是"dubbo",因此extension对象实际上是ProtocolFilterWrapper对象,而ProtocolFilterWrapper对象的protocol属性是ProtocolListenerWrapper对象,而ProtocolListenerWrapper对象的protocol属性是DubboProtocol对象。先看看ProtocolFilterWrapper#refer()方法,代码如下:
断点看看最终得到的FilterNode链是什么样的,结果如下:
回到ProtocolFilterWrapper#refer()方法,再调用ProtocolListenerWrapper#refer()方法,代码如下:
最终调用的才是DubboProtocol#refer()方法,而refer()方法并不在DubboProtocol中实现,而是在其父类,因此最终调用的是AbstractProtocol#refer()方法,代码如下:
这里的requestHandler跟之前服务提供是一样的,是共用的,继续看Exchangers#connect()方法,代码如下:
剩下的内容将在最后一篇博客中讲解,敬请期待~