【源码解析】重试模板RetryTemplate源码分析

news2024/9/28 21:24:20

应用场景

日常开发中,经常会遇到这样的场景:执行一次接口调用,如RPC调用,偶现失败,原因可能是dubbo超时、连接数耗尽、http网络抖动等,出现异常时我们并不能立即知道原因并作出反应,可能只是一个普通的RpcException或RuntimeException,

对于这种小概率的异常,往往需要尝试再次调用(前提是接口是幂等的),因为由于网络问题、下游服务暂时的不稳定导致的异常,段时间后理论上是可以自恢复的;

例如,有时候项目需要进行同步数据,一定要同步成功,不然对于业务会有影响,偶发性的会出现调用接口失败,失败并不是特别多;

一般我们处理偶发异常的流程如下:异常时,

(1)循环的进行远程调用若干次数,记录一下调用失败的记录;
(2)休眠一段时间,尝试等待下游系统自恢复或释放连接数,继续循环调用失败的请求;
(3)如果再调用失败、通过人工二次调用进行修复;

当然,你可以通过写一个指定次数的for循环来执行重试逻辑。
在这里插入图片描述

简单示例

  1. 引入依赖
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>
  1. 新增接口
@Slf4j
@RestController
public class UserController {

    @Autowired
    private RetryService retryService;

    @RequestMapping("/retry")
    public void retry() throws Exception {
        retryService.execute();
    }
}
  1. 新增RetryService
@Service("service")
public class RetryService {

    @Retryable(value = IllegalAccessException.class, maxAttempts = 5,
            backoff = @Backoff(value = 1500, maxDelay = 100000, multiplier = 1.2))
    public void execute() throws IllegalAccessException {
        System.out.println("service method execute...");
        throw new IllegalAccessException("manual exception");
    }

    @Recover
    public void recover(IllegalAccessException e) {
        System.out.println("service retry after Recover => " + e.getMessage());
    }

}
  1. 在启动类上添加注解@EnableRetry

源码分析

EnableRetry注解,引入RetryConfiguration

@Import({RetryConfiguration.class})
@Documented
public @interface EnableRetry {
    boolean proxyTargetClass() default false;
}

RetryConfiguration#init,初始化切面类。对带有Retryable注解的方法进行拦截,拦截方法是AnnotationAwareRetryOperationsInterceptor

    @PostConstruct
    public void init() {
        Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet(1);
        retryableAnnotationTypes.add(Retryable.class);
        this.pointcut = this.buildPointcut(retryableAnnotationTypes);
        this.advice = this.buildAdvice();
        if (this.advice instanceof BeanFactoryAware) {
            ((BeanFactoryAware)this.advice).setBeanFactory(this.beanFactory);
        }

    }

    protected Advice buildAdvice() {
        AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
        if (this.retryContextCache != null) {
            interceptor.setRetryContextCache(this.retryContextCache);
        }

        if (this.retryListeners != null) {
            interceptor.setListeners(this.retryListeners);
        }

        if (this.methodArgumentsKeyGenerator != null) {
            interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);
        }

        if (this.newMethodArgumentsIdentifier != null) {
            interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);
        }

        if (this.sleeper != null) {
            interceptor.setSleeper(this.sleeper);
        }

        return interceptor;
    }

    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
        ComposablePointcut result = null;
        Iterator var3 = retryAnnotationTypes.iterator();

        while(var3.hasNext()) {
            Class<? extends Annotation> retryAnnotationType = (Class)var3.next();
            Pointcut filter = new RetryConfiguration.AnnotationClassOrMethodPointcut(retryAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(filter);
            } else {
                result.union(filter);
            }
        }

        return result;
    }

