【开源项目】任务调度框架PowerJob介绍及源码解析

news2025/1/12 10:44:35

项目介绍

PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,能让您轻松完成作业的调度与繁杂任务的分布式计算。
在这里插入图片描述

项目地址

  • 源码:https://gitee.com/KFCFans/PowerJob
  • 官网:http://www.powerjob.tech/index.html

快速入门

https://www.yuque.com/powerjob/guidence/nyio9g
在这里插入图片描述

源码解析

服务端启动

  1. PowerJobServerApplication启动。
    public static void main(String[] args) {

        pre();

        AkkaStarter.init();
        VertXStarter.init();

        // Start SpringBoot application.
        try {
            SpringApplication.run(PowerJobServerApplication.class, args);
        } catch (Throwable t) {
            log.error(TIPS);
            throw t;
        }
    }
  1. AkkaStarter.init();,启动actorSystem,用FriendRequestHandler作为消息的处理器。加载配置oms-server.akka.conf。服务端口号设置为10086。
    public static void init() {

        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("[PowerJob] PowerJob's akka system start to bootstrap...");

        // 忽略了一个问题,机器是没办法访问外网的,除非架设自己的NTP服务器
        // TimeUtils.check();

        // 解析配置文件
        Config akkaFinalConfig = parseConfig();
        actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
        actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
        log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
    }
  1. VertXStarter.init();主要是新建了Vertx对象,Vertx.vertx();,端口号设置成10010
  2. Initializer#initHandler设置了Vertx处理器WorkerRequestHttpHandler和Akka的消息处理器。
@Component
@ConditionalOnExpression("'${execution.env}'!='test'")
public class Initializer {

    @PostConstruct
    public void initHandler() {
        // init akka
        AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
        // init vert.x
        VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
    }
}
  1. WorkerRequestHttpHandler是创建了一个HttpServer,并且设置路由。
@Slf4j
public class WorkerRequestHttpHandler extends AbstractVerticle {

    @Override
    public void start() throws Exception {

        Properties properties = PropertyUtils.getProperties();
        int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));

        HttpServerOptions options = new HttpServerOptions();
        HttpServer server = vertx.createHttpServer(options);

        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
                .handler(ctx -> {
                    WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
                    fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
                    success(ctx);
                });
        router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
                .blockingHandler(ctx -> {
                    TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
                    try {
                        fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
                        out(ctx, AskResponse.succeed(null));
                    } catch (Exception e) {
                        log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
                        out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
                    }
                });
        router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
                .blockingHandler(ctx -> {
                    WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
                    fetchWorkerRequestHandler().processWorkerLogReport(req);
                    success(ctx);
                });
        server.requestHandler(router).listen(port);
    }

    private static void out(RoutingContext ctx, Object msg) {
        ctx.response()
                .putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
                .end(JsonObject.mapFrom(msg).encode());
    }

    private static void success(RoutingContext ctx) {
        out(ctx, ResultDTO.success(null));
    }
}

客户端启动

  1. PowerJobWorker实现了InitializingBean,执行方法PowerJobWorker#init。该方法中会连接服务器
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
            serverDiscoveryService.start(timingPool);
  1. 启动服务,ServerDiscoveryService#start。进行了服务的发现,在ServerDiscoveryService#acquire方法中调用http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA,找到服务器地址10.132.17.10:10086
  2. 客户端会也会启动akka服务,加载oms-worker.akka.conf的配置,设置端口号27777

