之前的代码这部分并没有补充完毕,假如调度中心如果判断有定时任务要执行了,该怎么远程通知给执行定时任务的程序呢?当定时任务要把自己的信息发送给调度中心时,是通过一个RegistryParam对象发送的。该对象内部封装了定时任务相关的信息。
public class RegistryParam implements Serializable {
private static final long serialVersionUID = 42L;
//定时任务方法的名称
private String registryKey;
//定时任务程序部署的服务器的ip地址
private String registryValue;
public RegistryParam() {
}
public RegistryParam(String registryKey, String registryValue) {
this.registryKey = registryKey;
this.registryValue = registryValue;
}
public String getRegistryKey() {
return registryKey;
}
public void setRegistryKey(String registryKey) {
this.registryKey = registryKey;
}
public String getRegistryValue() {
return registryValue;
}
public void setRegistryValue(String registryValue) {
this.registryValue = registryValue;
}
@Override
public String toString() {
return "RegistryParam{" +
"registryKey='" + registryKey + '\'' +
", registryValue='" + registryValue + '\'' +
'}';
}
}
那么,调度中心通知定时任务程序执行的时候,该用什么对象封装什么信息呢?封装的这个信息是我最关心的。什! 么信息可以成为调度中心和定时任务执行程序中判别唯一定时任务的标准呢?其实很简单,就是定时任务方法的名 字。虽然啊一个定时任务程序中可能会定义多个定时任务,但是每个定时任务方法的名称是唯一的,所以,调度中 心只要把要执行的定时任务方法的名称发送给定时任务执行程序即可。并且这个方法名称同样可以封装在一个对象 中,既然是要触发定时任务了,这个对象就可以定义为TriggerParam,意思就是触发参数。请看下面的代码块。
public class TriggerParam implements Serializable{
private static final long serialVersionUID = 42L;
// 定时任务方法的名字
private String executorHandler;
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
}
这样一来,调度中心只要通过网络把封装着要执行的定时任务名字的TriggerParam对象发送给定时任务执行程序,这样,定时任务程序接收到消息后,就可以从TriggerParam对象中获得要执行的任务名称,然后直接去执行即可。当然,这个过程也很复杂,在后面的章节再给大家细讲。总之,现在我的调度中心终于有了可以向定时任务执行器发送的消息了。所以,我现在就想再次把我的调度中心的核心类重构一下,也就是重构JobScheduleHelper类中的start方法,请看下面的代码块。
@Component
public class JobScheduleHelper {
// 调度定时任务的线程
private Thread scheduleThread;
// 创建当前类的对象
private static JobScheduleHelper instance = new JobScheduleHelper();
// 把当前类的对象暴露出去
public static JobScheduleHelper getInstance(){
return instance;
}
// 启动调度线程工作的方法
public void start(){
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
// 从数据库中查询所有定时任务信息
List<YyJobInfo> yyJobInfoList = YyJobAdminConfig.getAdminConfig().getYyJobInfoDao().findAll();
// 得到当前时间
long time = System.currentTimeMillis();
// 遍历所有定时任务信息
for (YyJobInfo yyJobInfo : yyJobInfoList) {
if (time > yyJobInfo.getTriggerNextTime()){
// 如果大于就执行定时任务,就调用下面这个方法,开始远程通知定时任务程序
// 执行定时任务
// 注意,这里引入了一个新的类,JobTriggerPoolHelper
JobTriggerPoolHelper.trigger(yyJobInfo);
// 计算定时任务下一次的执行时间
Date nextTime = null;
try {
nextTime = new CronExpression(yyJobInfo.getScheduleConf()).getNextValidTimeAfter(new Date());
} catch (ParseException e) {
e.printStackTrace();
}
// 下面就是更新数据库中定时任务的操作
YyJobInfo job = new YyJobInfo();
job.setTriggerNextTime(nextTime.getTime());
System.out.println("保存job信息");
}
}
}
}
});
scheduleThread.start();
}
在上面的代码块中,我在远程通知执行定时任务的程序的操作处做了一点变动,引入了一个新的名为JobTriggerPoolHelper的新类,这个类中的trigger方法,就是用来远程通知定时任务执行器执行定时任务的。所以现在请大家看一看JobTriggerPoolHelper这个类内部的构造。
public class JobTriggerPoolHelper {
private static final Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void trigger(YyJobInfo yyJobInfo){
// helper其实就是该类的单例对象
helper.addTrigger(yyJobInfo);
}
public void addTrigger(YyJobInfo yyJobInfo){
// 这是我引入的新的类
YyJobTrigger.trigger(yyJobInfo);
}
}
上面的代码就目前来说很简单,因为我的调度中心要去触发定时任务,让定时任务执行了,所有我就又搞了一个触发任务的类,这个类就负责把定时任务的信息向程序内部继续传递下去。在JobScheduleHelper类中调用了JobTriggerPoolHelper.trigger(yyJobInfo)方法后,程序就会来到JobTriggerPoolHelper类中,调用该类的trigger方法,接着又会调用该类的addTrigger方法,在该方法中,程序就来到了我又引入新的YyJobTrigger类中,这个类就是用来真正触发定时任务远程调用的。这个类就是用来真正触发定时任务远程调用的。请大家看下面的代码块。
public class YyJobTrigger {
private static Logger logger = LoggerFactory.getLogger(YyJobTrigger.class);
public static void trigger(YyJobInfo jobInfo) {
processTrigger(jobInfo);
}
private static void processTrigger(YyJobInfo jobInfo) {
// 初始化触发器参数,这里的这个出发参数,是要在远程调用的另一端,也就是定时任务执行程序的那一端使用的
TriggerParam triggerParam = new TriggerParam();
// 设置执行器要执行的任务的方法名称
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
// 选择具体的定时任务执行器地址,这里默认使用集合汇总的第一个。
String address = jobInfo.getRegistryList().get(0);
// 在这里执行远程调用,也就是要把执行的定时任务的执行信息发送给定时任务
// 定时任务程序执行完毕后,返回一个执行结果信息,封装在ReturnT对象中
ReturnT<String> triggerResult = runExecutor(triggerParam, address);
// 输出一下状态码,根据返回的状态码判断任务是否执行成功
logger.info("返回的状态码" + triggerResult.getCode());
}
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
// 在这个方法中把消息发送给定时任务执行程序
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// 创建连接
URL realUrl = new URL(address);
// 得到连接
connection = (HttpURLConnection) realUrl.openConnection();
// 设置连接属性
// post请求
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(3 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// 进行连接
connection.connect();
// 判断请求题是否为null
if (triggerParam != null) {
// 序列化请求体,也就是要发送的触发参数
String requestBody = GsonTool.toJson(triggerParam);
// 下面就开始正式发送消息了
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(requestBody.getBytes(StandardCharsets.UTF_8));
// 刷新缓冲区
dataOutputStream.flush();
// 释放资源
dataOutputStream.close();
}
// 获取响应码
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
// 设置失败结果
return new ReturnT<String>(ReturnT.FAIL_CODE, "yy-job remoting fail, StatusCode(" + statusCode + ") invalid. for url:" + address);
}
// 下面就开始接受返回的结果了
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
// 接受返回消息
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
// 转换为字符串
String resultJson = result.toString();
try {
// 转换为ReturnT对象,返回给用户
ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, String.class);
return returnT;
} catch (Exception e) {
logger.error("yy-job remoting (url=" + address + ") response content invalid(" + resultJson + ").", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "yy-job remoting error(" + e.getMessage() + "),for url : " + address);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "yy-job remoting error(" + e.getMessage() + "),for url:" + address);
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
logger.error(e2.getMessage(), e2);
}
}
}
下面的代码块就是ReturnT类的具体内容,可以看到,定时任务的执行结果就封装在里面
public class ReturnT<T> implements Serializable {
public static final long serialVersionUID = 42L;
public static final int SUCCESS_CODE = 200;
public static final int FAIL_CODE = 500;
public static final ReturnT<String> SUCCESS = new ReturnT<String>(null);
public static final ReturnT<String> FAIL = new ReturnT<String>(FAIL_CODE, null);
private int code;
private String msg;
private T content;
public ReturnT(){}
public ReturnT(int code, String msg) {
this.code = code;
this.msg = msg;
}
public ReturnT(T content) {
this.code = SUCCESS_CODE;
this.content = content;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public T getContent() {
return content;
}
public void setContent(T content) {
this.content = content;
}
@Override
public String toString() {
return "ReturnT [code=" + code + ", msg=" + msg + ", content=" + content + "]";
}
}
根据上述代码,请大家仔细想一想,现在的程序主要是谁在干活?干的什么活?因为说到底,设计一个程序不能只考虑程序的运行,还要尽可能考虑程序的性能。当运行没有问题的时候,提高性能就成为最主要的问题了。那我目前的调度中心,性能如何,性能的瓶颈又在哪里呢?其实已经很明显了,目前的调度中心,按照我现在的编码流程,所有的活实际上都是JobScheduleHelper类中的scheduleThread线程在干。请大家再次回顾一下JobScheduleHelper类的内容。
public class JobScheduleHelper {
// 调度定时任务的线程
private Thread scheduleThread;
// 创建当前类的对象
private static JobScheduleHelper instance = new JobScheduleHelper();
// 把当前类的对象暴露出去
public static JobScheduleHelper getInstance(){
return instance;
}
// 启动调度线程工作的方法
public void start(){
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
// 从数据库中查询所有定时任务信息
List<YyJobInfo> yyJobInfoList = YyJobAdminConfig.getAdminConfig().getYyJobInfoDao().findAll();
// 得到当前时间
long time = System.currentTimeMillis();
// 遍历所有定时任务信息
for (YyJobInfo yyJobInfo : yyJobInfoList) {
if (time > yyJobInfo.getTriggerNextTime()){
// 如果大于就执行定时任务,就调用下面这个方法,开始远程通知定时任务程序
// 执行定时任务
// 注意,这里引入了一个新的类,JobTriggerPoolHelper
JobTriggerPoolHelper.trigger(yyJobInfo);
// 计算定时任务下一次的执行时间
Date nextTime = null;
try {
nextTime = new CronExpression(yyJobInfo.getScheduleConf()).getNextValidTimeAfter(new Date());
} catch (ParseException e) {
e.printStackTrace();
}
// 下面就是更新数据库中定时任务的操作
YyJobInfo job = new YyJobInfo();
job.setTriggerNextTime(nextTime.getTime());
System.out.println("保存job信息");
}
}
}
}
});
scheduleThread.start();
}
可以看到,在JobScheduleHelper中,一旦start方法被调用了,schedulueThread线程就会启动,然后在一个循环中不停得扫描数据库,调度任务去执行。并且,调度任务去执行时,最终会一路调用YyJobTrigger类中的runExecutor方法,在该方法中,会通过http协议,把封装好的定时任务信息的对象发送给定时任务程序,并且——注意里这是是必须要强调清楚的——scheduleThread会等待定时任务程序执行完定时任务后把执行的结果回复过来。如果定时任务执行的时间过长,那scheduleThread线程不就会一直阻塞在这里等待响应结果吗?显然这是一个严重的问题,解决的方法也很简单,再开启一个线程去执行任务就好了,但每次执行任务都要创建新线程对性能也有损耗,所有可以采用线程池的方式。请大家看一下重构之后的JobTriggerPoolHelper类。
public class JobTriggerPoolHelper {
private static final Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// 在这里创建线程池,用于远程调用定时任务
private ThreadPoolExecutor triggerPool = null;
// 创建该类的对象
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
// 对外暴露的该类线程池的方法
public static void toStart() {
helper.start();
}
public static void trigger(YyJobInfo yyJobInfo) {
// helper其实就是该类的单例对象
helper.addTrigger(yyJobInfo);
}
// 该方法经过重构了,在这里把定时任务信息提交给线程池去远程发送
public void addTrigger(YyJobInfo yyJobInfo) {
triggerPool.execute(new Runnable() {
@Override
public void run() {
// 这里就是线程池中的线程去执行远程调用定时任务的任务了
YyJobTrigger.trigger(yyJobInfo);
}
});
}
public void start() {
// 在这里创建线程池,最大线程数为150
triggerPool = new ThreadPoolExecutor(
10, 150, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Yy-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
}
);
}
// 中止线程池的方法
public void stop(){
triggerPool.shutdown();
}
// TODO 测试启动类
现在,我想直接为大家剖析调度中心存在的一点小缺陷。有的朋友可能已经注意到了,现在是我的调度中心在维护数据库中定时任务的信息,而所有的定时任务都会把自己的信息存储在数据库中。调度线程不断地扫描数据库,判断哪些定时任务可以执行,就把信息提交给线程池。那么,请大家简单思考一下,如果数据库中存储了200个定时任务,但是线程池的最大线程数只有150,假如所有定时任务的执行时间都比较接近,这是不是意味着在一段时间内,有大量的定时任务要交给线程池中的线程执行。但是线程数并不足够,所以,有的定时任务可能就要等待别的定时任务执行完了,才能得到可以使用的线程去执行自己的任务。这种情况本来很常见,但是请大家再仔细想想,所谓定时任务,就是要在特定的时间执行才可以。所以,我们不得不考虑考虑这样一种情况。当有很多定时任务执行耗时比较长时,那么可能同一时间段,线程池中阻塞等待结果的线程就会比较多,这样一来其他定时任务就不得不也等待线程了。举个例子,如果线程池中只有两个线程,现在有三个定时任务,两个定时任务执行的耗时都为2秒,第三个定时任务执行耗时为50ms。如果这三个定时任务触发的时间都相同,结果耗时为2秒的定时任务把两个线程占用了,两个线程都要阻塞等待http的相应结果,那第三个耗时很少的定时任务就不得不等待一会儿了。这么做,就会导致耗时较少的定时任务被耗时较多的定时任务拖累,甚至导致延误执行时机,本来该执行了,结果被拖延到两秒后才执行。那针对这种情况,有没有很好的解决方法吗?当然是有的。仍然采用编程世界中的那条准则,就是少一事不如多一事。
请大家想一想,既然我的程序很有可能出现执行耗时较少的定时任务被执行耗时好长的定时任务拖累的情况,那我就把耗时较长的定时任务和耗时较短的定时任务分开不就完了?所谓的分开也很简单,就是在调度中心的JobTriggerPoolHelper组件中创建两个线程池,一个线程池专门调度耗时较长的定时任务,我就可以称它为慢线程;一个线程池专门调度耗时较短的定时任务,我就称他为快线程池。这样一来,快线程池调度定时任务就会保持很高的性能,而慢线程池就维持一般水准即可,并且,耗时较短的定时任务终于不会被耗时较长的定时任务拖累了。
那么这个编程思路该在呢么实现呢?我认为最重要的就是做好定时任务的分流,也就是哪些任务要提交给快线程池,哪些任务要提交给慢线程池。经过思考,决定给定时任务设立一个时间标尺,只要定时任务执行的时间大于500毫秒,就称这个定时任务为慢任务,要提交给慢线程池执行;而执行时间小于500毫秒的,就称这个定时任务为快任务,要提交给快线程池执行。这个判断标准很简单,简单就意味着误判,那怎么避免误判呢?因为有些定时任务很可能执行的耗时小于500毫秒,但是http在返回响应的时候网络阻塞了,导致定时任务耗时超过500毫秒了,这不就是最容易出现的误判情况吗?解决的方法也很简单,一般来说定时任务不可能只触发一次,肯定是按照特定的时间一直执行,比如每秒执行一次,每5秒执行一次等等。所说说,我只需在程序中做一个判断,判断同一个定时任务一分钟内执行的耗时,只要耗时在一分内超过500毫秒的次数大于10次了,就把它看做满任务,以后每次触发的时候,就交给慢线程池来远程调度。至于那些每分钟或每几分执行一次,甚至是每天执行一次的定时任务,晚个一两秒执行是没什么问题的。那么,怎么判定同一个定时任务每次执行的耗时呢,别忘了定时任务信息是存储在数据库中的,我只需要用每个定时任务的主键ID就可判断是否为同一个定时任务。所以,接下来我要给封装定时任务信息并且和数据库打交道的YyJobInfo类添加一个ID属性就可以了。
接下来,就为大家展示一下重构好的JobTriggerPoolHelper类
public class JobTriggerPoolHelper {
private static final Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
//定义的快线程池
private ThreadPoolExecutor fastTriggerPool = null;
//定义的慢线程池
private ThreadPoolExecutor slowTriggerPool = null;
// 创建该类的对象
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
// 对外暴露的该类线程池的方法
public static void toStart() {
helper.start();
}
// 中止线程池的方法
public static void toStop() {
helper.stop();
}
public void start() {
// 快线程池,最大线程数为200
fastTriggerPool = new ThreadPoolExecutor(
10, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Yy-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
}
);
// 慢线程池,最大线程数为100
slowTriggerPool = new ThreadPoolExecutor(
10, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Yy-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
}
);
}
// 终止线程池的方法
public void stop() {
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
}
public static void trigger(YyJobInfo yyJobInfo) {
// helper其实就是该类的单例对象
helper.addTrigger(yyJobInfo);
}
// 获取当前的系统时间,这里计算出来的其实是系统当前的分钟数,下面马上就会用到
private volatile long minTim = System.currentTimeMillis() / (60 * 1000);
/**
* 如果有任务出现慢执行情况了,就会被记录在该Map中
* 所谓慢执行,就是执行的时间超过了500毫秒,该map的key为job的id,value为慢执行的次数
* 如果一分钟慢执行的次数超过了10次,该任务就会被交给慢线程池来执行
* 而该Map也会一分钟清空一次,来循环记录慢执行的情况
*/
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
// 该方法经过重构了,在这里把定时任务信息提交给线程池去远程发送
public void addTrigger(YyJobInfo yyJobInfo) {
int jobId = yyJobInfo.getId();
// 默认先用快线程池
ThreadPoolExecutor triggerPool = fastTriggerPool;
// 用任务Id从慢执行的Map中得到该job对应的慢执行次数
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 这里就是具体判断了,如果慢执行次数不为null,并且一分钟超过10了,就选用慢线程池来执行该任务。
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
// 选用慢线程池了
triggerPool = slowTriggerPool;
}
// 在这里就把任务提交给线程池了,在这个任务执行一个触发器任务,把刚才传进来的job的各种信息整合到一起
// 在触发器任务重,会进行job的远程调用,这个调用链还是比较短的,执行流程也很清晰。
triggerPool.execute(new Runnable() {
@Override
public void run() {
// 再次获取当前时间,这个时间后面会用到
long start = System.currentTimeMillis();
try {
// 这里就是线程池中的线程去执行远程调度定时任务的任务了
YyJobTrigger.trigger(yyJobInfo);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 这里再次获得当前的分钟数,这个分钟数会刚才上面的分钟数做对比
long minTimeNow = System.currentTimeMillis() / (60 * 1000);
// 这里就用到了两个分钟数做对比,如果两个分钟数不等,说明过去了一分钟
// 而慢执行Map中的数据是一分钟清理一次,所以说这里就把慢执行Map清空
// 注意,这个清空的动作是线程池中的线程来执行的,并且这个动作是在finally代码块中执行的
// 也就意味着在上面的触发器任务执行完毕后才进行清空操作
if (minTim != minTimeNow) {
minTim = minTimeNow;
jobTimeoutCountMap.clear();
}
}
// 在这里用当前毫秒值减去之前得到的毫秒值
long cost = System.currentTimeMillis() - start;
// 判断任务的执行时间是否超过500毫秒了
// 这里仍然要结合上面的finally代码块来理解,因为触发器执行完了才会执行finally代码块中的代码,所以这时候也就能得到job的执行时间了
if (cost > 500) {
// 超过500毫秒了,就判断当前执行的任务为慢执行任务,所以将它在慢执行Map中记录一次
// Map的key为jobId,value为慢执行的次数
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
// 慢执行的次数加1
timeoutCount.incrementAndGet();
}
}
}
});
}
}