AnnotationAwareRetryOperationsInterceptor#invoke,拦截器。主要是根据注解获取RetryOperationsInterceptor,并且通过缓存存储,避免每次调用重复生成。

    public Object invoke(MethodInvocation invocation) throws Throwable {
        MethodInterceptor delegate = this.getDelegate(invocation.getThis(), invocation.getMethod());
        return delegate != null ? delegate.invoke(invocation) : invocation.proceed();
    }

    private MethodInterceptor getDelegate(Object target, Method method) {
        if (!this.delegates.containsKey(target) || !((Map)this.delegates.get(target)).containsKey(method)) {
            synchronized(this.delegates) {
                if (!this.delegates.containsKey(target)) {
                    this.delegates.put(target, new HashMap());
                }

                Map<Method, MethodInterceptor> delegatesForTarget = (Map)this.delegates.get(target);
                if (!delegatesForTarget.containsKey(method)) {
                    org.springframework.retry.annotation.Retryable retryable = (org.springframework.retry.annotation.Retryable)AnnotationUtils.findAnnotation(method, org.springframework.retry.annotation.Retryable.class);
                    if (retryable == null) {
                        retryable = (org.springframework.retry.annotation.Retryable)AnnotationUtils.findAnnotation(method.getDeclaringClass(), org.springframework.retry.annotation.Retryable.class);
                    }

                    if (retryable == null) {
                        retryable = this.findAnnotationOnTarget(target, method);
                    }

                    if (retryable == null) {
                        return (MethodInterceptor)delegatesForTarget.put(method, (Object)null);
                    }

                    MethodInterceptor delegate;
                    if (StringUtils.hasText(retryable.interceptor())) {
                        delegate = (MethodInterceptor)this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
                    } else if (retryable.stateful()) {
                        delegate = this.getStatefulInterceptor(target, method, retryable);
                    } else {
                        delegate = this.getStatelessInterceptor(target, method, retryable);
                    }

                    delegatesForTarget.put(method, delegate);
                }
            }
        }

        return (MethodInterceptor)((Map)this.delegates.get(target)).get(method);
    }

    private MethodInterceptor getStatelessInterceptor(Object target, Method method, org.springframework.retry.annotation.Retryable retryable) {
        RetryTemplate template = this.createTemplate(retryable.listeners());
        template.setRetryPolicy(this.getRetryPolicy(retryable));
        template.setBackOffPolicy(this.getBackoffPolicy(retryable.backoff()));
        return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(this.getRecoverer(target, method)).build();
    }

RetryOperationsInterceptor#invoke,封装RetryCallbackItemRecovererCallback

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        final String name;
        if (StringUtils.hasText(this.label)) {
            name = this.label;
        } else {
            name = invocation.getMethod().toGenericString();
        }

        RetryCallback<Object, Throwable> retryCallback = new RetryCallback<Object, Throwable>() {
            public Object doWithRetry(RetryContext context) throws Exception {
                context.setAttribute("context.name", name);
                if (invocation instanceof ProxyMethodInvocation) {
                    try {
                        return ((ProxyMethodInvocation)invocation).invocableClone().proceed();
                    } catch (Exception var3) {
                        throw var3;
                    } catch (Error var4) {
                        throw var4;
                    } catch (Throwable var5) {
                        throw new IllegalStateException(var5);
                    }
                } else {
                    throw new IllegalStateException("MethodInvocation of the wrong type detected - this should not happen with Spring AOP, so please raise an issue if you see this exception");
                }
            }
        };
        if (this.recoverer != null) {
            RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), this.recoverer);
            return this.retryOperations.execute(retryCallback, recoveryCallback);
        } else {
            return this.retryOperations.execute(retryCallback);
        }
    }

