应用场景
日常开发中,经常会遇到这样的场景:执行一次接口调用,如RPC调用,偶现失败,原因可能是dubbo超时、连接数耗尽、http网络抖动等,出现异常时我们并不能立即知道原因并作出反应,可能只是一个普通的RpcException或RuntimeException,
对于这种小概率的异常,往往需要尝试再次调用(前提是接口是幂等的),因为由于网络问题、下游服务暂时的不稳定导致的异常,段时间后理论上是可以自恢复的;
例如,有时候项目需要进行同步数据,一定要同步成功,不然对于业务会有影响,偶发性的会出现调用接口失败,失败并不是特别多;
一般我们处理偶发异常的流程如下:异常时,
(1)循环的进行远程调用若干次数,记录一下调用失败的记录;
(2)休眠一段时间,尝试等待下游系统自恢复或释放连接数,继续循环调用失败的请求;
(3)如果再调用失败、通过人工二次调用进行修复;
当然,你可以通过写一个指定次数的for循环来执行重试逻辑。
简单示例
- 引入依赖
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
- 新增接口
@Slf4j
@RestController
public class UserController {
@Autowired
private RetryService retryService;
@RequestMapping("/retry")
public void retry() throws Exception {
retryService.execute();
}
}
- 新增
RetryService
@Service("service")
public class RetryService {
@Retryable(value = IllegalAccessException.class, maxAttempts = 5,
backoff = @Backoff(value = 1500, maxDelay = 100000, multiplier = 1.2))
public void execute() throws IllegalAccessException {
System.out.println("service method execute...");
throw new IllegalAccessException("manual exception");
}
@Recover
public void recover(IllegalAccessException e) {
System.out.println("service retry after Recover => " + e.getMessage());
}
}
- 在启动类上添加注解
@EnableRetry
源码分析
EnableRetry
注解,引入RetryConfiguration
@Import({RetryConfiguration.class})
@Documented
public @interface EnableRetry {
boolean proxyTargetClass() default false;
}
RetryConfiguration#init
,初始化切面类。对带有Retryable
注解的方法进行拦截,拦截方法是AnnotationAwareRetryOperationsInterceptor
。
@PostConstruct
public void init() {
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet(1);
retryableAnnotationTypes.add(Retryable.class);
this.pointcut = this.buildPointcut(retryableAnnotationTypes);
this.advice = this.buildAdvice();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware)this.advice).setBeanFactory(this.beanFactory);
}
}
protected Advice buildAdvice() {
AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
if (this.retryContextCache != null) {
interceptor.setRetryContextCache(this.retryContextCache);
}
if (this.retryListeners != null) {
interceptor.setListeners(this.retryListeners);
}
if (this.methodArgumentsKeyGenerator != null) {
interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);
}
if (this.newMethodArgumentsIdentifier != null) {
interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);
}
if (this.sleeper != null) {
interceptor.setSleeper(this.sleeper);
}
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
ComposablePointcut result = null;
Iterator var3 = retryAnnotationTypes.iterator();
while(var3.hasNext()) {
Class<? extends Annotation> retryAnnotationType = (Class)var3.next();
Pointcut filter = new RetryConfiguration.AnnotationClassOrMethodPointcut(retryAnnotationType);
if (result == null) {
result = new ComposablePointcut(filter);
} else {
result.union(filter);
}
}
return result;
}
AnnotationAwareRetryOperationsInterceptor#invoke
,拦截器。主要是根据注解获取RetryOperationsInterceptor
,并且通过缓存存储,避免每次调用重复生成。
public Object invoke(MethodInvocation invocation) throws Throwable {
MethodInterceptor delegate = this.getDelegate(invocation.getThis(), invocation.getMethod());
return delegate != null ? delegate.invoke(invocation) : invocation.proceed();
}
private MethodInterceptor getDelegate(Object target, Method method) {
if (!this.delegates.containsKey(target) || !((Map)this.delegates.get(target)).containsKey(method)) {
synchronized(this.delegates) {
if (!this.delegates.containsKey(target)) {
this.delegates.put(target, new HashMap());
}
Map<Method, MethodInterceptor> delegatesForTarget = (Map)this.delegates.get(target);
if (!delegatesForTarget.containsKey(method)) {
org.springframework.retry.annotation.Retryable retryable = (org.springframework.retry.annotation.Retryable)AnnotationUtils.findAnnotation(method, org.springframework.retry.annotation.Retryable.class);
if (retryable == null) {
retryable = (org.springframework.retry.annotation.Retryable)AnnotationUtils.findAnnotation(method.getDeclaringClass(), org.springframework.retry.annotation.Retryable.class);
}
if (retryable == null) {
retryable = this.findAnnotationOnTarget(target, method);
}
if (retryable == null) {
return (MethodInterceptor)delegatesForTarget.put(method, (Object)null);
}
MethodInterceptor delegate;
if (StringUtils.hasText(retryable.interceptor())) {
delegate = (MethodInterceptor)this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
} else if (retryable.stateful()) {
delegate = this.getStatefulInterceptor(target, method, retryable);
} else {
delegate = this.getStatelessInterceptor(target, method, retryable);
}
delegatesForTarget.put(method, delegate);
}
}
}
return (MethodInterceptor)((Map)this.delegates.get(target)).get(method);
}
private MethodInterceptor getStatelessInterceptor(Object target, Method method, org.springframework.retry.annotation.Retryable retryable) {
RetryTemplate template = this.createTemplate(retryable.listeners());
template.setRetryPolicy(this.getRetryPolicy(retryable));
template.setBackOffPolicy(this.getBackoffPolicy(retryable.backoff()));
return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(this.getRecoverer(target, method)).build();
}
RetryOperationsInterceptor#invoke
,封装RetryCallback
和ItemRecovererCallback
。
public Object invoke(final MethodInvocation invocation) throws Throwable {
final String name;
if (StringUtils.hasText(this.label)) {
name = this.label;
} else {
name = invocation.getMethod().toGenericString();
}
RetryCallback<Object, Throwable> retryCallback = new RetryCallback<Object, Throwable>() {
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("context.name", name);
if (invocation instanceof ProxyMethodInvocation) {
try {
return ((ProxyMethodInvocation)invocation).invocableClone().proceed();
} catch (Exception var3) {
throw var3;
} catch (Error var4) {
throw var4;
} catch (Throwable var5) {
throw new IllegalStateException(var5);
}
} else {
throw new IllegalStateException("MethodInvocation of the wrong type detected - this should not happen with Spring AOP, so please raise an issue if you see this exception");
}
}
};
if (this.recoverer != null) {
RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), this.recoverer);
return this.retryOperations.execute(retryCallback, recoveryCallback);
} else {
return this.retryOperations.execute(retryCallback);
}
}
RetryTemplate#doExecute
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;
RetryContext context = this.open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
boolean running = this.doOpenInterceptors(retryCallback, context);
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
} else {
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext)resource;
}
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
while(true) {
Object var34;
if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
lastException = null;
var34 = retryCallback.doWithRetry(context);
return var34;
} catch (Throwable var31) {
Throwable e = var31;
lastException = var31;
try {
this.registerThrowable(retryPolicy, state, context, e);
} catch (Exception var28) {
throw new TerminatedRetryException("Could not register throwable", var28);
} finally {
this.doOnErrorInterceptors(retryCallback, context, var31);
}
if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
backOffPolicy.backOff(backOffContext);
} catch (BackOffInterruptedException var30) {
lastException = var31;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw var30;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
}
if (this.shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
throw wrapIfNecessary(var31);
}
if (state == null || !context.hasAttribute("state.global")) {
continue;
}
}
}
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
var34 = this.handleRetryExhausted(recoveryCallback, context, state);
return var34;
}
}
} catch (Throwable var32) {
throw wrapIfNecessary(var32);
} finally {
this.close(retryPolicy, context, state, lastException == null || exhausted);
this.doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
架构设计
-
RetryTemplate
: 封装了Retry基本操作,是进入spring-retry框架的整体流程入口,通过RetryTemplate
可以指定监听、回退策略、重试策略等。 -
RetryCallback
:该接口封装了业务代码,且failback后,会再次调用RetryCallback
接口,直到达到重试次数/时间上限; -
RecoveryCallback
:当RetryCallback
不能再重试的时候,如果定义了RecoveryCallback
,就会调用RecoveryCallback,并以其返回结果作为最终的返回结果。此外,RetryCallback
和RecoverCallback
定义的接口方法都可以接收一个RetryContext上下文参数,通过它可以获取到尝试次数、异常,也可以通过其setAttribute()和getAttribute()来传递一些信息。 -
RetryPolicy
:重试策略,描述什么条件下可以尝试重复调用RetryCallback
接口;策略包括最大重试次数、指定异常集合/忽略异常集合、重试允许的最大超时时间;RetryTemplate
内部默认时候用的是SimpleRetryPolicy
,SimpleRetryPolicy
默认将对所有异常进行尝试,最多尝试3次。还有其他多种更为复杂功能更多的重试策略; -
BackOffPolicy
:回退策略,用来定义在两次尝试之间需要间隔的时间,如固定时间间隔、递增间隔、随机间隔等;RetryTemplate
内部默认使用的是NoBackOffPolicy
,其在两次尝试之间不会进行任何的停顿。对于一般可重试的操作往往是基于网络进行的远程请求,它可能由于网络波动暂时不可用,如果立马进行重试它可能还是不可用,但是停顿一下,过一会再试可能它又恢复正常了,所以在RetryTemplate
中使用BackOffPolicy
往往是很有必要的; -
RetryListener
:RetryTemplate中可以注册一些RetryListener
,可以理解为是对重试过程中的一个增强,它可以在整个Retry前、整个Retry后和每次Retry失败时进行一些操作;如果只想关注RetryListener
的某些方法,则可以选择继承RetryListenerSupport
,它默认实现了RetryListener
的所有方法;