Dubbo源码篇03---从点点直连探究Complier编译的原理
- 什么是点点直连
- 实际需求
- 如何实现动态编译?
- 如何发起调用?
- 点点直连原理
- 实现点点直连
- 消费端
- 提供端
- 测试
- 点点直连小结
什么是点点直连
Dubbo正常的请求模型,都是消费端从注册中心拉取服务提供者列表,然后采用适当的负载均衡策略,挑选出一个服务提供者URL,随机发起请求。
但是,Dubbo也给我们提供了一种方式,可以在没有注册中心的时候,直接使用提前设置好的URL发起请求,或者在有注册的中心的时候,绕过注册中心,使用设置好的URL发起请求,这种方式也被称为点点直连。
那么点点直连在实际项目开发过程中,究竟有没有用处呢?
下面我们跟随着实际需求的视角,具体来看看吧。
实际需求
订单系统这边由于入库订单的状态异常,导致该笔订单消息及时无法推送到供应商系统,从而阻碍了该笔订单在供应商侧的功能运转。
为了争取最短时间内恢复这笔订单的功能运转,我们需要尽快修改这条推送记录在数据库的状态,此时我们可能会想到以下几个做法:
- 通过编写update语句直接修改线上那条出现问题的记录,但是通常一家公司中的数据订正流程会很繁琐,耗时较长,并非这里的最佳选择
- 利用Web服务器后台的日志,重放一遍用户的请求,但是问题在于并不是所有场景都能根据重放用户请求解决,需要根据具体业务场景进行抉择
- 在深入理解JVM第三版一书中曾介绍过使用java类加载器提供的热更新能力实现动态调试线上服务的实现,我们能否借鉴这一思路,编写调用DAO层完成记录状态更新的代码,然后通过暴露出来的调试接口,将代码上传,然后利用类加载器提供的热更新技术动态加载类,然后调用调试方法完成订单状态更新呢?
如何实现动态编译?
Java代码从编译到执行的流程如下所示:
开发者编写的“Java 源代码”被编译后变成 class 字节码文件,然后字节码文件被 JVM 加载,直到变成可使用的类。
在这样的开发过程中,动态编译一般有两种方式:
- 自主编码实现,比如通过 Runtime 调用 javac,或者通过 JavaCompile 调用 run。
- 调用插件实现,比如使用市面上常用的 groovy-all.jar 插件。
关于热更新技术的原理可以阅读我之前写的两篇文章:
- JAVA实现代码热更新
- Groovy实现热部署
出于简单性考虑,本文使用groovy插件实现java代码的动态编译。
如何发起调用?
由于需要将用于修复的代码上传到生产环境的机器上执行,因此每一个生产环境服务都需要对外暴露一个接口,用于接收动态调试请求:
由于修复代码需要上传到生产环境执行,因此为了避免引发不必要的产线事故,我们一般会拿某台机器节点做个小范围的验证,也就是说,这里需要用到一开始讲到的点点直连技术。
那么下一个问题就来了,如何实现点点直连呢?
点点直连原理
Dubbo在ReferenceConfig的父类ReferenceConfigBase类中提供了一个名为Url的字段:
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
/**
* The url for peer-to-peer invocation
* 专为点到点连接而设计的
*/
protected String url;
....
那么该字段的构成规则是怎样的呢? 又是如何起的作用的呢?
下面我们来简单追踪一下url被使用到的地方:
- 当消费者端服务启动时,会为指定的服务接口创建一个代理,创建代理需要用到的客户端配置参数由ReferenceConfig负责提供,因此创建代理的动作也是在ReferenceConfig内部的createProxy方法内完成的
//ReferenceConfig
private T createProxy(Map<String, String> referenceParameters) {
...
// 是否配置了客户端用户点点直连的Url
if (StringUtils.isNotEmpty(url)) {
//如果消费者端配置了url属性,那么dubbo会认为该rul是一个点对点地址,或者是一个注册中心的地址
parseUrl(referenceParameters);
} else {
// dubbo走从注册中心拉取服务提供者url那套逻辑
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
aggregateUrlFromRegistry(referenceParameters);
}
}
createInvokerForRemote();
...
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
- referenceParameters保存了客户端各种配置
parseUrl方法负责解析用户配置的点对点直连URL:
//ReferenceConfig
private void parseUrl(Map<String, String> referenceParameters) {
//按照空格,;对消费者端设置的url进行切分,这里说明一个url属性中,我们可以通过空格或者;设置多个服务提供者的直连地址
//或者指定一个或者多个专属的注册中心地址
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (ArrayUtils.isNotEmpty(us)) {
for (String u : us) {
//解析当前url字符串,并解析为一个Dubbo提供的URL对象返回
URL url = URL.valueOf(u);
//url内部对象urlAddress对象的path属性--具体看下图
if (StringUtils.isEmpty(url.getPath())) {
//大部分情况下我们不会指定path,因此一般path值默认为服务接口名
url = url.setPath(interfaceName);
}
...
//判断我们设置的是否是一个注册中心地址
if (UrlUtils.isRegistry(url)) {
//添加进urls集合保存,并且使用REFER_KEY属性表明当前url代表的是注册中心地址
urls.add(url.putAttribute(REFER_KEY, referenceParameters));
} else {
//将referenceParameters集合中的参数以&的形式拼接在当前url后面,类比http的请求参数url的拼接方式
//然后将拼接完整的url添加进urls集合
URL peerUrl = getScopeModel().getApplicationModel().getBeanFactory().getBean(ClusterUtils.class).mergeUrl(url, referenceParameters);
peerUrl = peerUrl.putAttribute(PEER_KEY, true);
urls.add(peerUrl);
}
}
}
}
- URL.valueOf方法负责解析当前url字符串,并解析为一个Dubbo提供的URL对象返回,格式如下:
- url.getPath方法返回的是Url内部的urlAddress对象的path属性
- 不指定path,默认被设置为服务接口名的情况
- peerUrl拼接得到的结果
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000
从parseUrl方法逻辑可知,dubbo会将客户端各种配置参数以类似http请求参数的url拼接方式组织起来。
createInvokerForRemote方法负责构造发起请求调用的Invoker对象:
private void createInvokerForRemote() {
//此处我们的urls集合中的url只有一个,有多个逻辑这里跳过不看
//如果urls的长度为1,说明只有一个服务提供者,则直接通过protocolSPI.refer方法创建一个Invoker实例,
//如果这个服务提供者不是注册中心,则使用StaticDirectory对这个Invoker进行包装。
//StaticDirectory是Dubbo框架中的一个类,用于将一组Invoker封装成一个目录,以便消费者调用
if (urls.size() == 1) {
URL curUrl = urls.get(0);
//这里根据urlAddress内部的protocol属性作为key,通过dubbo的SPI机制寻找对应协议的实现类
//这里实际调用的是DubboProtocol的refer方法,因为我们这里urlAddress的protocol值为dubbo
invoker = protocolSPI.refer(interfaceClass, curUrl);
//如果当前url并非指代一个注册中心地址
if (!UrlUtils.isRegistry(curUrl)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
//默认情况下Cluster会通过Registry拿到一堆服务提供方的IP地址列表后,然后通过一定的路由和负载均衡策略决定具体选择调用哪一个Provider
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
} else {
...
}
}
- dubbo的Adaptive动态适配机制会为ProtocolSPI类创建一个代理对象,代理对象的refer如下所示:(本系列还未讲到dubbo SPI原理部分,所以这部分大家先了解即可,感兴趣的也可以自行研究一下)
public class Protocol$Adaptive
implements Protocol {
public Invoker refer(Class clazz, URL uRL) throws RpcException {
String string;
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
if (string == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + uRL + ") use keys([protocol])");
}
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(uRL.getScopeModel(), Protocol.class);
Protocol protocol = scopeModel.getExtensionLoader(Protocol.class).getExtension(string);
return protocol.refer(clazz, uRL);
}
...
DubboProtocol的refer方法实现如下:
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
return protocolBindingRefer(type, url);
}
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
...
// create rpc invoker.
// dubbo总共分为十层,各个层之间的交互主要是通过Inovker完成的,可以理解分层的实现是Invoker套Invoker
//这里只需要知道invoker的doInvoke方法中会完成本层应该做的逻辑
//例如这里DubboInvoker会在protocol层完成相关逻辑处理
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
...
return invoker;
}
实现点点直连
通过上面的分析可知,dubbo为我们在客户端配置中提供了一个url参数用来实现点点直连,url的构成规则为:
[protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000
可见dubbo的url 的构成规则,居然和 http 的构成规则如出一辙,那我们试着通过赋值 url 为dubbo://[机器IP结点]:[机器IP提供Dubbo服务的端口]
,应该就大功告成了。
准备一个页面,填入 5 个字段信息,接口类名、接口方法名、接口方法参数类名、指定的 URL 节点、修复问题的 Java 代码,然后将这 5 个字段通过 HTTP 请求发往 Web 服务器,Web 服务器接收到请求后组装泛化所需对象,最后通过泛化调用的形式完成功能修复。
消费端
- 负责接收动态调试请求的控制器
@RestController
public class DynamicDebugController {
private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";
@PostMapping("/gateway/dynamic/debug/request")
public Object repairRequest(@RequestBody DynamicDebugRequest dynamicDebugRequest) {
// 将入参的req转为下游方法的入参对象,并发起远程调用
return commonInvoke(dynamicDebugRequest);
}
private Object commonInvoke(DynamicDebugRequest dynamicDebugRequest) {
// 然后试图通过类信息对象想办法获取到该类对应的实例对象
ReferenceConfig<GenericService> referenceConfig =
createReferenceConfig(dynamicDebugRequest.getClassName(), dynamicDebugRequest.getUrl());
// 远程调用
GenericService genericService = referenceConfig.get();
return genericService.$invoke(
dynamicDebugRequest.getMtdName(),
new String[]{dynamicDebugRequest.getParameterTypeName()},
new Object[]{dynamicDebugRequest.getParamsMap()});
}
private static ReferenceConfig<GenericService> createReferenceConfig(String className, String url) {
DubboBootstrap dubboBootstrap = DubboBootstrap.getInstance();
// 设置应用服务名称
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName(dubboBootstrap.getApplicationModel().getApplicationName());
// 设置注册中心的地址
RegistryConfig registryConfig = new RegistryConfig(zookeeperAddress);
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(className);
// 设置泛化调用形式
referenceConfig.setGeneric("true");
// 设置默认超时时间5秒
referenceConfig.setTimeout(5 * 1000);
// 设置点对点连接的地址
referenceConfig.setUrl(url);
return referenceConfig;
}
}
- 承载动态调试请求参数的对象
@Setter
@Getter
public class DynamicDebugRequest {
/**
* <h2>接口类名,例:com.provider.one.DynamicDebugService</h2>
**/
private String className;
/**
* <h2>接口方法名,例:dynamicDebug</h2>
**/
private String mtdName;
/**
* <h2>接口方法参数类名,例:com.provider.one.DynamicRequest</h2>
**/
private String parameterTypeName;
/**
* <h2>指定的URL节点,例:dubbo://ip:port</h2>
**/
private String url;
/**
* <h2>可以是调用具体接口的请求参数,也可以是修复问题的Java代码</h2>
**/
private Map<String,String> paramsMap;
}
提供端
- 服务提供者的启动类
public class Provider {
private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";
public static void main(String[] args) throws InterruptedException {
//启动内嵌的zk
new EmbeddedZooKeeper(2181, false).start();
//创建ApplicationConfig
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("dynamic-debug-service-provider");
//创建注册中心配置
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(zookeeperAddress);
//新建服务实现类,注意要使用GenericService接收
DynamicDebugService helloService = new DynamicDebugServiceImpl();
//创建服务相关配置
ServiceConfig<DynamicDebugService> service = new ServiceConfig<>();
service.setApplication(applicationConfig);
service.setRegistry(registryConfig);
service.setInterface(DynamicDebugService.class);
service.setRef(helloService);
service.export();
new CountDownLatch(1).await();
}
}
官方文档Demo提供的EmbeddedZooKeeper类源码,大家copy到自己本地即可
- 对外暴露的动态调试服务接口
public interface DynamicDebugService {
/**
* 定义了一个专门处理万能修复逻辑的Dubbo接口
*/
Object dynamicDebug(Map<String,String> req);
}
- 服务接口的实现类
public class DynamicDebugServiceImpl implements DynamicDebugService {
private final GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
@SneakyThrows
@Override
public Object dynamicDebug(Map<String,String> req) {
// 编译 Java 代码,然后变成 JVM 可识别的 Class 对象信息
Class<?> javaClass = compile(req.get("code"));
//和spring结合的扩展思路: 创建实例对象,并经过spring的后置处理
// Object bean = instantiationAndPostProcessBean(javaClass);
//这里没有和spring结合,直接简单实例化即可
Object bean = javaClass.newInstance();
if(!(bean instanceof Function)){
throw new IllegalArgumentException("动态java类并非Function类型");
}
Function<Map<String,String>, Object> function = (Function) bean;
// 执行单例对象的方法即可
return function.apply(req);
}
/**
* 利用 groovy-all.jar 中的 groovyClassLoader 来编译 Java 代码
*/
private Class<?> compile(String javaCode) {
return groovyClassLoader.parseClass(javaCode);
}
/**
* 实例化bean,并经过spring的所有后置处理,但是不放入Spring容器中
*/
private Object instantiationAndPostProcessBean(Class<?> javaClass) {
return ((DefaultListableBeanFactory) SpringUtil.getBeanFactory()).createBean(javaClass);
}
}
测试
测试,首先我们需要准备一个测试类:
public class TestJavaCode implements Function<Map<String,String>,String> {
@Override
public String apply(Map<String,String> s) {
System.out.println("执行动态方法: "+s);
return "res: "+s;
}
}
该类会作为请求参数传递给动态调试控制器,然后由动态调试控制器通过泛化调用,来调用服务端的动态调试服务接口,最终执行测试的apply方法。
- 请求参数和请求结果
{
"className":"com.provider.one.DynamicDebugService",
"mtdName":"dynamicDebug",
"parameterTypeName":"java.util.Map",
"url":"dubbo://192.168.154.1:20880/com.provider.one.DynamicDebugService",
"paramsMap": {
"code": "package com.provider.one.code;import java.util.function.Function;public class TestJavaCode implements Function<Map<String,String>,String> {@Override public String apply(Map<String,String> s) {System.out.println(\"执行动态方法: \"+s);return \"res: \"+s;}}"
}
}
点点直连小结
哪些应用场景需要用到点点直连呢?
- 第一,修复生产环境突然Bug事件,通过直连 + 泛化 + 动态代码编译执行,可以轻松临时解决产线棘手的问题。
- 第二,绕过注册中心直接联调测试,有些公司由于测试环境的复杂性,有时候不得不采用简单的直连方式,来快速联调测试验证功能。
- 第三,检查服务存活状态,如果需要针对多台机器进行存活检查,那就需要循环调用所有服务的存活检查接口。
点点直连实现简单来说,分为如下几步:
- 接口类名、接口方法名、接口方法参数类名、业务请求参数,四个维度的数据不能少。
- 根据接口类名创建 ReferenceConfig 对象,设置 generic = true 、url = 协议 +IP+PORT 两个重要属性,调用 referenceConfig.get 拿到 genericService 泛化对象。
- 传入接口方法名、接口方法参数类名、业务请求参数,调用 genericService.$invoke 方法拿到响应对象。