AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结

news2025/1/15 12:43:18

这是一篇简简单单的文章,需要你简简单单看一眼就好,如果有不明白的地方,欢迎留言讨论。

 

在之前的文章中出现过一次AOP的使用,就是在运行任务之前,需要判断一下,触发该任务执行的server,是不是数据库中对应任务所在app的直接server,使用的是注解@DesignateServer,本篇文章是从另一个注解,再一次顺一遍AOP的使用,而且本篇文章的注解,再一次用到了可重入锁ReentrantLock,这个也是之前的文章中说的内容,可以再熟悉一遍,本篇文章的入口就是注解——UseCacheLock。

从名字来看,该注解是一个使用缓存时的一个锁,该类位于tech.powerjob.server.core.lock包下,用来修饰方法,在运行时执行的,源码如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {
    String type();
    String key();
    int concurrencyLevel();
}

type:从使用的代码处得出,目前只有两种一种是processJobInstance,另一种是processWfInstance

key:主要是任务id或者任务实例id,还有工作流id。其中任务id或者任务实例id的选取,是通过一个表达式来判断得出的。

concurrencyLevel:缓存要用到的字段,允许同时并发执行的写操作数。

UseCacheLock 的使用场景

该注解在powerjob中一共使用了8次,其中2次出现在任务的派发,6次出现在工作流的操作中,这一次就选在任务的派发,来讲一下该注解的使用场景。

@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId) {
    ... ...
}

方法内部的代码不重要,主要是来看方法上面的注解,里面的三个关键字分别是

processJobInstance

#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId

1024

第一个和最后一个没什么好说的,主要说一说中间那一条长长的表达式,该表达式通过解读,就是判断最大同时运行任务数是否大于0,以及任务的时间表达式类型是不是FIX_RATE或者FIX_DELAY。这一表达式可以说,除非人为的将MaxInstanceNum设置为0,否则该条数据默认值就是1,也就是说这个表达式,不负责任的说,99.99%都是真,也就是说都会使用JobId作为key值。

按照代码来看,就是当任务在派发的时候,会使用到该注解,为的是防止该方法同时运行派发同一个任务,如果是同时派发两个不同的任务,就不会有影响,毕竟在派发的过程中涉及到了对任务实例的数据修改,如果两个同时进行,确实会产生问题。

UseCacheLock 的AOP处理

处理该注解的类个该类在同一个包,处理的源代码如下所示:

@Around(value = "@annotation(useCacheLock)")

public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
    Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
        int concurrencyLevel = useCacheLock.concurrencyLevel();
        log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
        return CacheBuilder.newBuilder()
                .initialCapacity(300000)
                .maximumSize(500000)
                .concurrencyLevel(concurrencyLevel)
                .expireAfterWrite(30, TimeUnit.MINUTES)
                .build();
    });
    final Method method = AOPUtils.parseMethod(point);
    Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
    final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
    long start = System.currentTimeMillis();
    reentrantLock.lockInterruptibly();
    try {
        long timeCost = System.currentTimeMillis() - start;
        if (timeCost > SLOW_THRESHOLD) {

            final SlowLockEvent slowLockEvent = new SlowLockEvent()
                    .setType(SlowLockEvent.Type.LOCAL)
                    .setLockType(useCacheLock.type())
                    .setLockKey(String.valueOf(key))
                    .setCallerService(method.getDeclaringClass().getSimpleName())
                    .setCallerMethod(method.getName())
                    .setCost(timeCost);

            monitorService.monitor(slowLockEvent);

            log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                    key,
                    JSON.toJSONString(point.getArgs()));
        }
        return point.proceed();
    } finally {
        reentrantLock.unlock();
    }
}

代码看着挺长的,但是内容其实没有多少,可以一步一步拆开来看。

缓存的创建

第一步通过type来获取缓存,从文章开头我们知道,这个type就两个类型,processJobInstance就是用来派发任务的,processWfInstance就是用来操作工作流任务的,该代码里面就是processJobInstance,如果缓存存在,直接拿来用,如果不存在,则创建缓存,来看一眼创建缓存的代码:

CacheBuilder.newBuilder()
                .initialCapacity(300000)
                .maximumSize(500000)
                .concurrencyLevel(concurrencyLevel)
                .expireAfterWrite(30, TimeUnit.MINUTES)
                .build();

这个代码的大意就是创建一个有如下属性的缓存,缓存有效时间是30分钟(expireAfterWrite(30, TimeUnit.MINUTES)),这就像鱼有7秒记忆一样,这个缓存只能记录30分钟,过期失效。缓存的最大条目数是50万(maximumSize(500000))。指定用于缓存的hash table最低总规模是300000,允许同时并发操作数是concurrencyLevel,也就是传进来的1024.

key值的获取

第二步就是获取key值,该值主要是为了获取可重入锁用的,获取该值的源代码如下所示:

final Method method = AOPUtils.parseMethod(point);
Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);

从这个代码可以看到,用到了AOPUtil这个工具类的两个方法,第一个方法是解析出当前的方法,第二个是获取key值,这个AOPUtil在tech.powerjob.server.common.utils包下。解析方法的源码如下,备注解释各代码的目的:

public static Method parseMethod(ProceedingJoinPoint joinPoint) {
    //获取接入点的签名,此处必须是方法的签名,否则会报异常
    Signature pointSignature = joinPoint.getSignature();
    if (!(pointSignature instanceof  MethodSignature)) {
        throw new IllegalArgumentException("this annotation should be used on a method!");
    }
    //强转成方法的签名
    MethodSignature signature = (MethodSignature) pointSignature;
    //获取方法
    Method method = signature.getMethod();
    //如果方法所处的类是一个interface
    if (method.getDeclaringClass().isInterface()) {
        try {
            //通过IoC容器获取目标对象,然后再获取对象的方法
            method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());
        } catch (SecurityException | NoSuchMethodException e) {
            ExceptionUtils.rethrow(e);
        }
    }
    return method;
}

获取到了方法之后,就是获取key值,源代码如下,备注解释各代码的目的:

public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
    //获取到方法的参数值类型
    String[] params = discoverer.getParameterNames(method);
    assert params != null;
    //创建数据上下文
    EvaluationContext context = new StandardEvaluationContext();
    for (int len = 0; len < params.length; len++) {
        //将param[len] = arguments[len]
        context.setVariable(params[len], arguments[len]);
    }
    try {
        //执行表达式,也就是前面#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId
        Expression expression = parser.parseExpression(spEl);
        //返回表达式执行的结果,以clazz设置的类型返回
        return expression.getValue(context, clazz);
    } catch (Exception e) {
        log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);
        return defaultResult;
    }
}

经过以上两步,key值就获取完毕了

加锁

加锁的源代码如下所示,就是如果缓存里面保存了锁,就直接拿到,如果没有,就new一个出来,然后就启动锁,

那两条时间主要是记录加锁的时间,如果时间过长就要记录一条日志,记录加锁慢时的任务信息。

final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
long start = System.currentTimeMillis();
reentrantLock.lockInterruptibly();
try {
    long timeCost = System.currentTimeMillis() - start;
    ... ...
}
... ...

加锁结束之后,就可以执行注解修饰的方法了,执行就是下面这一行:

point.proceed();

执行结束之后,将锁打开就OK了。

总结

 

本篇文章涉及的知识主要是AOP的使用,可重入锁的使用,IoC容器相关,Spring的表达式的使用,缓存Cache的创建,每一个知识点都够我喝一壶了,所以大家如果想要了解这些知识的细节,请自行搜索去查想要了解的内容,如果你懒得查,也可以问我,当然我也懒,回不回答就看我心情了,哼,我外号就叫不高兴,所以大家看着办吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/379251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[神经网络]图神经网络(GNN)