RetryTemplate#doExecute

    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
        RetryPolicy retryPolicy = this.retryPolicy;
        BackOffPolicy backOffPolicy = this.backOffPolicy;
        RetryContext context = this.open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }

        RetrySynchronizationManager.register(context);
        Throwable lastException = null;
        boolean exhausted = false;

        try {
            boolean running = this.doOpenInterceptors(retryCallback, context);
            if (!running) {
                throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
            } else {
                BackOffContext backOffContext = null;
                Object resource = context.getAttribute("backOffContext");
                if (resource instanceof BackOffContext) {
                    backOffContext = (BackOffContext)resource;
                }

                if (backOffContext == null) {
                    backOffContext = backOffPolicy.start(context);
                    if (backOffContext != null) {
                        context.setAttribute("backOffContext", backOffContext);
                    }
                }

                while(true) {
                    Object var34;
                    if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Retry: count=" + context.getRetryCount());
                            }

                            lastException = null;
                            var34 = retryCallback.doWithRetry(context);
                            return var34;
                        } catch (Throwable var31) {
                            Throwable e = var31;
                            lastException = var31;

                            try {
                                this.registerThrowable(retryPolicy, state, context, e);
                            } catch (Exception var28) {
                                throw new TerminatedRetryException("Could not register throwable", var28);
                            } finally {
                                this.doOnErrorInterceptors(retryCallback, context, var31);
                            }

                            if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                                try {
                                    backOffPolicy.backOff(backOffContext);
                                } catch (BackOffInterruptedException var30) {
                                    lastException = var31;
                                    if (this.logger.isDebugEnabled()) {
                                        this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
                                    }

                                    throw var30;
                                }
                            }

                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                            }

                            if (this.shouldRethrow(retryPolicy, context, state)) {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
                                }

                                throw wrapIfNecessary(var31);
                            }

                            if (state == null || !context.hasAttribute("state.global")) {
                                continue;
                            }
                        }
                    }

                    if (state == null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
                    }

                    exhausted = true;
                    var34 = this.handleRetryExhausted(recoveryCallback, context, state);
                    return var34;
                }
            }
        } catch (Throwable var32) {
            throw wrapIfNecessary(var32);
        } finally {
            this.close(retryPolicy, context, state, lastException == null || exhausted);
            this.doCloseInterceptors(retryCallback, context, lastException);
            RetrySynchronizationManager.clear();
        }
    }

架构设计

在这里插入图片描述

  1. RetryTemplate: 封装了Retry基本操作,是进入spring-retry框架的整体流程入口,通过RetryTemplate可以指定监听、回退策略、重试策略等。

  2. RetryCallback:该接口封装了业务代码,且failback后,会再次调用RetryCallback接口,直到达到重试次数/时间上限;

  3. RecoveryCallback:当RetryCallback不能再重试的时候,如果定义了RecoveryCallback,就会调用RecoveryCallback,并以其返回结果作为最终的返回结果。此外,RetryCallbackRecoverCallback定义的接口方法都可以接收一个RetryContext上下文参数,通过它可以获取到尝试次数、异常,也可以通过其setAttribute()和getAttribute()来传递一些信息。

  4. RetryPolicy:重试策略,描述什么条件下可以尝试重复调用RetryCallback接口;策略包括最大重试次数、指定异常集合/忽略异常集合、重试允许的最大超时时间;RetryTemplate内部默认时候用的是SimpleRetryPolicySimpleRetryPolicy默认将对所有异常进行尝试,最多尝试3次。还有其他多种更为复杂功能更多的重试策略;

  5. BackOffPolicy:回退策略,用来定义在两次尝试之间需要间隔的时间,如固定时间间隔、递增间隔、随机间隔等;RetryTemplate内部默认使用的是NoBackOffPolicy,其在两次尝试之间不会进行任何的停顿。对于一般可重试的操作往往是基于网络进行的远程请求,它可能由于网络波动暂时不可用,如果立马进行重试它可能还是不可用,但是停顿一下,过一会再试可能它又恢复正常了,所以在RetryTemplate中使用BackOffPolicy往往是很有必要的;

  6. RetryListener:RetryTemplate中可以注册一些RetryListener,可以理解为是对重试过程中的一个增强,它可以在整个Retry前、整个Retry后和每次Retry失败时进行一些操作;如果只想关注RetryListener的某些方法,则可以选择继承RetryListenerSupport,它默认实现了RetryListener的所有方法;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/375590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新冠小阳人症状记录

