Server端的Actor,分工非常的明确,但是只将Actor作为一部手机来用,真的合适吗?

news2025/1/10 11:22:37

这是一篇介绍PowerJob,Server端Actor的文章,如果感兴趣可以请点个关注,大家互相交流一下吧。

 

server端一共有两个Actor,一个是处理worker传过来的信息,一个是server之间的信息传递。

处理Worker的Actor叫做WorkerRequestAkkaHandler,处理Server之间的叫做FriendRequestHandler。从名字来看也是非常的有意思,server之间彼此是朋友,worker之间没有什么朋友,有的只是上下级,说跑偏了。

WorkerRequestAkkaHandler

主要接收五种类型的是消息

  1. 来自worker的心跳信息(确保worker是活着的)

  2. 任务实例的状态信息(查看worker的工作进展)

  3. worker的日志信息(监视worker是工作每一步是否有错误)

  4. worker部署容器的信息(worker额外做了哪些工作)

  5. 查询执行器集群的信息(来新员工要第一时间知道)

心跳信息的发送与接收

timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS);

心跳的发送,是由worker端的WorkerHealthReporter的run方法发送的,该类实现了Runnable接口,在worker启动的时候,被设置成了每隔15秒执行一次,是worker的后台执行的程序。

心跳的接收,是由server端的WorkerRequestAkkaHandler,接收之后将信息存入到内存中,顺便记录日志,可以自行接入到ELK系统中去(如果连接到ELK)。

这一步操作的作用就是确认worker都活着,当有任务来临的时候,将任务发送到所有活着的,或者发送到状态更好的worker去执行。

任务实例的状态信息

发送方主要是TaskTracker,因为TaskTracker是一个抽象类,所以有两个实现类,一个是FrequentTaskTracker,主要负责是秒级任务,一个是CommonTaskTracker,主要负责管理JobInstance的运行,负责任务派发,这三个类均会发送任务实例的状态信息,抽象类TaskTracker主要是在创建任务的时候,如果发生异常,就会向server发送发生异常的任务实例的状态信息,源代码如下:

public static TaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
    try {
       ... ...
    } catch (Exception e) {
        // 直接发送失败请求
        TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq();
       //这就是一堆set信息,没什么好看的
       ...

        String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
        ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
        serverActor.tell(response, null);
    }
    return null;
}

FrequentTaskTracker主要是在Check内部类里面的的reportStatus方法执行,是一个定时执行的方法。

CommonTaskTracker也是在一个内部类StatusCheckRunnable里面的innerRun方法执行,主要是检查当前任务的执行状态,每隔13秒回执行一次,这个时间可以在启动java的时候设置。

接收端是server的WorkerRequestAkkaHandler类,接收到信息之后更新任务状态,主要代码是InstanceManager的updateStatus方法。源代码如下:为了篇幅不太长,有些日志输出给省略了,大部分都是源代码的注释说明,感觉说的挺详细,还不乏幽默感,所以就保留了。

public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
    Long instanceId = req.getInstanceId();
    // 获取相关数据
    JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());
    InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
    if (instanceInfo == null) {return;}
    // 丢弃过期的上报数据
    if (req.getReportTime() <= instanceInfo.getLastReportTime()) {return;}
    // 丢弃非目标 TaskTracker 的上报数据(脑裂情况)
    if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {return;}

    InstanceStatus receivedInstanceStatus = InstanceStatus.of(req.getInstanceStatus());
    Integer timeExpressionType = jobInfo.getTimeExpressionType();
    // 更新 最后上报时间 和 修改时间
    instanceInfo.setLastReportTime(req.getReportTime());
    instanceInfo.setGmtModified(new Date());
    
    // 下面这个IF主要是处理FrequentTaskTracker发来的消息
    // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可
    // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
    // 综上,直接把 status 和 runningNum 同步到DB即可
    if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) {
        // 如果实例处于失败状态,则说明该 worker 失联了一段时间,被 server 判定为宕机,而此时该秒级任务有可能已经重新派发了,故需要 Kill 掉该实例
        if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) {
            stopInstance(instanceId, instanceInfo);
            return;
        }
        LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
        // 检查生命周期是否已结束
        if (lifeCycle.getEnd() != null && lifeCycle.getEnd() <= System.currentTimeMillis()) {
            stopInstance(instanceId, instanceInfo);
            instanceInfo.setStatus(InstanceStatus.SUCCEED.getV());
        } else {
            instanceInfo.setStatus(receivedInstanceStatus.getV());
        }
        instanceInfo.setResult(req.getResult());
        instanceInfo.setRunningTimes(req.getTotalTaskNum());
        instanceInfoRepository.saveAndFlush(instanceInfo);
        // 任务需要告警
        if (req.isNeedAlert()) {
            alert(instanceId, req.getAlertContent());
        }
        return;
    }
    
    // 更新运行次数
    if (instanceInfo.getStatus() == InstanceStatus.WAITING_WORKER_RECEIVE.getV()) {
        // 这里不会存在并发问题
        instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() + 1);
    }
    // QAQ ,不能提前变更 status,否则会导致更新运行次数的逻辑不生效继而导致普通任务 无限重试
    instanceInfo.setStatus(receivedInstanceStatus.getV());

    boolean finished = false;
    if (receivedInstanceStatus == InstanceStatus.SUCCEED) {
        instanceInfo.setResult(req.getResult());
        instanceInfo.setFinishedTime(System.currentTimeMillis());
        finished = true;
    } else if (receivedInstanceStatus == InstanceStatus.FAILED) {
        // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =)
        if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {
            // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败)
            instanceInfo.setExpectedTriggerTime(System.currentTimeMillis() + 10000);
            // 修改状态为 等待派发,正式开始重试
            // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖)
            instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
        } else {
            instanceInfo.setResult(req.getResult());
            instanceInfo.setFinishedTime(System.currentTimeMillis());
            finished = true;
        }
    }
    // 同步状态变更信息到数据库
    instanceInfoRepository.saveAndFlush(instanceInfo);
    if (finished) {
        // 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报
        processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());
    }

}

