这是一篇让你受益匪浅的文章,代码即使人生。
worker启动比server启动要复杂一些,毕竟worker是要实际干活的,工欲善其事必先利其器,所以需要准备的工具还是不能少的,server对于powerjob来说,只是一个调度用的,说白了就是管理worker做什么工作的,只需要给他一个流程,让他按照流程上的内容,一次告诉worker去工作,至于怎么做,只有worker知道,server当然不会知道的,也没有必要知道。
worker的启动大概分为以下这么几个步骤:
-
判断是否重复初始化
-
获取默认配置
-
校验appName
-
获取IP地址和端口(这一步和server端是一样的,在这里就不赘述了)
-
初始化定时线程
-
连接server
-
初始化Akka
-
初始化日志系统
-
初始化存储
-
初始化定时任务
步骤是蛮多的,但是其实都不是非常的复杂
由于worker的启动源码过于多了,就不全贴出来了。
开胃菜
首先因为该worker包是需要被依赖的,所以并没有spring的启动类,但是却有启动spring时添加其配置的内容,在worker包里面的PowerJobWorker类,是实现了ApplicationContextAware, InitializingBean, DisposableBean这三个类,这三个类默认有三个方法,分别是
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
public void afterPropertiesSet() throws Exception
public void destroy() throws Exception
99.9%的初始化工作都是在afterPropertiesSet这个方法里完成的,看名字大概也能猜出这个方法的意思,就是字面意思。
判断是否重复初始化
if (!initialized.compareAndSet(false, true)) {
log.warn("[PowerJobWorker] please do not repeat the initialization");
return;
}
这段代码意思就是一个initialized的变量,代表的意思是是否初始化,一开始的时候是false,因为还没有开始初始化,然后compareAndSet后面跟着两个参数,第一个参数是预期值,如果预期值和当前的变量值一样,则将当前变量更新为第二个参数的值。
如果initialized的值是false ,和预期值一样,则compareAndSet方法返回的是true,跳出if条件语句,并且initialized值变成了true。
如果initialized的·值是true,和预期值不一样,则compareAndSet返回的是false,进入条件语句,打印告警日志,并且不再有后续的初始化操作,此时initialized的值不变,依旧是true。
获取默认配置
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
//下面这些代码都是在之后的初始化操作中进行赋值的
workerRuntime.setWorkerAddress(workerAddress);
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
workerRuntime.setActorSystem(actorSystem);
workerRuntime.setOmsLogHandler(omsLogHandler);
workerRuntime.setTaskPersistenceService(taskPersistenceService);
这个WorkerRuntime类是worker.common包里面的一个Bean类,记录了一些worker运行时的参数和环境,里面有的有默认值,有的没有默认值,需要在初始化的时候进行赋值。比如上面代码中,后面set的值
校验appName
我将里面有关打印日志的部分全部拿掉了,通过appName,去server请求appId,如果请求不到,则说明配置文件里面的“powerjob.worker.app-name”配置的有问题,所有appName都是需要注册的,所以名字是不会重复的。
private void assertAppName() {
//获取到appName
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
//调用server端的服务
String url = "http://%s/server/assert?appName=%s";
for (String server : config.getServerAddress()) {
//获取到server的请求地址
String realUrl = String.format(url, server, appName);
try {
//请求服务,返回结果
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
//解析返回结果
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
//将appId设置到运行环境里
Long appId = Long.valueOf(resultDTO.getData().toString());
workerRuntime.setAppId(appId);
return;
}else {
throw new PowerJobException(resultDTO.getMessage());
}
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
}
}
throw new PowerJobException("no server available!");
}
连接Server
serverDiscoveryService.start(timingPool);
最主要的就是上面这一行代码,这个代码里面主要流程如下:
-
将配置文件里面的服务器地址存入内存。
-
当前服务地址如果不为空,调用server端的/acquire服务获取结果。
-
如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。
-
如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。
-
如果得到结果,则将结果返回。
private String discovery() { //1.将配置文件里面的服务器地址存入内存。 if (ip2Address.isEmpty()) { config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x)); } String result = null; //2.当前服务地址如果不为空,调用server端的/acquire服务获取结果。 String currentServer = currentServerAddress; if (!StringUtils.isEmpty(currentServer)) { String ip = currentServer.split(":")[0]; String firstServerAddress = ip2Address.get(ip); if (firstServerAddress != null) { result = acquire(firstServerAddress); } } //3.如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。 for (String httpServerAddress : config.getServerAddress()) { if (StringUtils.isEmpty(result)) { result = acquire(httpServerAddress); }else { break; } } if (StringUtils.isEmpty(result)) { //4.如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。 if (FAILED_COUNT++ > MAX_FAILED_COUNT) { List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys(); if (!CollectionUtils.isEmpty(frequentInstanceIds)) { frequentInstanceIds.forEach(instanceId -> { TaskTracker taskTracker = TaskTrackerPool.remove(instanceId); taskTracker.destroy(); }); } FAILED_COUNT = 0; } return null; } else { //5.如果得到结果,则将结果返回。 FAILED_COUNT = 0; return result; } }
初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);
这个日志系统的主要作用,就是将本地的日志上报的server上,从传进的参数就能看出,都是和通讯相关的内容。
这个日志系统的提交也是异步单独占用一个线程,在之前开启的线程中,其中就有一个线程是用来提交日志的,该线程会在worker启动的最后开启,代码段如下:
timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
固定每5秒提交一次日志。
初始化存储
worker使用的是本地的H2数据库,持久化的策略分为磁盘和内存,在worker停止的时候,会将本地的数据文件全部销毁。其主要的初始化代码在worker.persistence包里面的ConnectionFactory类中,源代码如下:
-
private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); public synchronized void initDatasource(StoreStrategy strategy) { strategy = strategy == null ? StoreStrategy.DISK : strategy; HikariConfig config = new HikariConfig(); config.setDriverClassName(Driver.class.getName()); config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL); config.setAutoCommit(true); // 池中最小空闲连接数量 config.setMinimumIdle(2); // 池中最大连接数量 config.setMaximumPoolSize(32); dataSource = new HikariDataSource(config); try { FileUtils.forceDeleteOnExit(new File(H2_PATH)); }catch (Exception ignore) { } }
HikariCP 是一个高性能的 JDBC 连接池组件,HikariConfig 就是其相关的配置类。
总结
-
worker工作起来确实不是很容易,需要找到自己的上级,还需要记录自己工作的日志,需要一个人干好多任务,还需要再不耽误正常任务的同时,向自己的上级汇报工作,汇报自己的身体状态。简直就是我们底层程序员的真实写照啊。里面使用了很多经典的技术,也有比较新的技术,对于日志系统,做的还是让我学到了很多。