客户端-服务发现

  1. 客户端发起http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA接口获取当前akka的server的地址。
  2. 服务端响应,ServerController#acquireServer。服务端设置Ping请求信息,访问Ping接口,path地址:akka://oms-server@10.132.17.10:10086/user/friend_actor,调用成功则说明地址是ok的。
    private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {

        if (downServerCache.contains(serverAddress)) {
            return null;
        }
        if (StringUtils.isEmpty(serverAddress)) {
            return null;
        }

        Ping ping = new Ping();
        ping.setCurrentTime(System.currentTimeMillis());

        ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
        try {
            CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
            AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            downServerCache.remove(serverAddress);
            if (response.isSuccess()) {
                return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
            }
        }catch (Exception e) {
            log.warn("[ServerElection] server({}) was down.", serverAddress);
        }
        downServerCache.add(serverAddress);
        return null;
    }
    public static ActorSelection getFriendActor(String address) {
        String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
        return actorSystem.actorSelection(path);
    }
  1. FriendRequestHandler#onReceivePing会处理传到FriendRequestHandler里面的Ping请求。
    private void onReceivePing(Ping ping) {
        getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
    }

服务端-执行任务

  1. 入口是JobController#runImmediately,先过切面DesignateServerAspect#execute,执行InstanceService#create进行任务实例创建。经过切面UseCacheLockAspect#execute进行分发DispatchService#dispatch
  2. 如果可以找到合适的worker线程,则构造请求实体,发送请求;如果找不到worker,则InstanceManager#processFinishedInstance,完成或者失败,都需要一些处理。比如日志打印,告警。
        // 获取当前最合适的 worker 列表
        List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);

        if (CollectionUtils.isEmpty(suitableWorkers)) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
            instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now);

            instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
            return;
        }
		List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());

        // 构造任务调度请求
        ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);

        // 发送请求(不可靠,需要一个后台线程定期轮询状态)
        WorkerInfo taskTracker = suitableWorkers.get(0);
        String taskTrackerAddress = taskTracker.getAddress();

        transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
        log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);

        // 修改状态
        instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now);

        // 装载缓存
        instanceMetadataService.loadJobInfo(instanceId, jobInfo);
  1. 因为是akka协议,所以用到了AkkaTransporter。拼接客户端地址akka://oms@10.132.17.10:27777/user/worker,调用服务。构造的对象是ServerScheduleJobReq
    public void tell(String address, PowerSerializable object) {
        ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address);
        taskTrackerActor.tell(object, null);
    }
  1. 客户端接受到请求,TaskTrackerActor#onReceiveServerScheduleJobReq,会初始化CommonTaskTracker#initTaskTracker,开启定时任务执行Dispatcher。分发任务,会TaskTracker#dispatchTask,用到了akka框架,发送TaskTrackerStartTaskReq对象,用的处理器是processor_tracker
  2. ProcessorTrackerActor#onReceiveTaskTrackerStartTaskReq,会将任务信息封装成ProcessorRunnable,找到对应的BasicProcessor进行处理。
 ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
        ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(processorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }
  1. 调用成功之后,会发送ProcessorReportTaskStatusReq请求。
        // 2. 回复接收成功
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());

            taskTrackerActorRef.tell(reportReq, null);

            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }

客户端-发送心跳

  1. PowerJobWorker实现了InitializingBean,在init方法中会执行timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS);。获取服务端的接口地址,akka://oms-server@192.168.1.5:10086/user/server_actor,发送心跳
        // 发送请求
        String serverPath = AkkaUtils.getServerActorPath(currentServer);
        if (StringUtils.isEmpty(serverPath)) {
            return;
        }
        ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
        actorSelection.tell(heartbeat, null);
  1. 服务端AbWorkerRequestHandler#processWorkerHeartbeat,用来处理心跳请求。Server在接收到心跳信息后会进行状态的更新,ClusterStatusHolder#updateStatus
    public void updateStatus(WorkerHeartbeat heartbeat) {

        String workerAddress = heartbeat.getWorkerAddress();
        long heartbeatTime = heartbeat.getHeartbeatTime();

        WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
            WorkerInfo wf = new WorkerInfo();
            wf.refresh(heartbeat);
            return wf;
        });
        long oldTime = workerInfo.getLastActiveTime();
        if (heartbeatTime < oldTime) {
            log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
            return;
        }

        workerInfo.refresh(heartbeat);

        List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
        if (!CollectionUtils.isEmpty(containerInfos)) {
            containerInfos.forEach(containerInfo -> {
                Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                infos.put(workerAddress, containerInfo);
            });
        }
    }

