文章目录
- 写作背景
- Hystrix是什么
- Hystrix的核心功能
- 上手实战
- RestTemplate整合Hystrix
- OpenFeign整合Hystrix
- OpenFeign与Hystrix整合的各种参数如何配置?
- 源码验证
- 基于@HystrixCommand注解实现熔断源码分析
- 初始化资源线程池的源码
- OpenFeign与Hystrix整合执行请求的源码
写作背景
在前面复习了Eurek + Ribbon + OpenFeign的基础上,来复习Hystrix。本文的编写思路是微服务架构下为什么要有负责熔断的公共组件比如Hystrix,Hystrix是什么有哪些核心功能,与OpenFeign整合的底层工作原理是什么,再加上实战和源码验证。
Hystrix是什么
在分布式微服务架构下,服务之间不可避免地发生相互调用,没有一个系统能够保证自身运行绝对不会出问题。有可能因为微服务之间的网络通信出现较大的延迟,或者是被调用的微服务负载过大无法及时响应请求,导致服务调用者的线程一直占用着得不到释放。在并发的场景下,一旦服务调用者的线程资源被耗尽那么这个服务将不能再对外提供服务,甚至更严重会引起服务级联失效,这就是微服务的“雪崩效应”,因此希望有一个公共组件能够在服务通过网络请求访问其他服务时,对延迟和失败提供容错能力,为服务间的调用提供保护和控制。
Hystrix就是Netflix公司开源的一个延迟和容错的组件,用于隔离访问远程系统,防止服务级联失败,从而提供系统的可用性和容错性。同时Hystrix提供失败降级机制,使系统能够更快地从异常中恢复。
Hystrix的核心功能
1、资源隔离,包括线程隔离和信号量隔离
Hystrix为每个依赖服务都维护了⼀个⼩型的线程池(舱壁模式)(或者信号量)。如果该线程池已满, 发往该依赖的请求就被⽴即拒绝,⽽不是排队等待,从⽽加速失败判定
2、提供fallback降级回退机制来应对故障
当请求失败、超时、被拒绝,或断路器处于open打开状态时,执⾏回退逻辑。回退逻辑一般是返回一个默认值,或者从内存里读一个实现设定的值。
3、提供近实时的统计、监控和报警功能来提高故障发现的速度
Hystrix可以近乎实时地监控运⾏指标和配置的变化,例如成功、失败、超时、以及被拒绝的请求等,通过对请求的成功和失败的统计,当对一个依赖服务的调用失败次数达到一个阈值时,就会自动熔断,断路器就会处于open状态,在一定时间内对改依赖服务的调用就会直接走降级
4、自我修复
断路器打开⼀段时间后,会⾃动进⼊“半开”状态,此时会放一个请求过去,如果请求成功,断路器会关闭掉。
上手实战
RestTemplate整合Hystrix
1、pom.xml引入hystrix依赖坐标
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
2、注入一个RestTemplate
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate;
}
3、启动类增加@EnableCircuitBreaker开启Hystrix功能
@EnableEurekaClient
@SpringBootApplication
@EnableCircuitBreaker
public class ServicePortalApplication {
}
4、在fc-service-portal服务里写一个接口
/**
* RestTemplate与Hystrix整合,fallbackMethod指定降级回滚方法,方法的形参和返回结果必须和原始方法一致
*
* @return User
*/
@HystrixCommand(
fallbackMethod = "getUser2Fallback")
@GetMapping("/getUser2/{id}")
public User getUser2(@PathVariable("id") Integer id,
@RequestParam(name = "age", required = false) Integer age) {
return restTemplate.getForObject("http://fc-service-screen/getUser2/{id}", User.class, id, age);
}
/**
* 回退方法,方法的形参和返回值必须和原始getUser2方法一致
*/
private User getUser2Fallback(Integer id, Integer age) {
return User.builder().id(2).age(30).name("愉乐人生").build();
}
在接口上加@HystrixCommand注解,并且指定回滚降级方法fallbackMethod,注意降级方法的入参和返回结果必须和原始方法一样。
然后我们启动Eureka和fc-service-portal,不启动fc-service-screen然后调用/getUser2{id}接口肯定要走降级,我们试下
访问如下接口
http://localhost:8002/getUser2/1?age=25
返回结果如下,跟降级方法里一致符合预期。
这个时候我们再启动fc-service-screen服务,再访问一次
http://localhost:8002/getUser2/2?age=25
我们再看下返回结果,也是符合预期的,是正常走的服务调用里的逻辑
OpenFeign整合Hystrix
FeignClient的接口上@FeignClient注解里的fallback指定降级的处理类
@FeignClient(value = "fc-service-screen", configuration = ScreenFeignConfiguration.class, fallback = ScreenFeignClientHystrix.class)
public interface ScreenFeignClient {
}
看下ScreenFeignClientHystrix 类里的代码,特别注意要加类似@Service的注解代表这是一个Bean要注入到Spring容器的
@Service
public class ScreenFeignClientHystrix implements ScreenFeignClient {
@Override
public int getPort() {
return 0;
}
@Override
public User getUser(Integer id, Integer age) {
return User.builder().id(1).age(20).name("张三").build();
}
@Override
public String upload(MultipartFile file) {
return "";
}
}
配置文件开启Feign的Hystrix功能
#开启Feign的熔断配置
feign:
hystrix:
enabled: true
fc-service-poral里加一个接口
/**
* OpenFeign与Hystrix整合,screenFeignClient的@FeignClient注解里指定fallback回滚降级处理类
* @return User
*/
@GetMapping("/getUser3/{id}")
public User getUser3(@PathVariable("id") Integer id,
@RequestParam(name = "age", required = false) Integer age) {
return screenFeignClient.getUser(id, age);
}
启动服务后访问如下接口
http://localhost:8002/getUser3/2?age=26
OpenFeign与Hystrix整合的各种参数如何配置?
netflix hystrix的原生参数配置,默认值如下
#hystrix的配置
hystrix:
command:
default:
circuitBreaker:
#默认是false关闭,如果改成true将强制打开熔断器
forceOpen: true
#触发熔断错误比例阈值,默认值50%
errorThresholdPercentage: 50
#熔断后休眠时长,默认值5秒
sleepWindowInMilliseconds: 3000
#熔断触发最小请求次数,默认值是20
requestVolumeThreshold: 4
execution:
isolation:
thread:
#熔断超时设置,默认为1s
timeoutInMilliseconds: 100
源码验证
基于@HystrixCommand注解实现熔断源码分析
首先从@EnableCircuitBreaker注解来入手,它是激活熔断器的开关
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
//导入了一个Selector
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
我们看下EnableCircuitBreakerImportSelector
public class EnableCircuitBreakerImportSelector
//父类传入的泛型是@EnableCircuitBreaker注解
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
//获取断路器开关的配置
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
看SpringFactoryImportSelector的构造器和selectImports()方法
protected SpringFactoryImportSelector() {
//获取到的子类的
this.annotationClass = (Class<T>) GenericTypeResolver
.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}
@Override
public String[] selectImports(AnnotationMetadata metadata) {
if (!isEnabled()) {
return new String[0];
}//annotationClass就是@EnableCircuitBreaker注解
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
// Find all possible auto configuration classes, filtering duplicates
//根据@EnableCircuitBreake注解类的全限定类名作为key去spring.factories里面找自动配置类
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
if (factories.isEmpty() && !hasDefaultFactory()) {
throw new IllegalStateException("Annotation @" + getSimpleName()
+ " found, but there are no implementations. Did you forget to include a starter?");
}
if (factories.size() > 1) {
// there should only ever be one DiscoveryClient, but there might be more than
// one factory
this.log.warn("More than one implementation " + "of @" + getSimpleName()
+ " (now relying on @Conditionals to pick one): " + factories);
}
return factories.toArray(new String[factories.size()]);
}
selectImports方法里的最终目的是根据传进来的泛型全限定类名作为key去spring.factories文件查找对应的配置类,然后注入
也就是说会注入一个HystrixCircuitBreakerConfiguration,我们看下这个类
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
它注入了一个HystrixCommandAspect 看名字是个切面,再看下这个切面类HystrixCommandAspect
@Aspect
public class HystrixCommandAspect {
//定义切入点,关注@HystrixCommand
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
//环绕通知
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
//获取原始目标方法
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
//根据方法加的是@HystrixCommand还是@HystrixCollapser注解,或者方法的元数据
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//看这里,这里就用到了原生的netflix包的东西了
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {//HystrixCommand
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {//HystrixObservableCommand
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
private enum HystrixPointcutType {
COMMAND,
COLLAPSER;
static HystrixPointcutType of(Method method) {
if (method.isAnnotationPresent(HystrixCommand.class)) {
return COMMAND;
} else if (method.isAnnotationPresent(HystrixCollapser.class)) {
return COLLAPSER;
} else {
String methodInfo = getMethodInfo(method);
throw new IllegalStateException("'https://github.com/Netflix/Hystrix/issues/1458' - no valid annotation found for: \n" + methodInfo);
}
}
}
HystrixCommand主要用于仅仅会返回一个结果的调用
HystrixObservableCommand主要用于可能会返回多条结果的调用
打个断点看下
进到CommandExcutor.execute()方法里看下
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
//同步执行
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
//异步执行
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
打断点看下
public R execute() {
try {
return this.queue().get();
} catch (Exception var2) {
throw Exceptions.sneakyThrow(this.decomposeException(var2));
}
}
public Future<R> queue() {
//这里是RxJava响应式编程也是处理业务逻辑的地方
final Future<R> delegate = this.toObservable().toBlocking().toFuture();
执行Command就可以发起一次对依赖服务的调用
execute()和queue()仅仅对HystrixCommand适用
execute():调用后直接block住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常
execute()实际上会调用queue().get().queue(),接着会调用toObservable().toBlocking().toFuture()
也就是说,无论是哪种执行command的方式,最终都是依赖toObservable()去执行的
queue():返回一个Future,属于异步调用,后面可以通过Future获取单条结果
初始化资源线程池的源码
GenericCommand中根据元数据信息重写了两个很核⼼的⽅法,⼀个是run⽅法封
装了对原始⽬标⽅法的调⽤,另外⼀个是getFallBack⽅法,它封装了对回退⽅法的调⽤
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
@Override
protected Object getFallback() {
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
另外,在GenericCommand的上层类构造函数中会完成资源的初始化,⽐如线程池
GenericCommand —>AbstractHystrixCommand—>HystrixCommand—>AbstractCommand
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, com.netflix.hystrix.HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
return fromConstructor == null ? com.netflix.hystrix.HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults) : fromConstructor;
}
缓存Map里没有就创建
OpenFeign与Hystrix整合执行请求的源码
启动Hystrix之后,FeignClient接口生成的代理类的InvocationHandler是HystrixInvocationHandler
InvocationHandler.invoke()方法是动态代理中最最核心的,相当于是T proxy注入HystrixTestController,调用T proxy的时候,所有方法的调用,全部会走InvocationHandler.invoke()方法
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
//new了一个HstrixCommand
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
//根据方法名找到对应的SynchronousMethodHander,然后调用其invoke()方法
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
访问如下请求,打断点看下
http://localhost:8002/getUser3/2?age=26
进入熔断的源码,其实就是拿到你定义的熔断降级类,根据方法名和参数找到对应的方法来执行