powerjob的worker启动,研究完了这块代码之后我发现了,代码就是现实中我们码农的真实写照

news2024/11/16 12:42:36

这是一篇让你受益匪浅的文章,代码即使人生。

 worker启动比server启动要复杂一些,毕竟worker是要实际干活的,工欲善其事必先利其器,所以需要准备的工具还是不能少的,server对于powerjob来说,只是一个调度用的,说白了就是管理worker做什么工作的,只需要给他一个流程,让他按照流程上的内容,一次告诉worker去工作,至于怎么做,只有worker知道,server当然不会知道的,也没有必要知道。

worker的启动大概分为以下这么几个步骤:

  1. 判断是否重复初始化

  2. 获取默认配置

  3. 校验appName

  4. 获取IP地址和端口(这一步和server端是一样的,在这里就不赘述了)

  5. 初始化定时线程

  6. 连接server

  7. 初始化Akka

  8. 初始化日志系统

  9. 初始化存储

  10. 初始化定时任务

步骤是蛮多的,但是其实都不是非常的复杂

由于worker的启动源码过于多了,就不全贴出来了。

开胃菜

首先因为该worker包是需要被依赖的,所以并没有spring的启动类,但是却有启动spring时添加其配置的内容,在worker包里面的PowerJobWorker类,是实现了ApplicationContextAware, InitializingBean, DisposableBean这三个类,这三个类默认有三个方法,分别是

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException

public void afterPropertiesSet() throws Exception

public void destroy() throws Exception

99.9%的初始化工作都是在afterPropertiesSet这个方法里完成的,看名字大概也能猜出这个方法的意思,就是字面意思。

判断是否重复初始化

if (!initialized.compareAndSet(false, true)) {
    log.warn("[PowerJobWorker] please do not repeat the initialization");
    return;
}

这段代码意思就是一个initialized的变量,代表的意思是是否初始化,一开始的时候是false,因为还没有开始初始化,然后compareAndSet后面跟着两个参数,第一个参数是预期值,如果预期值和当前的变量值一样,则将当前变量更新为第二个参数的值。

如果initialized的值是false ,和预期值一样,则compareAndSet方法返回的是true,跳出if条件语句,并且initialized值变成了true。

如果initialized的·值是true,和预期值不一样,则compareAndSet返回的是false,进入条件语句,打印告警日志,并且不再有后续的初始化操作,此时initialized的值不变,依旧是true。

获取默认配置

PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();

//下面这些代码都是在之后的初始化操作中进行赋值的
workerRuntime.setWorkerAddress(workerAddress);

workerRuntime.setServerDiscoveryService(serverDiscoveryService);

workerRuntime.setActorSystem(actorSystem);

workerRuntime.setOmsLogHandler(omsLogHandler);

workerRuntime.setTaskPersistenceService(taskPersistenceService);

这个WorkerRuntime类是worker.common包里面的一个Bean类,记录了一些worker运行时的参数和环境,里面有的有默认值,有的没有默认值,需要在初始化的时候进行赋值。比如上面代码中,后面set的值

校验appName

我将里面有关打印日志的部分全部拿掉了,通过appName,去server请求appId,如果请求不到,则说明配置文件里面的“powerjob.worker.app-name”配置的有问题,所有appName都是需要注册的,所以名字是不会重复的。

private void assertAppName() {
    //获取到appName
    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    String appName = config.getAppName();
    Objects.requireNonNull(appName, "appName can't be empty!");

//调用server端的服务
    String url = "http://%s/server/assert?appName=%s";
    for (String server : config.getServerAddress()) {
//获取到server的请求地址
        String realUrl = String.format(url, server, appName);
        try {
//请求服务,返回结果
            String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
//解析返回结果
            ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
            if (resultDTO.isSuccess()) {
//将appId设置到运行环境里
                Long appId = Long.valueOf(resultDTO.getData().toString());
                workerRuntime.setAppId(appId);
                return;
            }else {
                throw new PowerJobException(resultDTO.getMessage());
            }
        }catch (PowerJobException oe) {
            throw oe;
        }catch (Exception ignore) {
        }
    }
    throw new PowerJobException("no server available!");
}

连接Server

 

serverDiscoveryService.start(timingPool);