所谓脑裂问题,就是同一个集群中的不同节点,对于集群的状态有了不一样的理解

worker的日志信息

 

timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

发送方式Worker中的OmsLogHandler类里的LogSubmitter内部类的run方法,也是另起线程进行处理的,将产生的日记内容进行上传,这里面使用了一个锁,保证只有一个线程上传日志。

接收端是server的WorkerRequestAkkaHandler类,接收之后保存到数据库中。

worker部署容器的信息

发送端是Worker的OmsContainerFactory类中的fetchContainer方法,该方法是由WorkActor触发的,当server要部署容器的时候,会向WorkerActor接收,然后调用方法onReceiveServerDeployContainerRequest,方法中判断该容器是否已经保存在本地,如果没有再通过fetchContainer向server的WorkerRequestAkkaHandler发送请求获取容器信息,然后部署。

接收端是server的WorkerRequestAkkaHandler类,接收到信息之后,server会将容器id,name,version和下载的url发回给worker,让worker通过url下载容器进行部署。

查询执行器集群的信息

发送端是worker的TaskTracker类的内部类WorkerDetector的run方法,如果是秒级任务,在任务初始化的时候会设置成每一分钟执行一次,在FrequentTaskTracker的initTaskTracker方法内

scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);

如果是正常的任务,任务类型是Map或者MapReduce,会执行该方法:

if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) { scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); }

接收端是server的WorkerRequestAkkaHandler类,接收之后,将所有可以使用的worker的信息返回。

FriendRequestHandler

主要接收两种类型的信息:

  1. Ping 检测目标机器是否存活(还有和我一个级别的活人吗)

  2. RemoteProcessReq 远程执行命令(告诉你的直接下属干活,我不想得罪人)

检测目标机器是否存活

发送方式server的ServerElectionService类的activeAddress方法,该方法是worker启动的时候连接server时调用acquire服务的时候,会调用该方法,该方法会向worker发送的server地址询问目前存活的所有server地址信息。

触发的起点是在Worker的PowerJobWorker的init()中

serverDiscoveryService.start(timingPool);

=》ServerDiscoveryService的start方法的this.currentServerAddress = discovery();

=》ServerDiscoveryService的discovery方法的result = acquire(这个地址不重要,重要的是调用了这个方法);

=》ServerDiscoveryService的acquire方法的result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));

然后转到了Server的ServerController类的acquireServer方法中

return ResultDTO.success(serverElectionService.elect(appId, protocol, currentServer));

=》ServerElectionService的elect方法的return getServer0(appId, protocol);

=》ServerElectionService的getServer0方法的String activeAddress = activeAddress(originServer, downServerCache, protocol);

=》ServerElectionService的activeAddress方法的CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));

以上就是调用前的全部步骤了。

接收方是server的FriendRequestHandler,返回给询问方目前所有存活的server地址。

远程执行命令

发送方式server中JobServer上的注解DesignateServer的切面方法,在server执行某个任务时,会对当前worker的直属server进行判断,如果worker的直属server是当前调度任务的server,则直接执行,如果不是,则会将该方法发送给直属server进行执行。

