1.调度服务
1.1.JDK之ScheduledExecutorService
讲到调度任务,我们脑海里马上会想到ScheduledExecutorService。
ScheduledExecutorService是 Java java.util.concurrent 包中的一个接口,它继承自 ExecutorService 接口。它主要用于在给定的延迟后运行任务,或者定期地执行任务。这个接口提供了几种安排任务执行的方法,包括单次执行、定期执行和周期性执行。
以下是 ScheduledExecutorService 提供的一些关键方法:
-
schedule(Callable<V> callable, long delay, TimeUnit unit): 安排所提交的
Callable
任务在指定的延迟后运行,返回一个Future
,代表任务的结果。 -
schedule(Runnable command, long delay, TimeUnit unit): 安排所提交的
Runnable
任务在指定的延迟后运行。 -
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 安排所提交的
Runnable
任务在指定的初始延迟后首次启动,并且随后按指定的周期重复执行。 -
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 安排所提交的
Runnable
任务在指定的初始延迟后首次启动,并且随后在每次执行结束和下次执行开始之间都存在指定的延迟。
然而,如果采用了sprintboot,我们也可以直接采用springboot提供的调试线程池。这其中有一个最大的优势在于,可以利用spring的cron表达式,采用方法注解的方式,不用引入quartz第三方工具。
1.2.springboot使用调度线程池
我们尝试从源代码的角度,来看看springboot提供的调度线程池是怎么创建的。
首先,启用调度配置,启动类加上@EnableScheduling
springboot的自动配置类一般以AutoConfiguration作为后缀,采用模糊搜索ScheduleAutoConfiguration可以找到目标TaskSchedulingAutoConfiguration。
从截图我们可以看出,我们只需在application.yml加入以下的配置就可以启动了
spring:
task:
## 定时任务(业务上定时任务量不多,2个足矣)
scheduling:
## 线程池核心线程数量
pool:
size: 2
## 线程名称前线
threadNamePrefix: common-scheduling-
spring对线程池进行二次封闭,最终调用的还是jdk的ThreadPoolExecutor类,我们在该类的构造函数打个断点,可以看到,pool.size参数已经被传参:
1.3.ThreadPoolExecutor核心参数与执行流程
这里有必要先介绍下 ThreadPoolExecutor类的几个函数参数及基本运行机制
构造函数参数:
-
核心线程数(Core Pool Size): 线程池中始终保持的线程数量,即使它们处于空闲状态。如果任务数量少于核心线程数,线程池会创建新的线程来处理任务,而不会立即回收这些线程。
-
最大线程数(Maximum Pool Size): 线程池中允许的最大线程数量。如果任务数量超过了核心线程数但小于最大线程数,且工作队列已满,线程池会创建新的线程来处理任务,直到达到最大线程数。
-
工作队列(Work Queue): 用于存放待执行任务的阻塞队列。当所有核心线程都在忙碌时,新的任务会被放入工作队列中等待执行。
-
线程工厂(Thread Factory): 用于创建新线程的工厂。它提供了一种方式来定制线程的创建过程,例如设置线程的名称、优先级、是否为守护线程等。
-
拒绝策略(Rejected Execution Handler): 当任务无法被线程池及时处理时(即当线程池已满,且工作队列已满),线程池会采用拒绝策略来处理新提交的任务。常见的拒绝策略包括:
AbortPolicy
:抛出RejectedExecutionException
。CallerRunsPolicy
:由调用者线程运行该任务。DiscardPolicy
:静默丢弃任务。DiscardOldestPolicy
:丢弃队列中最老的任务,然后尝试再次提交当前任务。
-
保持活动时间(Keep-Alive Time): 非核心线程空闲时在终止前等待新任务的最长时间。如果线程池允许核心线程空闲,这个参数也适用于核心线程。
-
时间单位(Time Unit): 与保持活动时间配合使用的时间单位,例如
TimeUnit.SECONDS
。
执行流程:
- 如果线程池中的线程数量少于核心线程数,即使有空闲线程,线程池也会优先创建新线程来执行新的任务。
- 如果线程池中的线程数量达到核心线程数,新的任务会被放入工作队列等待执行。
- 如果工作队列已满且线程数量少于最大线程数,线程池会创建新的非核心线程来执行任务。
- 如果工作队列已满且线程数量达到最大线程数,新的任务会被拒绝,线程池会采用拒绝策略来处理。
1.4.自定义线程池
由于spingboot对调度任务线程池的参数支持有限,如果想定制自己的参数,可以注入自己的调度线程池,从代码可看出:
@Configuration
public class ThreadPoolConfig {
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("common-schedule"),
new ThreadPoolExecutor.CallerRunsPolicy()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
};
}
}
1.5.spring cron注解
Cron 表达式是一种用于描述定时任务触发时间的字符串表达式。它由多个时间字段组成,每个字段代表定时任务在特定时间单位上的触发条件。
1.5.1.cron表达式语法格式
秒 分 时 日 月 星期 年份
其中,每个时间字段都有对应的取值范围和特殊符号。下面是每个时间字段的详细说明:
1、秒(Seconds):取值范围为 0~59。例如,`0/5` 表示从0秒开始,每隔 5 秒触发一次,`*` 表示每秒都触发。
2、分钟(Minutes):取值范围为 0~59。例如,`0/5` 表示从0分钟开始,每隔 5 分钟触发一次,`*` 表示每分钟都触发。
3、小时(Hours):取值范围为 0~23。例如,`0/2` 表示从0小时开始,每隔 2 小时触发一次,`*` 表示每小时都触发。
4、日期(Day of Month):取值范围为 1~31。例如,`1,15` 表示每月的 1 日和 15 日触发,`*` 表示每天都触发。
5、月份(Month):取值范围为 1~12,也可以使用英文缩写 JAN、FEB、MAR 等。例如,`1,6` 表示一月和六月触发,`*` 表示每个月都触发。
6、 星期(Day of Week):取值范围为 1~7,1 表示星期日,2 表示星期一,以此类推,也可以使用英文缩写 SUN、MON、TUE 等。例如,`2-6` 表示星期一到星期五触发,`*` 表示每个星期都触发。
7、年份(Year):可选字段,表示触发条件的年份。例如,`2023` 表示在 2023 年触发,`*` 表示每年都触发。
除了取值范围,Cron 表达式还支持一些特殊符号,用于指定特定的触发条件,例如:
- 星号(*):代表所有可能的取值,表示不限制该时间字段的取值范围。
- 问号(?):仅在日期和星期字段中使用,表示不指定具体的取值,可以任意匹配。
- 斜线(/):表示间隔触发,例如在分钟字段中,"*/5" 表示每隔 5 分钟触发一次。
- 逗号(,):用于指定多个取值,例如在小时字段中,"1,3,5" 表示在第 1、3、5 小时触发。
- 减号(-):用于指定一个范围,例如在月份字段中,"3-6" 表示三月到六月触发。
L : 表示最后,只能出现在星期和每月第几天域,如果在星期域使用1L,意味着在最后的一个星期日触发。
W : 表示有效工作日(周一到周五),只能出现在每月第几日域,系统将在离指定日期的最近的有效工作日触发事件。注意一点,W的最近寻找不会跨过月份
LW : 这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
# : 用于确定每个月第几个星期几,只能出现在每月第几天域。例如在1#3,表示某月的第三个星期日。
1.5.2.cron表达式示例
作用 | 表达式 |
每隔5秒执行一次 | */5 * * * * ? |
每天中午12点执行一次 | 0 0 12 * * ? |
2024年的每天上午10:00执行一次 | 0 0 10 * * ? 2023 |
每天下午6点到下午6:59每分钟执行一次 | 0 * 18 * * ? |
每月的最后一个星期五上午10:30执行一次 | 0 30 10 ? * 6L |
每月的第4个星期五上午10:25执行一次 | 0 25 10 ? * 6#4 |
每天上午8点,下午1点,4点执行一次 | 0 0 8,13,16 * * ? |
2.异步任务
2.1.springboot配置
在软件开发中,有些任务比较耗时但又无需马上获得结果。一般地,这些任务我们可以采用独立线程池异步执行。如果程序基于springboot环境,我们有现成的工具可以使用。
首先,我们需要在程序启动入口类增加@EnableAsync。
借着,我们尝试从源代码的角度,来看看springboot提供的异步线程池是怎么创建的。
从springboot的命名风格可知,通过模糊搜索TaskAutoConfiguration,可以找到TaskExecutionAutoConfiguration,如下:
从源代码可知,只需在application.yml配置如下参数即可:
spring:
execution:
pool:
coreSize: 4
queueCapacity: 64
maxSize: 8
## 禁止空闲线程关闭,保证最少有core个存活线程
allowCoreThreadTimeout: false
keepAlive: 300s
threadNamePrefix: common-async_task-
这些参数跟jdk的ThreadPoolExecutor类的构造参数非常相似,这里不作解析 。
2.2.使用异步任务
使用方法,只要在目标方法的签名加上@Async
然而,执行结果却出乎意外(在main主线程上执行,没有异步执行)
熟悉springaop机制的同学,马上知道这是因为异步任务底层是基于动态代理机制实现的。Spring AOP 代理只有在通过 Spring 容器获取 Bean 时才会创建。当在同一个类内部调用一个方法时,调用的是原始对象,而不是代理对象。因此,内部调用不会经过 Spring AOP 代理,也就无法触发异步执行。解决这个问题最简单的方法是用一个新的类来管理异步执行方法。
问题解决
2.3.模拟系统繁忙进行性能测试
我们尝试模拟一些极端情况,系统处理不过来的情况。修改上面的配置,改为
pool:
coreSize: 1
queueCapacity: 1
maxSize: 8
同时,异步执行增加时间延迟来模拟耗时任务
@Component
@Slf4j
class AsyncTaskHandler {
@Async
public void busyTask1() {
try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
log.info("----------busyTask1---------");
}
@Async
public void busyTask2() {
try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
log.info("----------busyTask2---------");
}
}
执行结果会出现报错(线程数已达到最大数量,且任务队列已满,无法添加新任务)
调度任务执行频率是可预见的,有多少个任务,执行频率,开发可知,核心数量,最大数量,队列容量比较好设定。而异步执行频率很大程度是由系统的使用者(用户)决定的,因此这些参数需要根据流量动态修改。
2.4.异步线程池拒绝策略
如果不想把 queueCapacity和maxSize都设置成很大的话,我们可以考虑修改下线程池的拒绝策略。最妥当的方式是,既然异步不了,那就"熔断"成同步,直接在调用者所在的业务线程执行。然而,springboot没有提供相应的配置项。为此,我们只能关闭springboot的自动配置了。
从代码可看出,只要提供一个Executor实例,并且名字叫taskExecutor即可。
话不多说,上代码
@Configuration
public class ThreadPoolConfig {
private final int core = Runtime.getRuntime().availableProcessors();
/**
* springboot 自动注入的异步执行线程池,拒绝策略为丢弃,难以配置maxPoolSize参数
* @see TaskExecutionAutoConfiguration#taskExecutorBuilder
*/
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(TaskExecutionProperties properties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getPool().getCoreSize());
executor.setThreadNamePrefix(properties.getThreadNamePrefix());
executor.setMaxPoolSize(properties.getPool().getMaxSize());
executor.setQueueCapacity(properties.getPool().getQueueCapacity());
executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().toSeconds());
executor.setAllowCoreThreadTimeOut(properties.getPool().isAllowCoreThreadTimeout());
// 超过队列容量,则在业务线程上执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
重新运行程序,可以看到,当任务超过线程池负载的时候,多余的线程会在调用线程上执行,变为同步代码
个人认为:使用springboot创建的线程池,代码也只是稍微简化一点点。
采用原生线程池,每个业务代码必须实现Runnable接口,而使用springboot的异步线程池,只需以方法注解的形式即可,底层aop会生成对应的代理方法。但要确保避免内部方法调用导致异步逻辑失效。