总结

  1. akka是一个说是比较简单的框架,但是是scala写的,不同版本差异很大,不熟悉scala的很难用的好。
  2. vertx是比较简单的框架。入门可以参考这篇博客:https://blog.csdn.net/qq_42985872/article/details/128494611
  3. 目前只整理了如上5个功能的源码,其实我对任务调度框架的理解,就是服务端告诉客户端你该执行了。作者还是做了很多检查任务状态的处理,保证了任务可以顺利执行。更加全面,也更加复杂了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/129580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端期末考试试题及参考答案(01)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 一、 填空题 ______表示页面中一个内容区块或整个页面的标题。______表示页面中一块与上下文不相关的独立内容&#xff0c;比如一篇文章。CSS的引入方式有3种&#xff0c;分…

Python数据分析案例15——超市零售购物篮关联分析(apriori)

啤酒和纸尿裤的故事大多数人都听说过&#xff0c;纸尿裤的售卖提升了啤酒的销售额。 关联分析就是这样的作用&#xff0c;可以研究某种商品的售卖对另外的商品的销售起促进还是抑制的作用。 案例背景 本次案例背景是超市的零售数据&#xff0c;研究商品之间的关联规则。使用的…

移植SFUD,驱动SPI FLASH ZD25WQ80

1、关于SFUD SFUD (Serial Flash Universal Driver) 串行 Flash 通用驱动库&#xff0c;支持众多spi flash&#xff0c;关于SFUD的详细资料可参考&#xff1a;https://github.com/armink/SFUD。 2、为什么会有通用驱动 JEDEC &#xff08;固态技术协会&#xff09;针对串行 …

Python的22个万用公式,你确定不看看吗

前言 在大家的日常python程序的编写过程中&#xff0c;都会有自己解决某个问题的解决办法&#xff0c;或者是在程序的调试过程中&#xff0c;用来帮助调试的程序公式。 小编通过几十万行代码的总结处理&#xff0c;总结出了22个python万用公式&#xff0c;可以帮助大家解决在…

TypeScript中type和interface区别

typescript中interface介绍&#xff1a;TypeScript 中的接口 interface_疆~的博客-CSDN博客通常使用接口&#xff08;Interface&#xff09;来定义对象的类型。https://blog.csdn.net/qq_40323256/article/details/128478749 type type关键字是声明类型别名的关键字。用来给一…

windows 编译C++ boost库(超详细)

系列文章目录 文章目录系列文章目录前言一、windows二、b2.exe 参数前言 boost库其实不进行编译&#xff0c;大部分库也是可以正常使用的 而且也有一个开源工具vcpkg可以帮组我们下载编译&#xff0c;只是在国内用起来比较麻烦&#xff0c;而且还时常出bug 所以这里详细记录…

mac下,使用 docker 搭建,单机机器集群

背景&#xff1a; 在 Mac本下&#xff0c;通过 docker 完成一个 es 集群&#xff08;3台-或许可多台&#xff09;搭建。&#xff08;后续如果有真实的机器&#xff0c;只需要又该对应的 ip 地址即可&#xff0c;需要关注的是&#xff0c;机器间是可以互相 ping通的&#xff0c;…

4.3.5、IPv4 地址的应用规划

给定一个 IPv4 地址块&#xff0c;如何将其划分成几个更小的地址块&#xff0c;并将这些地址快分配给互联网中的不同网络&#xff0c; 进而可以给各网络中的主机和路由器接口分配 IPv4 地址。 一般有两种方法&#xff1a; 定长的子网掩码 FLSM &#xff08;Fixed Length Sub…

线程,进程以及Java中创建线程的多种方式