最主要的就是上面这一行代码,这个代码里面主要流程如下:

  1. 将配置文件里面的服务器地址存入内存。

  2. 当前服务地址如果不为空,调用server端的/acquire服务获取结果。

  3. 如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。

  4. 如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。

  5. 如果得到结果,则将结果返回。

    private String discovery() {
    //1.将配置文件里面的服务器地址存入内存。
        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
    //2.当前服务地址如果不为空,调用server端的/acquire服务获取结果。
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }
    //3.如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。
        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break;
            }
        }
    
        if (StringUtils.isEmpty(result)) {
    //4.如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
                List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);
                        taskTracker.destroy();
                    });
                }
                FAILED_COUNT = 0;
            }
            return null;
        } else {
    //5.如果得到结果,则将结果返回。
            FAILED_COUNT = 0;
            return result;
        }
    }

    初始化日志系统

    OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);

    这个日志系统的主要作用,就是将本地的日志上报的server上,从传进的参数就能看出,都是和通讯相关的内容。

    这个日志系统的提交也是异步单独占用一个线程,在之前开启的线程中,其中就有一个线程是用来提交日志的,该线程会在worker启动的最后开启,代码段如下:

    timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

    固定每5秒提交一次日志。

    初始化存储

    worker使用的是本地的H2数据库,持久化的策略分为磁盘和内存,在worker停止的时候,会将本地的数据文件全部销毁。其主要的初始化代码在worker.persistence包里面的ConnectionFactory类中,源代码如下:

  6. private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
    private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
    
    public synchronized void initDatasource(StoreStrategy strategy) {
        strategy = strategy == null ? StoreStrategy.DISK : strategy;
    
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(Driver.class.getName());
        config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);
        config.setAutoCommit(true);
        // 池中最小空闲连接数量
        config.setMinimumIdle(2);
        // 池中最大连接数量
        config.setMaximumPoolSize(32);
        dataSource = new HikariDataSource(config);
    
        try {
            FileUtils.forceDeleteOnExit(new File(H2_PATH));
        }catch (Exception ignore) {
        }
    }

    HikariCP 是一个高性能的 JDBC 连接池组件,HikariConfig 就是其相关的配置类。

    总结

  7. worker工作起来确实不是很容易,需要找到自己的上级,还需要记录自己工作的日志,需要一个人干好多任务,还需要再不耽误正常任务的同时,向自己的上级汇报工作,汇报自己的身体状态。简直就是我们底层程序员的真实写照啊。里面使用了很多经典的技术,也有比较新的技术,对于日志系统,做的还是让我学到了很多。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/349552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM详解

一&#xff0c;JVM 1&#xff0c;JVM区域划分 类装载器&#xff0c;运行时数据区&#xff0c;字节码执行引擎 2&#xff0c;JVM内存模型&#xff08;运行时数据区&#xff09; 由本地方法栈&#xff0c;虚拟机栈&#xff0c;堆&#xff0c;方法区&#xff0c;和程序计数器组成。…

C++类基础(十五)

类的继承——虚函数&#xff08;二&#xff09; ● 由虚函数所引入的动态绑定属于运行期行为&#xff0c;与编译期行为有所区别 虚函数与继承紧密相关 – 虚函数的缺省实参只会考虑静态类型 struct Base {virtual void fun(int x 3){std::cout << "virtual void f…

国产技术迎来突破,14nm芯片横空出世,低代码也有好消息

芯片&#xff0c;被称为工业时代的“粮食”&#xff0c;小到手机手环&#xff0c;大到飞机轮船&#xff0c;几乎各个行业都不离开芯片的支持&#xff0c;其重要性不言而喻。而我国在这一领域一直较为薄弱。 一、“芯片之路坎坷” 由于国内半导体芯片市场底子薄弱、没有主动权…

NetApp AFF A 系列全闪存存储阵列

NetApp AFF A 系列全闪存阵列是一款智能、至强、至信的解决方案&#xff0c;它可利用现代云技术为您的 Data Fabric 提供所需的速度、效率和安全性。 是时候实现数据现代化了 进行任何 IT 转型的基础性第一步是利用高性能全闪存存储打造现代化基础架构&#xff0c;提高关键业务…

【C++之容器适配器】反向迭代器的实现

目录前言一、反向迭代器的实现1. 底层2. 成员函数1. 构造函数2. operator*()3. operator->()4. 前置5. 后置6. 前置--7. 后置--8. operator!()9. operator()二、vector反向迭代器的实现1. vector的正向迭代器2. vector反向迭代器的实现3. 测试vector的反向迭代器三、list反向…

git提交

文章目录关于数据库&#xff1a;桌面/vue-admin/vue_shop_api 的 git 输入 打开 phpStudy ->mySQL管理器 导入文件同时输入密码&#xff0c;和文件名 node app.js 错误区&#xff1a; $ git branch // git branch 查看分支 只有一个main分支不见master解决&#xff1a; gi…

PyQt5保姆级入门教程——从安装到使用

目录 Part1&#xff1a;安装PyQt5 Part 2&#xff1a;PyCharm配置PyQt5 Part 3&#xff1a;PyQt5设计界面介绍 Part 4&#xff1a;PyQt5设计UI 今天看了多个大佬的教程&#xff0c;总算是把PyQt5开发弄好了&#xff0c;每个部分都要看几个人的十分不方便&#xff0c;我十分…

YOLOv3简介

YOLOv3 预测部分 Darknet-53 YOLOv3的主干提取网络为Darknet-53&#xff0c;相比于YOLOv2时期的Darknet-19&#xff0c;其加深了网络层数且引入了Residual残差结构。其通过不断的1X1卷积和3X3卷积以及残差边的叠加&#xff0c;大幅度的加深了网络。残差网络的特点是容易优化&a…

