FE启动流程分析
Doris中FE主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
本文主要看一下Doris的fe在启动时做了什么。
启动流程分析
启动流程图:
代码路径:
doris/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
FE server的启动代码主要在这个JAVA文件中。在启动FE的时候,主要做了以下几件事:
-
环境检查
主要是检查一些启动时必要的环境变量以及初始化配置文件。包含:
**DORIS_HOME_DIR **
如果没有人为配置 DORIS_HOME_DIR,则该变量的值就是doris的解压安装目录。
PID_DIR
PID_DIR是为了判断FE进程是第一次启动还是之前启动过。
if (Strings.isNullOrEmpty(dorisHomeDir)) { System.err.println("env DORIS_HOME is not set."); return; } if (Strings.isNullOrEmpty(pidDir)) { System.err.println("env PID_DIR is not set."); return; }
初始化fe.conf、fe_custom.conf、ldap.conf
// init config Config config = new Config(); config.init(dorisHomeDir + "/conf/fe.conf"); // Must init custom config after init config, separately. // Because the path of custom config file is defined in fe.conf config.initCustom(Config.custom_config_dir + "/fe_custom.conf"); LdapConfig ldapConfig = new LdapConfig(); if (new File(dorisHomeDir + "/conf/ldap.conf").exists()) { ldapConfig.init(dorisHomeDir + "/conf/ldap.conf"); }
检测JDK版本是否匹配,主要是检测compile的JDK和runtime的jdk版本,需要要求runtimeVersion > compileVersion
public static boolean checkJavaVersion() { if (!Config.check_java_version) { return true; } String javaCompileVersionStr = getJavaVersionFromFullVersion(Version.DORIS_JAVA_COMPILE_VERSION); String javaRuntimeVersionStr = System.getProperty("java.version"); int compileVersion = JdkUtils.getJavaVersionAsInteger(javaCompileVersionStr); int runtimeVersion = JdkUtils.getJavaVersionAsInteger(javaRuntimeVersionStr); if (runtimeVersion < compileVersion) { System.out.println("The runtime java version " + javaRuntimeVersionStr + " is less than " + "compile version " + javaCompileVersionStr); return false; } return true; }
检查 解析启动FE时输入的命令行参数,以便进行不同的操作,主要会包含这几类:
**–version:**或者执行 -v ,主要是打印FE的版本
–helper: -h ,主要是指定 helper node 然后加入FE的 bdb je的副本组
–image: -i,主要是检查image文件
–bdb: -b,主要是用以运行bdbje的命令行工具
具体解析逻辑如下(bdbje tool的代码逻辑过长,有兴趣的可以自己去看一下 parseArgs的实现):
// version if (cmd.hasOption('v') || cmd.hasOption("version")) { return new CommandLineOptions(true, "", null, ""); } // helper if (cmd.hasOption('h') || cmd.hasOption("helper")) { String helperNode = cmd.getOptionValue("helper"); if (Strings.isNullOrEmpty(helperNode)) { System.err.println("Missing helper node"); System.exit(-1); } return new CommandLineOptions(false, helperNode, null, ""); } // image if (cmd.hasOption('i') || cmd.hasOption("image")) { // get image path String imagePath = cmd.getOptionValue("image"); if (Strings.isNullOrEmpty(imagePath)) { System.err.println("imagePath is not set"); System.exit(-1); } return new CommandLineOptions(false, "", null, imagePath); } //bdb tool
根据输入的参数,如果不是运行image tool、bdbje tool或者打印FE的version信息,就继续往下执行
这个时候就要准备开始启动FE了。
同样,启动FE时,需要初始化一些操作。
初始化的时候,主要是检查了FE的启动IP,是不是一个合法的IP。
这里需要注意的就是,我们在配置文件中配置的CIDR活着FQDN的配置,在初始化的时候会检测。很多小伙伴在启动FE的时候,没有正确配置IP的时候,最后用了localhost或者本地回环IP启动,导致没有使用我们想要的IP启动,具体的判断逻辑就是在这:
static void initAddrUseIp(List<InetAddress> hosts) { useFqdn = false; analyzePriorityCidrs(); // if not set frontend_address, get a non-loopback ip InetAddress loopBack = null; boolean hasMatchedIp = false; for (InetAddress addr : hosts) { LOG.debug("check ip address: {}", addr); if (addr instanceof Inet4Address) { if (addr.isLoopbackAddress()) { loopBack = addr; } else if (!priorityCidrs.isEmpty()) { if (isInPriorNetwork(addr.getHostAddress())) { localAddr = addr; hasMatchedIp = true; break; } } else { localAddr = addr; break; } } } //if all ips not match the priority_networks then print the warning log if (!priorityCidrs.isEmpty() && !hasMatchedIp) { LOG.warn("ip address range configured for priority_networks does not include the current IP address"); } // nothing found, use loopback addr if (localAddr == null) { localAddr = loopBack; } LOG.info("local address: {}.", localAddr); }
上面的逻辑看,初始化的时候会遍历网卡信息,拿遍历的IP地址和填写的PRIORITY_CIDR_SEPARATOR的值做匹配,匹配上了,就会用处于填写的CIDR范围中的ip启动,匹配不上的时候会从网卡IP中拿出一个合法的IP作为FE的启动IP,这个就不一定是我们想要的那个启动IP。特别是当前机器上有很多虚拟网卡的IP信息,就会很大概率用排在前面的虚拟IP启动。
当然,这里还会根据配置文件中的信息,去检查是不是FQDN,是不是IPV6,有兴趣的的同学都可以看一下具体的代码逻辑。
Init操作其实就是获取了当前FE的启动IP,获取完IP后,就需要检测端口,看FE的启动的需要的这些端口是否是正常的。
private static void checkAllPorts() throws IOException { if (!NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.edit_log_port, "Edit log port", NetUtils.EDIT_LOG_PORT_SUGGESTION)) { throw new IOException("port " + Config.edit_log_port + " already in use"); } if (!NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.http_port, "Http port", NetUtils.HTTP_PORT_SUGGESTION)) { throw new IOException("port " + Config.http_port + " already in use"); } if (Config.enable_https && !NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.https_port, "Https port", NetUtils.HTTPS_PORT_SUGGESTION)) { throw new IOException("port " + Config.https_port + " already in use"); } if (!NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.query_port, "Query port", NetUtils.QUERY_PORT_SUGGESTION)) { throw new IOException("port " + Config.query_port + " already in use"); } if (!NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.rpc_port, "Rpc port", NetUtils.RPC_PORT_SUGGESTION)) { throw new IOException("port " + Config.rpc_port + " already in use"); } }
这里的代码就有很多大家比较熟悉的 " already in use",所以我们在启动的时候,尽可能先检测一下。
端口如果被占用启动是不会成功的。
现在是不是觉得所有的准备工作都做完了,可以正式启动FE了?
还有一个比较重要的检测,就是需要根据fe.conf中的 enable_bdbje_debug_mode参数的值,来决定怎么启动。
这个值主要是某些时候,我们的FE的leader选举出现一定问题,做元数据运维的时候,会走运维模式逻辑。如果是正常情况下,这个值默认是FALSE,就会走后续的正常启动FE的流程。
-
元数据环境初始化
-
元数据目录,如果不存在,需要手动创建,
File meta = new File(metaDir); if (!meta.exists()) { LOG.warn("Doris' meta dir {} does not exist." + " You need to create it before starting FE", meta.getAbsolutePath()); throw new Exception(meta.getAbsolutePath() + " does not exist, will exit"); } if (Config.edit_log_type.equalsIgnoreCase("bdb")) { File bdbDir = new File(this.bdbDir); if (!bdbDir.exists()) { bdbDir.mkdirs(); } } File imageDir = new File(this.imageDir); if (!imageDir.exists()) { imageDir.mkdirs(); }
这里主要是需要手动创建最外层的metaDir,内层的bdb的目录和image的目录会自己创建。
-
初始化 插件管理器,启动审计日志进程
-
根据当前的元数据信息获取集群ID和节点角色信息(ROLE和VERSION文件的判断)
代码较长,只节选了关键代码。具体逻辑可以看一下getClusterIdAndRole 的具体实现,这里主要就是根据指定的helper的节点的元数据信息或者本地存在的元数据信息,获取到集群的ROLE信息和VERSION信息。
如果集群是非helper节点且第一次启动,ROLE文件实没有,这个时候需要创建这个文件。同时赋予相关值(ROLE=FOLLOWER),将节点信息写入到元数据文件中。
role = FrontendNodeType.FOLLOWER; nodeName = genFeNodeName(selfNode.getIdent(), selfNode.getPort(), false /* new style */); storage.writeFrontendRoleAndNodeName(role, nodeName);
如果当前阶段存在这些元数据文件,则会去元数据文件中获取当前节点的角色信息:
role = storage.getRole(); if (role == FrontendNodeType.REPLICA) { // for compatibility role = FrontendNodeType.FOLLOWER; } nodeName = storage.getNodeName(); if (Strings.isNullOrEmpty(nodeName)) { // In normal case, if ROLE file exist, role and nodeName should both exist. // But we will get a empty nodeName after upgrading. // So for forward compatibility, we use the "old-style" way of naming: "ip_port", // and update the ROLE file. nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true/* old style */); storage.writeFrontendRoleAndNodeName(role, nodeName); LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName); }
如果VERSION文件不存在,则会生成一个包含新的ClusterID和token信息的文件。同时实例化一个FE实例(第一次启动)。
if (!versionFile.exists()) { clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id; token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token; storage = new Storage(clusterId, token, this.imageDir); storage.writeClusterIdAndToken(); isFirstTimeStartUp = true; Frontend self = new Frontend(role, nodeName, selfNode.getHost(), selfNode.getPort()); // Set self alive to true, the BDBEnvironment.getReplicationGroupAdmin() will rely on this to get // helper node, before the heartbeat thread is started. self.setIsAlive(true); // We don't need to check if frontends already contains self. // frontends must be empty cause no image is loaded and no journal is replayed yet. // And this frontend will be persisted later after opening bdbje environment. frontends.put(nodeName, self); LOG.info("add self frontend: {}", self);
存在的话,就会直接去对应的文件中获取相关的ClusterIdAndToken。
如果我们启动了一个FE,无法从给出的helper节点信息中,同helper节点建立连接,就会出现:
current node is not added to the group. please add it first. " + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes
的日志信息,这个异常原因就是由于当前节点无法和指定的helper节点建立正常的连接信息导致的。
当和helper节点构建正常连接后,就会从helper节点同步 VERSION信息。
如果本身节点存在VERSIN文件的信息,说明不是第一次启动,这个时候就会用本地的这个文件的元数据信息同HELPER节点的VERSION信息进行比对。主要是比较clusterID。如果不一致,说明两个节点不是同一个集群的节点,启动进程就直接退出了。
经过这一步 VERSION和ROLE的元数据信息比对后,确定是同一个集群内的节点,也确定了这个FE的ROLE信息了,就需要从image中同步editlog。同时创建一系列的cleaner 线程和监听线程:
// 3. Load image first and replay edits this.editLog = new EditLog(nodeName); loadImage(this.imageDir); // load image file editLog.open(); // open bdb env this.globalTransactionMgr.setEditLog(editLog); this.idGenerator.setEditLog(editLog); // 4. create load and export job label cleaner thread createLabelCleaner(); // 5. create txn cleaner thread createTxnCleaner(); // 6. start state listener thread createStateListener(); listener.start();
此时启动前初始化工作就做完了。等待catalog信息的同步完成即可进行下一步。
-
-
启动FE的SERVER
FeServer feServer = new FeServer(Config.rpc_port); feServer.start(); if (options.enableHttpServer) { HttpServer httpServer = new HttpServer(); httpServer.setPort(Config.http_port); httpServer.setHttpsPort(Config.https_port); httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size); httpServer.setAcceptors(Config.jetty_server_acceptors); httpServer.setSelectors(Config.jetty_server_selectors); httpServer.setWorkers(Config.jetty_server_workers); httpServer.setKeyStorePath(Config.key_store_path); httpServer.setKeyStorePassword(Config.key_store_password); httpServer.setKeyStoreType(Config.key_store_type); httpServer.setKeyStoreAlias(Config.key_store_alias); httpServer.setEnableHttps(Config.enable_https); httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads); httpServer.setMinThreads(Config.jetty_threadPool_minThreads); httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size); httpServer.start(); Env.getCurrentEnv().setHttpReady(true); } if (options.enableQeService) { QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler()); qeService.start(); } ThreadPoolManager.registerAllThreadPoolMetric(); while (true) { Thread.sleep(2000); } } catch (Throwable e) { LOG.warn("", e); }
这里正常启动相关的SERVER即可。
还有一个比较重要的点就是,如果enable_bdbje_debug_mode值为TRUE,也就是需要走元数据运维,他又是如何启动的呢?
我们回到前面判断这个值的代码:
if (Config.enable_bdbje_debug_mode) { // Start in BDB Debug mode BDBDebugger.get().startDebugMode(dorisHomeDir); return; }
继续往下看:
public void startDebugMode(String dorisHomeDir) { try { initDebugEnv(); startService(dorisHomeDir); while (true) { Thread.sleep(2000); } } catch (Throwable t) { LOG.warn("BDB debug mode exception", t); System.exit(-1); } }
主要是初始化 Debug环境的代码:
public void init() throws BDBDebugException { EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(false); envConfig.setReadOnly(true); envConfig.setCachePercent(20); try { env = new Environment(new File(metaPath), envConfig); } catch (DatabaseException e) { throw new BDBDebugException("failed to init bdb env", e); } Preconditions.checkNotNull(env); }
这里可以看到,是利用了一个元数据,创建了一个新的环境。然后从这个环境中做了一系列的事情:
- List all database in bdbje.
- get the number of journal in specified database.
- get list of journal id (key) in specified database.
- get the journal entity of the specified journal id.
此时,我们就可以通过MySQL client来查看这些信息。同时,后续的启动流程还会继续。
-