1. 前言 今天的这篇文章的目的还是为了讲述下什么叫线程&#xff0c;什么是进程。可能之前很多人都是通过背书得来的&#xff0c;今天就从通俗易懂的角度来分析下 2. 适合人群 线程以及进程的初学者 3. 开始 3.1 什么是程序 其实不管是程序/ 进程/ 线程都是基于操作系统而言…

141.环形链表

给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#xff08;…

SuperMap iDesktop地质体模型匹配地形——精修地质体模型路线

作者&#xff1a;超图研究院技术支持中心-于丁 地质体模型匹配地形——精修地质体模型路线 相信大家开展地质体业务时&#xff0c;常常会遇到构建的精模地质体与DEM地形数据的交界面&#xff0c;嵌合效果不佳、相互压盖、渲染冲突不稳定&#xff08;闪面&#xff09;、掩盖、漂…

前端期末考试试题及参考答案(03)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 一、 填空题 transition属性中______规定应用过渡的CSS属性的名称。transition属性中______定义过渡效果花费的时间。transition属性中______属性规定过渡效果的时间曲线。…

GitHub入门指南(上)

前言 我去年入门 GitHub&#xff0c;一开始不知道从哪开始学&#xff0c;在网上查找了很多文章、教程学习。这篇文章就是以我刚学习时的小白视角写的&#xff0c;希望能帮助到想开始学习 GitHub 又不知如何上手的学习者。因为我也是初级水平&#xff0c;文中介绍的知识基于我自…

6.移动端布局-rem布局

1.rem基础 优点&#xff1a;可以通过修改html里边文字的大小来改变页面中其他元素的大小&#xff0c;可以实现整体控制 1.1 rem单位 rem(root em)是一个相对单位&#xff0c;类似于em。 em是相对于自身元素字体大小&#xff08;若自身没有设置font-size则基础父元素的字体大…

一次SQL调优 聊一聊 SQLSERVER 数据页

一&#xff1a;背景 1.讲故事 最近给一位朋友做 SQL 慢语句 优化&#xff0c;花了些时间调优&#xff0c;遗憾的是 SQLSERVER 非源码公开&#xff0c;玩起来不是那么顺利&#xff0c;不过从这次经历中我觉得明年的一个重大任务就是好好研究一下它&#xff0c;争取在 SQLSERVE…

uniapp实现音频播放抢占系统音频焦点

项目为使用uniapp框架开发的Android/iOS APP应用 实现功能需求 假设手机正在播放音乐&#xff0c;当前APP处于前台收到消息&#xff0c;需播放提示音提示用户。目标为降低后台正在播放音乐的音量&#xff0c;播放提示音&#xff0c;播放完毕后恢复后台音乐音量 需求分析 乍…

拉伯证券|新能源汽车前11月产销翻倍,渗透率升至三分之一

2022年11月&#xff0c;国内新能源轿车渗透率已升至33.8%&#xff0c;创前史新高。 2022年的最终一个交易日早盘&#xff0c;两市高开高走&#xff0c;沪指涨0.61%&#xff0c;深证成指涨0.35%&#xff0c;创业板指涨0.3%。板块上来看&#xff0c;Web3.0、虚拟人、网络游戏概念…

BI技巧丨RankxYoY

群友&#xff1a;PowerBI可以实现两年的排名差异么&#xff1f; 白茶&#xff1a;可以&#xff01;安排&#xff01; 本期的问题&#xff0c;来自于群内小伙伴的一个实际应用场景。在实际业务中&#xff0c;这个需求属于常见类型&#xff0c;展示当前排名时&#xff0c;用户关注…

【软件测试】 测试开发?看看一线大厂需求的测试开发技能......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 能在一线大厂工作是…

FPGA知识汇集-FPGA系统时序理论

时序约束条件 下面来具体讨论一下系统时序需要满足的一些基本条件。我们仍然以下图的结构为例&#xff0c;并可以据此画出相应的时序分析示意图。 在上面的时序图中&#xff0c;存在两个时序环&#xff0c;我们称实线的环为建立时间环&#xff0c;而虚线的环我们称之为保持时间…