目录
简介
Dubbo高可用
集群容错
服务治理
Dubbo线程IO模型
源码层面
Java SPI 的问题
源码解析
简介
Apache Dubbo是一款高性能的Java RPC框架。其前身是阿里巴巴公司开源的一个高性能、轻量级的开源Java RPC框架,可以和Spring框架无缝集成。
Dubbo提供了三大核心能力:
- 面向接口的远程方法调用
- 智能容错和负载均衡
- 服务自动注册和发现
官网: Apache Dubbo
Dubbo高可用
集群容错
-
服务路由:服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消 费者可调用哪些服务提供者,dubbo提供三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由ScriptRouter、标签路由TagRouter,本课程重点分析条件路由 条件路由规则的格式:
[服务消费者匹配条件] => [服务提供者匹配条件]
host = 10.20.153.10 => host = 10.20.153.11
该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务,不可 调用其他机器上的服务。
如果服务消费者匹配条件为空,表示不对服务消费者进行限制。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务
常见路由配置:
-
白名单:
host!=10.20.153.10,10.20.153.11=>
-
黑名单
host=10.20.153.10,10.20.153.11=>
-
读写分离
method=find,list,get,is=>host=172.22.3.94,172.22.3.95,172.22.3.96
method!=find,list,get,is=>host=172.22.3.97,172.22.3.98
-
前后台分离
application=front=>host=172.22.3.91,172.22.3.92,172.22.3.93
application!=front=>host=172.22.3.94,172.22.3.95,172.22.3.96
-
-
集群容错
-
Failover Cluster 失败自动切换,当出现失败,重试其它服务器。(缺省) 通常用于读操作,但 重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。
重试次数配置如下:
<dubbo:service retries="2" />
或
<dubbo:reference retries="2" />
-
Failfast Cluster 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作, 比如新增记录。
-
Failsafe Cluster 失败安全,出现异常时,直接忽略。通常用于写入日志等操作。
-
Failback Cluster 失败自动恢复,后台记录失败请求,定时重发。 通常用于消息通知操作。
-
Forking Cluster 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过forks="2"来设置最大并行数。
-
-
负载均衡 :在集群负载均衡时,Dubbo提供多种均衡策略,缺省random随机调用
-
Random LoadBalance:按照权重设置随机概率,无状态
-
RoundRobin LoadBalance:轮询,有状态
-
LeastActive LoadBalance:最少活跃数随机,方法维度的统计服务调用数
-
ConsistentHash LoadBalance:一致性Hash
-

Redis集群模式下保证可迁移和高可用——一致性算法
服务治理
-
添加依赖<packaging>war</packaging>
-
main下新建目录webapp
-
项目结构-modules里添加web.xml
-
服务降级
可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略 可以向注册中心写入动态配置覆盖规则
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension(); Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181")); registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService? category=configurators&dynamic=false&application=foo&mock=force:return+null" ));使用mock实现服务降级,mock只出现在非业务异常(如,超时,网络异常等)时执行。两种配置:
boolean值,默认为false,如果配置为true,则缺省使用mock类名,即类名+Mock后缀;
"return null",可以简单忽略异常。
mock实现接口方式(推荐)
配置mock=true,同时实现mock接口,类名注意命名规范:接口名+Mock后缀,此时如果调用失败会调用Mock实现,mock实现需要保证有无参的构造方法
-
mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
-
还可以改为 mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
-
-
dubbo配置服务降级方式
-
在dubbo-admin中配置
-
-
整合hystrix,实现高可用(当出现异常 时,可以调用自定义的回滚方法)
Hystrix旨在通过控制那些访问远程系统,服务和第三方库的 节点,从而对延迟和故障提供更强大的容错能力,Hystrix具备拥有回退机制和断路器功能 的线程和信号隔离,请求缓存和请求打包,以及监控和配置等功能
springboot官方提供了对hystrix的集成,直接在pom.xml里加入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> <version>1.4.4.RELEASE</version> </dependency>
Application启动类中新增@EnableHystrix来启用hystrix starter
provider增加@HystrixCommand
Consumer method上配置@HystrixCommand(fallbackMethod= methodName))
Dubbo线程IO模型
Dubbo使用NIO模型