比如说立即执行任务的命令,会在JobService的runJob中执行,但是该方法上有一个注解@DesignateServer,这也就会在方法执行之前,调用DesignateServerAspect的execute方法,如果将目标server地址与本地地址进行对比不一样,则会执行该远程方法。

接收方是server的FriendRequestHandler,接到执行方法的类名,方法名,入参和返回值等信息,执行方法。执行方法是在RemoteRequestProcessor类中。

总结

 server的这两个Actor职责划分还是很清晰的,不过感觉将Actor仅仅只是用在通信上,有点大材小用的感觉,Actor这个单词本身就是将其比作一个演员,应该是扮演某个角色,当然了,让其仅仅扮演一个手机,可能也是个不错的想法。Akka-remote的底层是Netty,如果直接使用Netty估计也可以,只不过Akka将其进行了封装,使用起来能够更方便一些,不过就是给人一种用大炮打蚊子的感觉。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/352115.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5、HAL库驱动W25Qxx

一、 SPI通信驱动W25Qxx 1、使用驱动文件快速配置工程代码驱动W25Qxx &#xff08;此驱动文件只适合W25Qxx 16M及以下型号&#xff0c;因为访问地址位数不同&#xff09; 注&#xff1a;本次使用SPI的方式进行访问W25Qxx Flash进行数据读写&#xff0c;关于W25Qxx芯片不会做…

10大主流压力测试工具各有所长,怎么选适合自己的?

市面上流行的压力/负载/性能测试工具多是来自国外&#xff0c;近年来国内的性能测试工具也如雨后春笋崛起。同时由于开发的目的和侧重点不同&#xff0c;其功能也有很大差异&#xff0c;下面就为您简单介绍10款目前最常见的测试产品。 1、kylinTOP测试与监控平台&#xff08;商…

实现一个比ant功能更丰富的Modal组件

普通的modal组件如下&#xff1a; 我们写的modal额外支持&#xff0c;后面没有蒙版&#xff0c;并且Modal框能够拖拽 还支持渲染在文档流里&#xff0c;上面的都是fixed布局&#xff0c;我们这个正常渲染到文档下面&#xff1a; render部分 <RenderDialog{...restState}visi…

Lesson5.2---Python 之 NumPy 切片索引和广播机制

一、切片和索引 ndarray 对象的内容可以通过索引或切片来访问和修改&#xff08;&#xff09;&#xff0c;与 Python 中 list 的切片操作一样。ndarray 数组可以基于 0 - n 的下标进行索引&#xff08;先行后列&#xff0c;都是从 0 开始&#xff09;。 区别在于&#xff1a;数…

代码随想录算法训练营第三十二天 | 122.买卖股票的最佳时机II,55. 跳跃游戏,45.跳跃游戏II

一、参考资料买卖股票的最佳时机IIhttps://programmercarl.com/0122.%E4%B9%B0%E5%8D%96%E8%82%A1%E7%A5%A8%E7%9A%84%E6%9C%80%E4%BD%B3%E6%97%B6%E6%9C%BAII.html 跳跃游戏https://programmercarl.com/0055.%E8%B7%B3%E8%B7%83%E6%B8%B8%E6%88%8F.html 跳跃游戏 IIhttps://pr…

金三银四必备软件测试必问面试题

初级软件测试必问面试题1、你的测试职业发展是什么&#xff1f;测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师奔去。而且我也有初步的职业规划&#xff0c;前 3 年积累测试经验&#xff0c;按如何做好测试工程…

【数据结构期末例题】

前言 本文是博主自己在准备学校数据结构考试时的总结&#xff0c;各个知识点都贴有对应的详细讲解文章以供大家参考&#xff1b;当然文中还有许许多多的截图&#xff0c;这些是博主对主要内容的摘取&#xff0c;对于那些基础较好的同学可以直接看截图&#xff0c;减少跳转对应文…

声呐学习笔记之波束成形

目录什么是波束什么是波束成形线阵数学推导(均匀排布)什么是波束 和光束一样&#xff0c;当所有波的传播方向都一致时&#xff0c;即形成了波束。工程师利用波束已经有相当久的历史。在二战中&#xff0c;工程师已经将波束利用在雷达中&#xff0c;雷达通过扫描波束方向来探测…

力扣-分数排名

大家好&#xff0c;我是空空star&#xff0c;本篇带你了解一道简单的力扣sql练习题。 文章目录前言一、题目&#xff1a;178. 分数排名二、解题1.错误示范①提交SQL运行结果2.错误示范②提交SQL运行结果3.正确示范①提交SQL运行结果4.正确示范②提交SQL运行结果5.正确示范③提交…

