断路器Hystrix
使用和工作原理
介绍
在微服务架构的分布式系统中,众多微服务有复杂的依赖关系,这些依赖在某些情况下不可避免的会出现一些请求失败。当一个依赖由于延迟高出现阻塞,调用该依赖的服务线程就会发生排队阻塞。如果这个时候出现大量的业务流量打过来,就可能会出现服务器资源被消耗殆尽导致服务宕机。
对于这种情况,就需要有一个服务的保护机制——服务隔离和熔断机制,当依赖应用出现问题,不会导致下游服务出现阻塞而导致一些列的雪崩效应。
快速开始
- pom配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.charles</groupId>
<artifactId>demo-hystrix</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-hystrix</name>
<description>demo-hystrix</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 启动类添加注解
@EnableCircuitBreaker
@SpringBootApplication
@EnableHystrix
public class DemoHystrixApplication {
public static void main(String[] args) {
SpringApplication.run(DemoHystrixApplication.class, args);
}
}
- 方法上使用断路器注解
@Service
public class HelloService {
@HystrixCommand(
fallbackMethod = "fallback",
commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"), @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000")})
public String getHystrixResponse() {
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.add("Content-Type", "application/json; charset=UTF-8");
headers.add("Accept", "*/*");
HttpEntity<String> requestEntity = new HttpEntity<>("", headers);
ResponseEntity<String> exchange = restTemplate.exchange("http://localhost:8082/sleep", HttpMethod.GET, requestEntity, String.class);
String body = exchange.getBody();
return body;
}
public String fallback() {
return "sorry, the request is timeout";
}
}
- 当调用远程服务接口超时,超过1s后,就会执行fallback方法。
源码解析
HystrixCommandAspect
是实现熔断的核心类。会拦截带有HystrixCommand
和HystrixCollapser
注解的方法。
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
Method method = AopUtils.getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
} else {
HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
try {
Object result;
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = this.executeObservable(invokable, executionType, metaHolder);
}
return result;
} catch (HystrixBadRequestException var9) {
throw var9.getCause();
} catch (HystrixRuntimeException var10) {
throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
}
}
}
- 进入到
result = CommandExecutor.execute(invokable, executionType, metaHolder);
,会执行HystrixCommand#execute
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
- 跟进queue方法。
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
//...
}
- 跟进
toObservable
方法。核心是applyHystrixSemantics
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
//...
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
AbstractCommand#applyHystrixSemantics
方法中会执行AbstractCommand#executeCommandAndObserve
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
AbstractCommand#executeCommandAndObserve
方法中定义了handleFallback
,用来处理事件。
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
AbstractCommand#executeCommandAndObserve
中,因为默认是开启超时配置,所以会创建HystrixObservableTimeoutOperator
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
HystrixObservableTimeoutOperator
对象的call方法,该方法创建了一个listener,之后将listener加入到HystrixTimer
中,如果超时会执行tick
方法。
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
// if the child unsubscribes we unsubscribe our parent as well
child.add(s);
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
TimerListener listener = new TimerListener() {
@Override
public void tick() {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
timeoutRunnable.run();
//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
}
}
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
// if s is unsubscribed we want to unsubscribe the parent
s.add(parent);
return parent;
}
-
tick方法就是将状态置为Time Out,并将观察者执行一个
onError
方法,内容为new HystrixTimeoutException()
。 -
AbstractCommand#handleTimeoutViaFallback
用来处理超时。 -
GenericCommand#getFallback
,用来获取fallback方法,且执行。
protected Object getFallback() {
final CommandAction commandAction = this.getFallbackAction();
if (commandAction != null) {
try {
return this.process(new AbstractHystrixCommand<Object>.Action() {
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable var3) {
LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
}
} else {
return super.getFallback();
}
}