一、概述 1.图 图用来表示一些实体(entities)之间的关系(实体表示为点(node)&#xff0c;关系表示为边(edge))。 关系分为有方向和无方向 2.数据的图表示 以图像文件为例&#xff0c;我们可以用邻接矩阵来表示一张图像。每个点表示一个像素点&#xff0c;若一个像素点有x个相邻…

重生之我是SVG(1)-入门

概述 引用一句来自MDN的一句话&#xff1a; SVG 图像是使用各种元素创建的&#xff0c;这些元素分别应用于矢量图像的结构、绘制与布局。在这里&#xff0c;您可以找到每个 SVG 元素的参考文档。 SVG 文件可以直接插入网页&#xff0c;成为 DOM 的一部分&#xff0c;然后用 Ja…

华三OSPF多区域互访实验

OSPF 实验 实验拓扑 实验需求 按照图示配置 IP 地址按照图示分区域配置 OSPF &#xff0c;实现全网互通为了路由结构稳定&#xff0c;要求路由器使用环回口作为 Router-id&#xff0c;ABR 的环回口宣告进骨干区域 实验解法 1.配置 IP 地址部分 2.按照图示分区域配置 OS…

Zeppelin-0.10.0的安装

目录 1.解压到指定目录 2.修改文件名 3.拷贝配置文件 4.修改IP和端口号&#xff0c;也可以改为8090等端口号 5.修改zeppelin-env.sh文件 6.复制hive-site.xml文件到当前目录下 7.切换目录 8.拷贝hadoop和hive的各种jar包到/opt/soft/zeppelin/interpreter/jdbc目录下 …

SDYY大学普通话考试报名系统说明文档

系列文章目录 健康云平台开发说明文档SD申报系统迭代说明文档漏刻有时物联网传感器API接口对接说明文档Echarts数据分析系统Data Analysis Platform使用说明文档漏刻有时云守护数据可视化v2.0迭代升级说明文档百度地图POI多信息点标注开发说明文档漏刻有时云守护数据可视化画质…

Docker之路(7.DockerFile文件编写、DockerFile 指令解释、CMD与ENTRYPOINT的区别)

1.DockerFile介绍 dockerfile 是用来构建docker镜像的文件&#xff01;命令参数脚本&#xff01; 构建步骤&#xff1a; 编写一个dockerfile文件docker build构建成为一个镜像docker run 运行镜像docker push发布镜像&#xff08;DockerHub、阿里云镜像仓库&#xff09; 2.Dock…

如何使用ADFSRelay分析和研究针对ADFS的NTLM中继攻击

关于ADFSRelay ADFSRelay是一款功能强大的概念验证工具&#xff0c;可以帮助广大研究人员分析和研究针对ADFS的NTLM中继攻击。 ADFSRelay这款工具由NTLMParse和ADFSRelay这两个实用程序组成。其中&#xff0c;NTLMParse用于解码base64编码的NTLM消息&#xff0c;并打印有关消…

SAP 在建工程转固定资产

由固定资产归口采购部门或业务部门提交购置固定资产/在建工程的申请&#xff0c;经审批后&#xff0c;若是需要安装调试&#xff0c;则由财务部固定资产会计建立内部订单收集成本&#xff0c;月末结转在建工程。项目完工后&#xff0c;相关部门&#xff08;公司装备部、分公司装…

Python虚拟环境迁移

使用python开发脚本使用的时候难免会遇到需要更换电脑来运行的问题&#xff0c;但是python不同版本的兼容性较差&#xff0c;在其他电脑使原python脚本运行时经常会发生一些问题&#xff0c;因此就需要python虚拟环境的迁移了。但是&#xff0c;直接将虚拟环境复制到另一台电脑…

Git学习(1)pro git阅读尚硅谷视频

目录 目录&#xff1a; 1. 起步 2. Git 基础 3. Git 分支 4. 服务器上的 Git 5. 分布式 Git 第一章 1.3 Git是什么 1.6运行git前的配置 该开源图书网站 Git - Book (git-scm.com) 目录&#xff1a; 1. 起步 1.1 关于版本控制1.2 Git 简史1.3 Git 是什么&#xff1f;1…

《图机器学习》-GNN Augmentation and Training

GNN Augmentation and Training一、Graph Augmentation for GNNs1、Feature Augmentation2、Structure augmentation3、Node Neighborhood Sampling一、Graph Augmentation for GNNs 之前的假设&#xff1a; Raw input graph computational graph&#xff0c;即原始图等于计算…

产品需求文档需要注意10件事

01什么是完美的产品需求文档&#xff08;PRD&#xff09;&#xff1f;就像产品经理一样&#xff0c;产品需求文档需要同时有效地执行许多不同的角色。该文档将由设计师&#xff0c;营销人员和工程团队使用&#xff0c;并且需要传达他们所需的所有必要信息。参考上面的漫画&…

html,

目录1. html新建1.1 html基本结构1.2 html细节2. 标签2.1 font标签2.2 字符实体2.3 标题标签2.4 超链接标签2.5 列表标签2.6 图片标签2.7 表格标签2.8 表单标签2.8.1基本使用2.8.2表单综合练习2.8.3表单格式化2.8.4表单使用细节2.8.5get请求2.8.6post请求2.9 其它标签2.9.1div标…

【JavaScript】第一章JavaScript入门

第一章 JavaScript入门JavaScript介绍JavaScript的起源JavaScript的应用JavaScript的特点JavaScript是脚本语言支持面向对象编程&#xff0c;面向过程编程/函数式编程支持跨平台执行JavaScript和ECMAScript的关系开发工具编辑器sublime textVisual Studio CodewebstormDreamwea…

安装VMWare虚拟机之后,发现网络贼卡,打开网页很慢

事情描述&#xff1a; 最近忙一个项目&#xff0c;需要到虚拟机中部署环境&#xff0c;安装完之后&#xff0c;就开整自己的项目了。 可以过几天&#xff0c;发现本地网络贼卡&#xff0c;打开各网页慢的一批&#xff0c;一开始还以为是路由器的问题&#xff0c;反复折腾之后排…

HBuilder X启动微信开发工具报错的问题

今天通过HBuilder X启动微信开发工具&#xff0c;报了如下的错&#xff1a; [微信小程序开发者工具] [error] IDE service port disabled. To use CLI Call, please enter y to confirm enabling CLI capability, or manually open IDE -> Settings -> Security Settings…

Android Studio相关记录

目录Android Studio 便捷插件Android LogcatJava文件的类头模板Android Studio 使用遇到的问题解决方案org.jetbrains.annotations.NullableBuild 控制台编译输出中文乱码Terminal 使用 git 命令窗口git 命令窗口中文乱码Android Studio 便捷插件 Android Logcat 配置路径 Fi…

【VUE】六 路由和传值

目录 一、 路由和传值 二、案例 三、案例存在无法刷新问题 一、 路由和传值 当某个组件可以根据某些参数值的不同&#xff0c;展示不同效果时&#xff0c;需要用到动态路由。 例如&#xff1a;访问网站看到课程列表&#xff0c;点击某个课程&#xff0c;就可以跳转到课程详…

【c#】学习DATATABLE排序

c#实现Datatable排序Datatable排序结果图代码展示总结Datatable排序 结果图 原数据 倒序 去重 筛选行 代码展示 1、使用datatable视图对table进行排序 //倒序排序 dt.DefaultView.Sort “CreateTime desc”; dt dt.DefaultView.ToTable(); 如果想升序排序&#xff0c…

kafka入个门

名词 topic 表 partation 水平扩展 leader 主 follow备 produce生产 offset偏移量 消息队列的流派 什么是 MQ Message Queue&#xff08;MQ&#xff09;&#xff0c;消息队列中间件。很多人都说&#xff1a;MQ 通过将消息的发送和接收分离来实现应用程序的异步和解偶&#xf…