这是一篇让你受益匪浅的文章,点个关注交流一下吧~
PowerJob如何使用,官方文档已经说的很详细了,即使没学过计算机的人,按照那上面的步骤来也是可以搭建出一个可以使用的例子来,所以今天就不在这里重复前人的工作,直接开始就开始将这个源代码中的原理吧。
powerjob框架主要分成两个分部,server和worker,这两部分需要分别去启动,而且需要先启动server,后启动worker,否则,worker会因为找不到可用的server,而启动失败。作为一个server启动的时候工作自然是非常繁重的,主要有以下几个步骤:
-
读取配置文件
-
启动Akka服务
-
启动Vertx服务
-
启动Spring
详细的启动代码如下所示:
private static final String TIPS = "\n\n" +
"******************* PowerJob Tips *******************\n" +
"如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" +
"if server can't startup, we recommend that you read the documentation to find a solution:\n" +
"https://www.yuque.com/powerjob/guidence/problem\n" +
"******************* PowerJob Tips *******************\n\n";
public static void main(String[] args) {
pre();//下方的私有静态方法
AkkaStarter.init();//启动Akka
VertXStarter.init();//启动Vertx
//启动Spring
try {
SpringApplication.run(PowerJobServerApplication.class, args);
} catch (Throwable t) {
log.error(TIPS);
throw t;
}
}
private static void pre() {
log.info(TIPS);//将上方的常量字符串打印出来
PropertyUtils.init();//读取配置文件
}
读取配置文件
PropertyUtils.init();//读取配置文件
这一步特别简单,仅仅涉及到server.common.utils包中的PropertyUtils。主要代码段如下:
public static void init() {
URL propertiesURL =PropertyUtils.class.getClassLoader().getResource("application.properties");
Objects.requireNonNull(propertiesURL);
try (InputStream is = propertiesURL.openStream()) {
PROPERTIES.load(is);
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
因为代码也不是很多,就直接把该方法的所有代码全部展示出来。
代码中第二行,方法里第一行有这么一段代码
PropertyUtils.class.getClassLoader().getResource("application.properties");
这个代码就是读取配置文件中的内容,下面是powerjob的文件目录层次,看图就知道该段代码到底读的是哪一部分。
说句实话,最开始的时候,我认为读取的是上面红色箭头指的地方,因为他代码里getResource,这个地方我认为就是读取resource文件夹下面的文件,但是.getClassLoader无法解释,后来自己跑一下这个代码就明白了,读取的是编译完的文件路径,而不是编译之前的路径。
读取的和配置文件里面的一模一样,一条不多一条不少~,springboot正常也会读取配置文件,这里为什么要单独执行一下读取,作用是什么?在这里先不说,看到后面遇到了,再来解答,留作问题1。
启动Akka服务
AkkaStarter.init();
powerjob里面主要使用的akka-remote,就是用来通信的,说白了就是手机,用来给彼此打电话汇报信息的。
Akka框架的介绍,请移步akka的官网自己去欣赏,这里只将代码的作用做一个讲解。
这个AkkaStarter是在server.remote.transport.starter包里,init方法代码如下:
public static void init() {
//一个计时的方法,就不用在开始与结束的时候使用System获取时间,然后再进行相减了。
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[PowerJob] PowerJob's akka system start to bootstrap...");
// 解析配置文件
Config akkaFinalConfig = parseConfig();
//创建一个Akka的actor系统,相当于建立了一个公司,之后所有的actor行动都在这个公司内进行
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
//创建了一个actor。
actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
}
解析配置文件
解析配置文件的代码如下所示:
private static Config parseConfig() {
//这一步是获取了启动第一步的读取配置文件后获取的配置内容,这也就回答了问题1。
Properties properties = PropertyUtils.getProperties();
//获取配置文件中oms.akka.port字段的内容,如果没有该配置,则默认为10086端口
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT)));
//这个是通过jvm启动时配置oms.akka.port的参数,例如:启动时候的java -Doms.akka.port=30086...
String portFromJvm = System.getProperty(PowerJobServerConfigKey.AKKA_PORT);
//如果配置了jvm启动的配置,则将其覆盖配置文件的akka端口,也就是说jvm配置优先级高于配置文件的配置
if (StringUtils.isNotEmpty(portFromJvm)) {
log.info("[PowerJob] use port from jvm params: {}", portFromJvm);
port = Integer.parseInt(portFromJvm);
}
// 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
//获取本地的ip地址,下面会详细进入到代码中说一下,这里面还挺深的
String localIp = NetUtils.getLocalHost();
//将关键字放到一个map映射中
overrideConfig.put("akka.remote.artery.canonical.hostname", localIp);
overrideConfig.put("akka.remote.artery.canonical.port", port);
//ip和端口组成了akka的通信地址
actorSystemAddress = localIp + ":" + port;
log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress);
//获取oms-server.akka.conf配置文件里面的配置,ConfigFactory为akka自带方法
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
//overrideConfig里面有的配置内容,按照overrideConfig走,没有的,按照配置文件akkaBasicConfig走
return ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
}
获取本地地址
String localIp = NetUtils.getLocalHost();
在解析配置文件中,其实真正有点内容的就是这个获取本地地址这个方法,之前我没有去细研究之前,可把我给坑毁了,因为我想用局域网的ip进行来回通信,但是因为这个获取地址里面的默认策略,导致每一次使用的都不是我想用的IP地址,当我研究完了这个代码之后,终于是柳暗花明又一村了,直接就可以指定地址了。
先看源码如下所示:
public static String getLocalHost() {
//这个代码就是在获取了地址之后,就直接将地址存起来,下一次再用就可以直接返回了
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
//获取jvm启动时,设置的powerjob.network.local.address关键字的地址,通过这个就可以直接绑定ip了
String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS);
if (StringUtils.isNotEmpty(addressFromJVM)) {
log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM);
return HOST_ADDRESS = addressFromJVM;
}
//如果上一步没有绑定地址,那就在通过getLocalAddress来获取地址。
InetAddress address = getLocalAddress();
if (address != null) {
return HOST_ADDRESS = address.getHostAddress();
}
return LOCALHOST_VALUE;
}
大概知道了获取host的思路,下一篇文章我们再详细介绍里面的具体流程,因为里面的流程相对来说有那么一点点的不简单,在这里讲的话会稍显冗长,所以我准备另开一篇,详细说一下。
启动Actor
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
第一个就是建立actor系统,参数一个是系统的名字,类似于电话号码后四位那么一个东西,另一个是akka的配置,这个在解析配置文件步骤中获取。
第二个是创建actor,这里创建的是FriendRequestHandler的Actor,调用其defaultProps()方法,源代码如下:
public static Props defaultProps() {
return Props.create(FriendRequestHandler.class)
//这一步是设置派发器的配置。
//读取配置文件oms-server.akka.conf中akka.friend-request-actor-dispatcher的关键字
.withDispatcher("akka.friend-request-actor-dispatcher")
//路由配置
.withRouter(
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
.withResizer(new DefaultResizer(
Runtime.getRuntime().availableProcessors() * 4,
Runtime.getRuntime().availableProcessors() * 10,
1,
0.2d,
0.3d,
0.1d,
10
))
);
}
到这里,这个Akka服务就算是启动完成了
总结
最后的vertx的启动基本没什么东西,就是vertx的默认启动,一行代码就OK了,其余的内容和akka启动时基本相同。
vertx = Vertx.vertx();
Server的启动相对来说没有那么的复杂,代码读起来也是非常的通俗易懂,这里面只出现了一个问题
springboot自带读配置文件的功能,为什么启动的时候要先读取配置文件?
因为启动akka和vertx的时候,需要配置文件的内容,而这个时候spring还没有读取该配置文件。
相关文章
https://blog.csdn.net/i_mycode/article/details/108382597 【getClassLoader().getResource() 】
http://doc.yonyoucloud.com/doc/akka-doc-cn/2.3.6/scala/book/chapter1/01_what_is_akka.html 【AKKA 2.3.6 Scala 文档】