原想挺过春节后再养&#xff0c;发现事与愿违。生理期期间抵抗力下降&#xff0c;所以在生理期第二天就有些症状了。可能是生理期前一天出去采购食物染上&#xff0c;也可能是合租夫妻染上。anyway&#xff0c;记录下自己的症状与相应有效的偏方&#xff1a; 第一天&#xff1a…

git构建工具

目录 一、简介 1、版本控制 2、版本控制工具 二、Git工作机制 1、工作区 2、暂存区 3、远程库 三、常用命令 1、初始化本地仓库 git init 2、查看本地库状态 git status 3、工作区文件加入到暂存区 git add 4、 暂存区文件提交到本地库 git commit -m &quo…

Odoo | Webserivce | 5分钟学会【JSONRPC】接口开发

文章目录Odoo - JsonRPC1. Odoo内方法结构&#xff08;接收端&#xff09;2. POST接口请求结构&#xff08;发送端&#xff09;3. 实例测试Odoo - JsonRPC 1. Odoo内方法结构&#xff08;接收端&#xff09; # -*- coding: utf-8 -*- import odoo import logging import trac…

手把手带你做一套毕业设计-征程开启

本文是《手把手带你做一套毕业设计》专栏的开篇&#xff0c;文本将会包含我们创作这个专栏的初衷&#xff0c;专栏的主体内容&#xff0c;以及我们专栏的后续规划。关于这套毕业设计的作者呢前端部分由狗哥负责&#xff0c;服务端部分则由天哥操刀。我们力求毕业生或者新手通过…

Anaconda3 +pycharm详细安装教程(2023年)

前言最近配置了一台新电脑&#xff0c;准备安装Anaconda&#xff0c;原来是直接安装的python安装包以及pycharm&#xff0c;需要使用什么包就安装什么包&#xff0c;由于网络原因&#xff0c;经常安装失败&#xff0c;所以选择包含众多科学数据包的Anaconda。说到这里&#xff…

@font-face用法超详细讲解

文章目录font-face是什么font-face基本语法urlTTFOTFEOTWOFFSVGformatfont-face用法示例font字体下载ttf-to-eot 字体转换器https://blog.csdn.net/qq_37417446/article/details/106728725 https://developer.mozilla.org/zh-CN/docs/Web/CSS/font-face font-face是什么 font-…

网络性能总不好?专家帮你来“看看”— CANN 6.0 黑科技 | 网络调优专家AOE,性能效率双提升

随着深度学习模型复杂度和数据集规模的增大&#xff0c;计算效率的提升成为不可忽视的问题。然而&#xff0c;算法网络的多样性、输入数据的不确定性以及硬件之间的差异性&#xff0c;使得网络调优耗费巨大成本&#xff0c;即使是经验丰富的专家&#xff0c;也需要耗费数天的时…

C#窗口介绍

窗口就是打开程序我们所面对的一个面板&#xff0c;里面可以添加各种控件&#xff0c;如下图所示&#xff0c;我们可以在属性栏设置其标题名称、图标、大小等。图1 窗口图 图2 设置面板 图3 设置双击标题框&#xff0c;会生成Load函数&#xff0c;也可以到事件里面去找Load函数…

Linux(Centos)安装RabbitMQ+延时插件+开机自启动

安装目录1&#xff1a;前言1.1 系统环境1.2&#xff1a;安装版本1.3 简介2&#xff1a;安装2.1&#xff1a;安装前准备&#xff1a;2.2&#xff1a;安装Erlang2.3&#xff1a;安装RabbitMQ2.3&#xff1a;延迟依赖插件安装1&#xff1a;前言 1.1 系统环境 操作系统版本&#…

python-批量下载某短视频平台音视频标题、评论、点赞数

python-批量下载某短视频平台音视频标题、评论数、点赞数前言一、获取单个视频信息1、获取视频 url2、发送请求3、数据解析二、批量获取数据1、批量导入地址2、批量导出excel文件3、批量存入mysql数据库三、完整代码前言 1、Cookie中文名称为小型文本文件&#xff0c;指某些网…