【Unity VR开发】结合VRTK4.0:将浮点数从交互器传递到可交互对象

语录&#xff1a; 愿你熬得过万丈孤独&#xff0c;藏得下星辰大海。 前言&#xff1a; 默认情况下&#xff0c;交互器只能将单个布尔操作传递给可交互对象&#xff0c;后者控制可交互对象上的抓取操作。在其他时候&#xff0c;交互器中的其他操作可能希望传递给可交互对象&…

leaflet 设置marker,并可以任意拖动每一个marker(071)

第071个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中通过L.marker来添加marker,通过设置其属性,可以让marker在地图上任意的拖动。 直接复制下面的 vue+leaflet源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共76行)相关API参…

实体店店铺管理软件怎么挑?看了排名就知道!

很多实体店店主在选择店铺管理软件时,不知道怎么选择,其实这个不难。一般根据市场上的排名也选择就ok了&#xff0c;因为一款被大家都认证过好用的软件&#xff0c;怎么都比盲选的或者名不见经传的软件好。选择一款适合的实体店店铺管理软件可以省很多事。而截止现在管理软件排…

linux将新加磁盘绑挂载到指定目录

查看当前挂载情况df -l此时可以看到sda和sdb两块磁盘已经被挂载&#xff0c;但实际上还有更多块磁盘未被挂载&#xff08;磁盘名称sda&#xff0c;结尾字母安顺递增&#xff09;查看一安装的所有磁盘fdisk -l此时我们可以看到还有很多未进行分区磁盘为磁盘添加分区fdisk /dev/s…

【另辟蹊径】Table 单元格内容过多之省略展示方案,设置Element table的 showOverflowTooltip 属性无效后的替代方案

一、问题背景 设置了element table的组件 <el-table-column>属性showOverflowTooltip无效&#xff0c;如图所示。 PS&#xff1a;注意不是不起作用&#xff0c;是有作用但是内容过多展示占据了整个界面&#xff0c;影响美观和用户体验。 有的博主解决方法是全局样式文件…

一眼万年的 Keychron 无线机械键盘

一眼万年的 Keychron 无线机械键盘 一款好的键盘对于程序员或者喜欢码字的人来说是非常重要的&#xff0c;而最近博主入手了自己的第一款机械键盘——Keychron 无线机械键盘。 机械键盘特点 有独立轴体&#xff0c;通过两个簧接触&#xff0c;来触发信号&#xff0c;价格相对贵…

大文件上传如何做断点续传?

一、是什么 不管怎样简单的需求&#xff0c;在量级达到一定层次时&#xff0c;都会变得异常复杂 文件上传简单&#xff0c;文件变大就复杂 上传大文件时&#xff0c;以下几个变量会影响我们的用户体验 服务器处理数据的能力请求超时 网络波动 上传时间会变长&#xff0c;高…

缓存穿透-总结

目录 缓存穿透-总结 出现场景&#xff1a; 解决方法&#xff1a; 方法1.缓存空对象&#xff1a; 方法2.加一个布隆过滤器&#xff1a; 总结&#xff1a; 缓存穿透-总结 出现场景&#xff1a; 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓…

光量子领域新突破:有望打造芯片工厂!

将2D材料与氮化硅谐振器混合集成&#xff0c;使一系列单光子源与硅基光子按需精准结合。&#xff08;图片来源&#xff1a;网络&#xff09;量子光子学的著名专家、电气和计算机工程助理教授Galan Moody的实验室成功创造了一种在芯片上产生单光子的新方法。量子具有叠加态的特性…

飞桨特色产业级模型库,助力AI开发与落地更简单!

飞桨在长期的产业实践中发现&#xff0c;开发者使用开源模型项目落地普遍会遇到三大难题&#xff1a; 算法和模型繁多&#xff0c;做模型选择是个难题&#xff1b; 模型效果不错&#xff0c;但产业落地时容易遇到资源限制和部署的问题&#xff1b; 面对新场景无从下手&#x…

minio public桶禁止在直接访问桶位置时列出所有文件url

minio的public桶因为没有限制&#xff0c;所以在直接访问到桶地址的时候会列出桶内所有文件的url&#xff0c;这样很不安全&#xff0c;如何禁止这个功能&#xff0c;可以使用三种方法 1、如果是新版的可以直接设置桶的Access Policy为自定义就好 编辑custom的Policy&#xff…

五种情况下企业需要引进低代码开发平台

随着低代码开发平台的热度在上升&#xff0c;企业中也开始流行一种新的应用交付方式&#xff1a;业务部门基于低代码开发平台将所需要的功能&#xff08;或简单的可用版本&#xff09;自行搭建出来&#xff0c;当遇到较为复杂的需求时&#xff0c;则向IT部门请求支援。业务与IT…