-
config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置 类,也可以通过 spring 解析配置生成配置类
-
proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
-
registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
-
cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口 为 Cluster, Directory, Router, LoadBalance
-
monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
-
protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
-
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口 为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
-
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
-
serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput,ThreadPool
源码层面
Java SPI 的问题
-
接口的所有实现类全部都需要加载并实例化
-
无法根据参数来获取所对应的实现类
-
不能解决IOC、AOP的问题
-
基于以上问题,dubbo在java原生的SPI机制进行了增强,解决了以上问题
Dubbo 重新实现了一套功能更强的 SPI 机制,Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类 中,通过 ExtensionLoader,我们可以加载指定的实现类。Dubbo SPI 所需的配置文件需放置在META-INF/dubbo 路径下,配置内容如下
//Dubbo SPI配置文件 college=cn.itheima.api.impl.Czxy shortTrain=cn.itheima.api.impl.Itheima //Java 原生SPI cn.itheima.api.impl.Czxy cn.itheima.api.impl.Itheima
与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。另外,在测试 Dubbo SPI 时,需要在 Robot 接口上标注 @SPI 注解。
源码解析
通过 ExtensionLoader 的 getExtensionLoader 方法获取一个 ExtensionLoader 实例,该方法方法先从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例


createExtension方法包含如下步骤
-
通过 getExtensionClasses 获取所有的拓展类
-
通过反射创建拓展对象
-
向拓展对象中注入依赖
-
将拓展对象包裹在相应的 Wrapper 对象中
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}
此处injectExtension方法依赖注入实现原理:Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。
private T injectExtension(T instance) {
try {
if (objectFactory != null) { // 遍历目标类的所有方法
for (Method method : instance.getClass().getMethods()) {
// 检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
// 获取 setter 方法参数类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获取属性名,比如 setName 方法对应属性名 name
String property = method.getName().length() > 3 ?
method.getName().substring(3, 4).toLowerCase() +
method.getName().substring(4) : "";
// 从 ObjectFactory 中获取依赖对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 通过反射调用 setter 方法设置依赖
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method...");
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
第一个步骤中获取所有的扩展类方法getExtensionClasses,该方法先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过loadExtensionClasses 加载拓展类
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取已加载的拓展类
Map<String, Class<?>> classes = cachedClasses.get();
// 双重检查
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载拓展类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
loadExtensionClasses 方法做了两件事,一是解析SPI注解,二是调用 loadDirectory 方法加载指定文件夹配置文件
private Map<String, Class<?>> loadExtensionClasses() {
// 获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
// 对 SPI 注解内容进行切分
String[] names = NAME_SEPARATOR.split(value);
// 检测 SPI 注解内容是否合法,不合法则抛出异常
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension...");
}
// 设置默认名称,参考 getDefaultExtension 方法
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
// 加载指定文件夹下的配置文件
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
loadDirectory 方法获取classLoader ,通过classLoader获取URL资源信息,遍历URL通过 loadResource加载资源
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
// fileName = 文件夹路径 + type 全限定名
String fileName = dir + type.getName();
try {
Enumeration<URL> urls;
ClassLoader classLoader = findClassLoader();
// 根据文件名加载所有的同名文件
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 加载资源
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("...");
}
}
loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
// 按行读取配置内容
while ((line = reader.readLine()) != null) {
// 定位 # 字符
final int ci = line.indexOf('#');
if (ci >= 0) {
// 截取 # 之前的字符串,# 之后的内容为注释,需要忽略
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 以等于号 = 为界,截取键与值
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 加载类,并通过 loadClass 方法对类进行缓存
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader),
name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class...");
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class...");
}
}
loadClass方法主要用用用于操作缓存
private void loadClass(Map<String, Class<?>> extensionClasses,
java.net.URL resourceURL, Class<?> clazz,
String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("...");
}
// 检测目标类上是否有 Adaptive 注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
// 设置 cachedAdaptiveClass缓存
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("...");
}
// 检测 clazz 是否是 Wrapper 类型
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
// 存储 clazz 到 cachedWrapperClasses 缓存中
wrappers.add(clazz);
// 程序进入此分支,表明 clazz 是一个普通的拓展类
} else {
// 检测 clazz 是否有默认的构造方法,如果没有,则抛出异常
clazz.getConstructor();
if (name == null || name.length() == 0) {
// 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为name
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("...");
}
}
// 切分 name
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
// 如果类上有 Activate 注解,则使用 names 数组的第一个元素作为键,
// 存储 name 到 Activate 注解对象的映射关系
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
// 存储 Class 到名称的映射关系
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
// 存储名称到 Class 的映射关系
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("...");
}
}
}
}
}



