全流程GMS地下水数值模拟技能培养及溶质运移反应问题深度解析实践技术

本次综合前期多次学习的效果及重点关注环节&#xff0c;系统性呈现地下水数值模拟软件GMS建模方法同时&#xff0c;建立与实践项目过程中的重点问题相融合&#xff0c;在教学中不仅强调学习三维地质结构建模、水文地质模型概化、边界条件设定、参数反演和模型校核等关键环节&am…

套娃式工具!用 AI 识别 AI ?#AI classifier

2022年以来&#xff0c;市面上就出现了不少 AI 生成文本的工具&#xff0c;尤其是 OpenAI 推出的 ChatGPT &#xff0c;不仅能够协助完成撰写邮件、视频脚本、文案、翻译、代码等任务&#xff0c;还能通过学习和理解人类的语言来进行对话&#xff0c;并根据聊天的上下文进行互动…

AI技术网关如何用于安全生产监测?有什么优势?

现代工业生产和运营的规模越来越庞大、系统和结构越来越复杂&#xff0c;现场的风险点多面广&#xff0c;给作业一线的安全监管带来极大的挑战。 针对工地、煤矿、危化品、加油站、烟花爆竹、电力等行业的安全生产监管场景&#xff0c;可以借助AI智能与物联网技术&#xff0c;…

4.1 Filter-policy

1. 实验目的 熟悉Filter-policy的应用场景掌握Filter-policy的配置方法2. 实验拓扑 Filter-policy实验拓扑如图4-5所示: 图4-5:Filter-policy 3. 实验步骤 (1) 网络连通性 R1的配置 <Huawei>system-vi…

点成分享|对于粘性液体该如何精准移液?

之前文章介绍移液器原理及分类时有说到&#xff0c;从移液器的使用原理来进行移液器的分类&#xff0c;大致就可分为空气置换式移液器和正向置换移液器&#xff08;即外置活塞式移液器&#xff09;。 对于粘性液体&#xff0c;特别是高粘度液体的移液操作&#xff0c;最好的方…

Vulnhub 渗透练习(四)—— Acid

环境搭建 环境下载 kail 和 靶机网络适配调成 Nat 模式&#xff0c;实在不行直接把网络适配还原默认值&#xff0c;再重试。 信息收集 主机扫描 没扫到&#xff0c;那可能端口很靠后&#xff0c;把所有端口全扫一遍。 发现 33447 端口。 扫描目录&#xff0c;没什么有用的…

代码随想录算法训练营第三十天 | 332.重新安排行程,51. N皇后,37. 解数独,总结

Day29 休息~一、参考资料重点&#xff01;&#xff01; 回溯算法总结篇https://programmercarl.com/%E5%9B%9E%E6%BA%AF%E6%80%BB%E7%BB%93.html 组合问题&#xff1a;N个数里面按一定规则找出k个数的集合排列问题&#xff1a;N个数按一定规则全排列&#xff0c;有几种排列方式…

【数字电路】数字电路的学习核心

文章目录前言一、电子电路知识体系二、数电的学习目标三、数字电路分析例子四、数字电路设计例子总结前言 用数字信号完成对数字量进行算术运算和逻辑运算的电路称为数字电路&#xff0c;或数字系统。由于它具有逻辑运算和逻辑处理功能&#xff0c;所以又称数字逻辑电路。现代…

2023美赛E题思路代码分析

2023美赛数学建模E题思路分析&#xff0c;更多的可以文末 E题&#xff1a;光污染 问题一&#xff1a;制定一个广泛适用的指标来确定一个地点的光污染风险水平。 首先我们要知道光污染以两种形式存在&#xff1a; 天空辉光&#xff08;也称为人造天空辉光、光穹或逃逸光&…

Exchange 2013升级以及域名绑定等若干问题

环境简介Exchange 2013服务器位于ad域中&#xff0c;系统为Windows server 2012 R2&#xff0c;其内部域名为&#xff1a;mail.ad.com一. Exchange客户端无法在浏览器中正常运行在域中部署Exchange服务器后&#xff0c;除了可以通过outlook、foxmail等邮件客户端来使用邮箱功能…

具有非线性动态行为的多车辆列队行驶问题的基于强化学习的方法

论文地址&#xff1a; Reinforcement Learning Based Approach for Multi-Vehicle Platooning Problem with Nonlinear Dynamic Behavior 摘要 协同智能交通系统领域的最新研究方向之一是车辆编队。研究人员专注于通过传统控制策略以及最先进的深度强化学习 (RL) 方法解决自动…