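The previous article on 诸葛子房's CSDN blog, "KnowStreaming Tutorial Series, Part 2: Overall Project Architecture Analysis",
covered the overall project directory layout of KnowStreaming (KS). This article looks at how the KS scheduling module handles metrics collection and metadata synchronization.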
1. The scheduling module code lives mainly in km-task
public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);

    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();

        // Hand the work to KS's custom thread pool so it runs asynchronously
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }

    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;

        // Give up and return if the cluster has not been loaded within 120 seconds
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }
            BackoffUtils.backoff(1000);
        }

        if (tempClusterPhy == null) {
            return;
        }

        // Once the cluster is found, wait another 5 seconds so that the related cluster data
        // is fully loaded; the 5-second value is a rough choice, not a fixed requirement
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;

        // Run metadata synchronization for the cluster
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }

        // Wait another 5 seconds so that the cluster metadata has been synchronized to the DB;
        // again, the 5-second value is a rough choice, not a fixed requirement
        BackoffUtils.backoff(5000);

        // Run metrics collection for the cluster
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}
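By listening for the cluster-added event, KS triggers the metadata synchronization and metrics collection tasks for the new cluster.
For the underlying technique, see:
Getting the subclasses of an interface or abstract class from Spring and executing them: https://blog.csdn.net/u012501054/article/details/103927674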
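The listener above combines two small patterns: polling a cache until an entry appears or a deadline passes, and then running every registered task while keeping one failure from blocking the rest. The following is a minimal plain-Java sketch of those two patterns only. It does not use Spring or any KS class; `ClusterAddedSketch`, `LOADED_CLUSTERS`, `awaitLoaded`, and `dispatchAll` are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for the pattern in triggerAllTask; not KnowStreaming code. */
class ClusterAddedSketch {
    // Assumed stand-in for LoadedClusterPhyCache: cluster id -> cluster name
    static final Map<Long, String> LOADED_CLUSTERS = new ConcurrentHashMap<>();

    /** Polls the cache until the cluster appears or timeoutMs has elapsed. */
    static Optional<String> awaitLoaded(long clusterId, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            String cluster = LOADED_CLUSTERS.get(clusterId);
            if (cluster != null) {
                return Optional.of(cluster);
            }
            if (System.currentTimeMillis() >= deadline) {
                return Optional.empty();
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
    }

    /** Runs every task in order; a failing task is counted and the loop continues. */
    static int dispatchAll(List<Consumer<String>> tasks, String cluster) {
        int failures = 0;
        for (Consumer<String> task : tasks) {
            try {
                task.accept(cluster);
            } catch (RuntimeException e) {
                failures++;
            }
        }
        return failures;
    }
}
```

Unlike the original, which silently ignores exceptions, this sketch counts failures so that the behavior can be observed; that is a choice made for the example, not something KS does.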
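2. How a scheduled task runs on a single node in a distributed deployment, so that multiple machines do not schedule the same work
The execute method in AbstractDispatchTask achieves this by assigning tasks to workers: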
public TaskResult execute(JobContext jobContext) {
    try {
        long triggerTimeUnitMs = System.currentTimeMillis();

        // Fetch all tasks
        List<E> allTaskList = this.listAllTasks();
        if (ValidateUtils.isEmptyList(allTaskList)) {
            LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
            return TaskResult.SUCCESS;
        }

        // Work out which tasks the current machine should run
        List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
        // Check the selected subset here, not allTaskList, which was already checked above
        if (ValidateUtils.isEmptyList(subTaskList)) {
            LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
            return TaskResult.SUCCESS;
        }

        // Process the selected tasks
        TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);

        // Assemble the result
        TaskResult taskResult = new TaskResult();
        taskResult.setCode(ret.getCode());
        taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
        return taskResult;
    } catch (Exception e) {
        LOGGER.error("process task failed, taskName:{}", taskName, e);
        return new TaskResult(TaskResult.FAIL_CODE, e.toString());
    }
}
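The code works as follows: execute first lists every task and returns early if there are none. It then calls selectTask with the full list of worker codes and the current worker's code, both taken from JobContext, to pick the subset of tasks this node is responsible for. Only that subset is passed to processTask, and the returned TaskResult carries the processed sub-tasks in its message. The implementation of selectTask is not shown here; the intent is that each task ends up on one worker rather than on every machine.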
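To make the idea of per-worker task selection concrete, here is a minimal sketch of one possible strategy: every node sorts the worker codes the same way and keeps only the tasks whose hash maps to its own position. This is an assumption for illustration, not KS's actual selectTask, which may use a different algorithm; `TaskShardingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sharding helper; KnowStreaming's real selectTask may differ. */
class TaskShardingSketch {
    /** Returns the tasks owned by currentWorkerCode under hash-modulo assignment. */
    static <E> List<E> selectTask(List<E> allTasks, List<String> allWorkerCodes, String currentWorkerCode) {
        List<E> mine = new ArrayList<>();
        if (allTasks == null || allWorkerCodes == null || allWorkerCodes.isEmpty()) {
            return mine;
        }

        // Sort a copy so that every node sees the workers in the same order
        List<String> workers = new ArrayList<>(allWorkerCodes);
        Collections.sort(workers);

        int myIndex = workers.indexOf(currentWorkerCode);
        if (myIndex < 0) {
            // This node is not a registered worker, so it owns nothing
            return mine;
        }

        for (E task : allTasks) {
            // Assumes hashCode() is stable across JVMs (true for Long and String)
            int owner = Math.floorMod(task.hashCode(), workers.size());
            if (owner == myIndex) {
                mine.add(task);
            }
        }
        return mine;
    }
}
```

With tasks 1 to 6 and workers "node-a" and "node-b", node-a would take the even ids and node-b the odd ones, so no task runs twice. Plain modulo reshuffles most tasks whenever a worker joins or leaves; a scheme such as consistent hashing reduces that churn at the cost of more code.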
Reference:
https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md