【linux】软硬链接

在linux中在磁盘中定位文件并不是根据文件名而是根据文件的inode&#xff0c;一个文件对应一个inode但是一个inode可以对应多个文件。硬链接硬链接是通过索引节点进行的链接。在Linux中&#xff0c;多个文件指向同一个索引节点是允许的&#xff0c;像这样的链接就是硬链接。硬链…

【论文速递】EMNLP 2020 - 将事件抽取作为机器阅读理解任务

【论文速递】EMNLP 2020 - 将事件抽取作为机器阅读理解任务 【论文原文】&#xff1a;Event Extraction as Machine Reading Comprehension 【作者信息】&#xff1a;Jian Liu and Yubo Chen and Kang Liu and Wei Bi and Xiaojiang Liu 论文&#xff1a;https://aclantholo…

计算机视觉知识体系

文章目录一、计算机视觉1.1 系统工程方案层1.2 领域任务模块层1.3 基础算法层1.4 深入领域体会一、计算机视觉 三个层次&#xff1a;系统工程方案层、领域任务模块层、基础算法层。 三方面知识点&#xff1a;图像处理、机器学习、基础数学与模型。 视频的三个场景&#xff1…

k8s node之间是如何通信的?

承接上文同一个node中pod之间如何通信&#xff1f;单一Pod上的容器是怎么共享网络命名空间的&#xff1f;每个node上的pod ip和cni0网桥ip和flannel ip都是在同一个网段10.1.71.x上。cni0网桥会把报文发送flannel这个网络设备上&#xff0c;flannel其实是node上的一个后台进程&…

【设计模式】2.抽象工厂模式

抽象工厂模式 前面介绍的工厂方法模式中考虑的是一类产品的生产&#xff0c;如畜牧场只养动物、电视机厂只生产电视机、传智播客只培养计算机软件专业的学生等。 这些工厂只生产同种类产品&#xff0c;同种类产品称为同等级产品&#xff0c;也就是说&#xff1a;工厂方法模式…

【python】条件语句,简单理解

嗨害大家好鸭&#xff01;我是小熊猫~ Python 条件语句 Python条件语句是通过一条或多条语句的执行结果&#xff08;True或者False&#xff09;来决定执行的代码块。 可以通过下图来简单了解条件语句的执行过程: 更多python资料获取:点击此处跳转文末名片获取 Python程序语言…

【笔记】两台1200PLC进行S7 通信(1)

使用两台1200系列PLC进行S7通信&#xff08;入门&#xff09; 文章目录 目录 文章目录 前言 一、通信 1.概念 2.PLC通信 1.串口 2.网口 …

2023年5月软考软件设计师备考经验

一、考试目标&#xff1a; 通过本考试的合格人员能根据软件开发项目管理和软件工程的要求&#xff0c;按照系统总体设计规格说明书进行软件设计&#xff0c;编写程序设计规格说明书等相应的文档&#xff0c;组织和指导程序员编写、调试程序&#xff0c;并对软件进行优化和集成…

cityengine自定义纹理库资源

背景 cityengine虽然可以将shp生成带纹理的三维模型,但是纹理不一定满足我们的要求,这时候我们就想用我们自己制作的纹理 粗略了解规则文件 了解Building_From_Footprint.cga这个规则文件,具体文件位置默认在 “C:\Users[电脑用户名:如Administrator]\Documents\CityEng…

小米无线AR眼镜探索版细节汇总

在MWC 2023期间&#xff0c;小米正式发布了一款无线AR眼镜&#xff0c;虽然还没看过实机&#xff0c;但XDA提前上手体验&#xff0c;我们从中进行总结。首先我要说的是&#xff0c;小米这款眼镜和高通无线AR眼镜参考设计高度重叠&#xff0c;产品卖点几乎一致&#xff0c;只是增…