Doris数据库FE——启动流程源码详细解析

news2024/11/17 21:47:22

Doris中FE主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。代码路径:doris/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
在这里插入图片描述

环境检查

在启动FE的时候,主要做环境检查。检查一些启动时必要的环境变量以及初始化配置文件,比如DORIS_HOME_DIR如果没有人为配置 DORIS_HOME_DIR,则该变量的值就是doris的解压安装目录;PID_DIR是为了判断FE进程是第一次启动还是之前启动过,并创建pid文件fe.pid。解析命令行参数。初始化fe.conf、fe_custom.conf、ldap.conf。检测JDK版本是否匹配,主要是检测compile的JDK和runtime的jdk版本,需要要求runtimeVersion > compileVersion。
在这里插入图片描述
检查 解析启动FE时输入的命令行参数,以便进行不同的操作,主要会包含这几类:–version或者执行 -v ,主要是打印FE的版本;–helper或 -h ,主要是指定 helper node 然后加入FE的 bdb je的副本组;–image:或-i,主要是检查image文件;–bdb或-b,主要是用以运行bdbje的命令行工具,具体解析逻辑如下(bdbje tool的代码逻辑过长,有兴趣的可以自己去看一下 parseArgs的实现):

    private static void checkCommandLineOptions(CommandLineOptions cmdLineOpts) {
        if (cmdLineOpts.isVersion()) {
            System.out.println("Build version: " + Version.DORIS_BUILD_VERSION);
            System.out.println("Build time: " + Version.DORIS_BUILD_TIME);
            System.out.println("Build info: " + Version.DORIS_BUILD_INFO);
            System.out.println("Build hash: " + Version.DORIS_BUILD_HASH);
            System.out.println("Java compile version: " + Version.DORIS_JAVA_COMPILE_VERSION);
            System.exit(0);
        } else if (cmdLineOpts.runBdbTools()) {
            BDBTool bdbTool = new BDBTool(Env.getCurrentEnv().getBdbDir(), cmdLineOpts.getBdbToolOpts());
            if (bdbTool.run()) { System.exit(0);
            } else { System.exit(-1);
            }
        } else if (cmdLineOpts.runImageTool()) {
            File imageFile = new File(cmdLineOpts.getImagePath());
            if (!imageFile.exists()) {
                System.out.println("image does not exist: " + imageFile.getAbsolutePath() + " . Please put an absolute path instead"); System.exit(-1);
            } else {
                System.out.println("Start to load image: ");
                try {
                    MetaReader.read(imageFile, Env.getCurrentEnv());
                    System.out.println("Load image success. Image file " + cmdLineOpts.getImagePath() + " is valid");
                } catch (Exception e) {
                    System.out.println("Load image failed. Image file " + cmdLineOpts.getImagePath() + " is invalid");
                    LOG.warn("", e);
                } finally {
                    System.exit(0);
                }
            }
        }

        // go on
    }

提前介绍以下fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java的getCurrentEnv函数,用于返回ENV单例。该类用于A singleton class can also be seen as an entry point of Doris. All manager classes can be obtained through this class. 类似于存放全局可见数据的全局变量,比如CatalogMgr、LoadManager等。
在这里插入图片描述
根据输入的参数,如果不是运行image tool、bdbje tool或者打印FE的version信息,就继续往下执行,这个时候就要准备开始启动FE了。同样,启动FE时,需要初始化一些操作。初始化的时候,主要是检查了FE的启动IP,是不是一个合法的IP。这里需要注意的就是,我们在配置文件中配置的CIDR或者FQDN的配置,在初始化的时候会检测。很多小伙伴在启动FE的时候,没有正确配置IP的时候,最后用了localhost或者本地回环IP启动,导致没有使用我们想要的IP启动,具体的判断逻辑就是在这:
在这里插入图片描述
上面的逻辑看,初始化的时候会遍历网卡信息,拿遍历的IP地址和填写的PRIORITY_CIDR_SEPARATOR的值做匹配,匹配上了,就会用处于填写的CIDR范围中的ip启动,匹配不上的时候会从网卡IP中拿出一个合法的IP作为FE的启动IP,这个就不一定是我们想要的那个启动IP。特别是当前机器上有很多虚拟网卡的IP信息,就会很大概率用排在前面的虚拟IP启动。当然,这里还会根据配置文件中的信息,去检查是不是FQDN,是不是IPV6,有兴趣的的同学都可以看一下具体的代码逻辑。Init操作其实就是获取了当前FE的启动IP,获取完IP后,就需要检测端口,看FE的启动的需要的这些端口是否是正常的。
在这里插入图片描述
如上图所示Doris主要提供四个端口:Edit log portHttp portHttps portQuery portRpc port

开始启动

还有一个比较重要的检测,就是需要根据fe.conf中的 enable_bdbje_debug_mode参数的值,来决定怎么启动。这个值主要是某些时候,我们的FE的leader选举出现一定问题,做元数据运维的时候,会走运维模式逻辑。如果是正常情况下,这个值默认是FALSE,就会走后续的正常启动FE的流程。
元数据环境初始化

            // init catalog and wait it be ready
            Env.getCurrentEnv().initialize(args);
            Env.getCurrentEnv().waitForReady();

在这里插入图片描述
0 元数据目录,如果不存在,需要手动创建,这里主要是需要手动创建最外层的metaDir,内层的bdb的目录和image的目录会自己创建。获取本节点host port、获取helper节点 host port【fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java AmbariDeployManager.java K8sDeployManager.java LocalFileDeployManager.java】1 初始化插件管理器,启动审计日志进程。2 根据当前的元数据信息获取集群ID和节点角色信息(ROLE和VERSION文件的判断) 代码较长,只节选了关键代码。具体逻辑可以看一下getClusterIdAndRole 的具体实现,这里主要就是根据指定的helper的节点的元数据信息或者本地存在的元数据信息,获取到集群的ROLE信息和VERSION信息。如果集群是非helper节点且第一次启动,ROLE文件实没有,这个时候需要创建这个文件。同时赋予相关值(ROLE=FOLLOWER),将节点信息写入到元数据文件中。如果当前阶段存在这些元数据文件,则会去元数据文件中获取当前节点的角色信息。

            // ATTN:
            // If the version file and role file does not exist and the helper node is itself, this should be the very beginning startup of the cluster, so we create ROLE and VERSION file, set isFirstTimeStartUp to true, and add itself to frontends list. If ROLE and VERSION file is deleted for some reason, we may arbitrarily start this node as FOLLOWER, which may cause UNDEFINED behavior. Everything may be OK if the origin role is exactly FOLLOWER, but if not, FE process will exit somehow.
            Storage storage = new Storage(this.imageDir);
            if (!roleFile.exists()) { // The very first time to start the first node of the cluster. It should became a Master node (Master node's role is also FOLLOWER, which means electable) For compatibility. Because this is the very first time to start, so we arbitrarily choose a new name for this node
                role = FrontendNodeType.FOLLOWER; nodeName = genFeNodeName(selfNode.getIdent(), selfNode.getPort(), false /* new style */);
                storage.writeFrontendRoleAndNodeName(role, nodeName);
                LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName);
            } else {
                role = storage.getRole();
                if (role == FrontendNodeType.REPLICA) { // for compatibility
                    role = FrontendNodeType.FOLLOWER;
                }
                nodeName = storage.getNodeName();
                if (Strings.isNullOrEmpty(nodeName)) {
                    // In normal case, if ROLE file exist, role and nodeName should both exist.
                    // But we will get a empty nodeName after upgrading.
                    // So for forward compatibility, we use the "old-style" way of naming: "ip_port",
                    // and update the ROLE file.
                    nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true/* old style */);
                    storage.writeFrontendRoleAndNodeName(role, nodeName);
                    LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName);
                }
                // Notice:
                // With the introduction of FQDN, the nodeName is no longer bound to an IP address,
                // so consistency is no longer checked here. Otherwise, the startup will fail.
            }

如果我们启动了一个FE,无法从给出的helper节点信息中,同helper节点建立连接,就会出现:current node is not added to the group. please add it first. " + “sleep 5 seconds and retry, current helper nodes: {}”, helperNodes。的日志信息,这个异常原因就是由于当前节点无法和指定的helper节点建立正常的连接信息导致的。当和helper节点构建正常连接后,就会从helper节点同步 VERSION信息。如果本身节点存在VERSIN文件的信息,说明不是第一次启动,这个时候就会用本地的这个文件的元数据信息同HELPER节点的VERSION信息进行比对。主要是比较clusterID。如果不一致,说明两个节点不是同一个集群的节点,启动进程就直接退出了。

            // try to get role and node name from helper node, this loop will not end until we get certain role type and name
            while (true) {
                if (!getFeNodeTypeAndNameFromHelpers()) {
                    LOG.warn("current node is not added to the group. please add it first. sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
                    try { Thread.sleep(5000);
                        continue;
                    } catch (InterruptedException e) {
                        LOG.warn("", e); System.exit(-1);
                    }
                }
                if (role == FrontendNodeType.REPLICA)  // for compatibility
                    role = FrontendNodeType.FOLLOWER;
                break;
            }

            HostInfo rightHelperNode = helperNodes.get(0);
            Storage storage = new Storage(this.imageDir);
            if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))  || !roleFile.exists()) {
                storage.writeFrontendRoleAndNodeName(role, nodeName);
            }
            if (!versionFile.exists()) {
                // If the version file doesn't exist, download it from helper node
                if (!getVersionFileFromHelper(rightHelperNode)) {
                    throw new IOException("fail to download version file from " + rightHelperNode.getHost() + " will exit.");
                }

                // NOTE: cluster_id will be init when Storage object is constructed,
                //       so we new one.
                storage = new Storage(this.imageDir);
                clusterId = storage.getClusterID();
                token = storage.getToken();
                if (Strings.isNullOrEmpty(token)) {  token = Config.auth_token;
                }
            } else {
                // If the version file exist, read the cluster id and check the
                // id with helper node to make sure they are identical
                clusterId = storage.getClusterID();
                token = storage.getToken();
                try {
                    String url = "http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getHost(), Config.http_port) + "/check";
                    HttpURLConnection conn = HttpURLUtil.getConnectionWithNodeIdent(url);
                    conn.setConnectTimeout(2 * 1000);
                    conn.setReadTimeout(2 * 1000);
                    String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
                    int remoteClusterId = Integer.parseInt(clusterIdString);
                    if (remoteClusterId != clusterId) {
                        LOG.error("cluster id is not equal with helper node {}. will exit.",
                                rightHelperNode.getHost());
                        throw new IOException(
                                "cluster id is not equal with helper node "
                                        + rightHelperNode.getHost() + ". will exit.");
                    }
                    String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
                    if (token == null && remoteToken != null) {
                        LOG.info("get token from helper node. token={}.", remoteToken);
                        token = remoteToken;
                        storage.writeClusterIdAndToken();
                        storage.reload();
                    }
                    if (Config.enable_token_check) {
                        Preconditions.checkNotNull(token);
                        Preconditions.checkNotNull(remoteToken);
                        if (!token.equals(remoteToken)) {
                            throw new IOException(
                                    "token is not equal with helper node "
                                            + rightHelperNode.getHost() + ". will exit.");
                        }
                    }
                } catch (Exception e) {
                    throw new IOException("fail to check cluster_id and token with helper node.", e);
                }
            }

            getNewImage(rightHelperNode);

3 经过这一步 VERSION和ROLE的元数据信息比对后,确定是同一个集群内的节点,也确定了这个FE的ROLE信息了,就需要从image中同步editlog。editLog为bdbje[Oracle Berkeley DB Java Edition (opens new window)],在 Doris 中,我们使用 bdbje 完成元数据操作日志的持久化、FE 高可用等功能。【就相当于ETCD的Raft共识模块+WAL日志模块的组合】。image file就是内存checkpoint到磁盘上的文件。globalTransactionMgr是全局事务管理器。

        // 3. Load image first and replay edits
        this.editLog = new EditLog(nodeName);
        loadImage(this.imageDir); // load image file
        editLog.open(); // open bdb env
        this.globalTransactionMgr.setEditLog(editLog);
        this.idGenerator.setEditLog(editLog);

456 创建一系列的cleaner 线程和监听线程:

        // 4. create load and export job label cleaner thread
        createLabelCleaner();
        // 5. create txn cleaner thread
        createTxnCleaner();
        // 6. start state listener thread
        createStateListener(); listener.start();
        if (!Config.edit_log_type.equalsIgnoreCase("bdb")) {
            // If not using bdb, we need to notify the FE type transfer manually.
            notifyNewFETypeTransfer(FrontendNodeType.MASTER);
        }
        if (statisticsCleaner != null) {
            statisticsCleaner.start();
        }
        if (statisticsAutoAnalyzer != null) {
            statisticsAutoAnalyzer.start();
        }        

此时启动前初始化工作就做完了。等待catalog信息的同步完成即可进行下一步。

    // wait until FE is ready.
    public void waitForReady() throws InterruptedException {
        long counter = 0;
        while (true) {
            if (isReady()) {
                LOG.info("catalog is ready. FE type: {}", feType);
                break;
            }

            Thread.sleep(100);
            if (counter++ % 20 == 0) {
                LOG.info("wait catalog to be ready. FE type: {}. is ready: {}, counter: {}", feType, isReady.get(),
                        counter);
            }
        }
    }

启动FE的SERVER

创建 QeServer ,负责与mysql client 通信;创建 FeServer ,由Thrift Server组成,负责 FE 和 BE 通信;创建 HttpServer ,负责提供Rest API以及Doris FE前端页面接口。

            // init and start:
            // 1. HttpServer for HTTP Server
            // 2. FeServer for Thrift Server
            // 3. QeService for MySQL Server
            FeServer feServer = new FeServer(Config.rpc_port);
            feServer.start();

            if (options.enableHttpServer) {
                HttpServer httpServer = new HttpServer();
                httpServer.setPort(Config.http_port);
                httpServer.setHttpsPort(Config.https_port);
                httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
                httpServer.setAcceptors(Config.jetty_server_acceptors);
                httpServer.setSelectors(Config.jetty_server_selectors);
                httpServer.setWorkers(Config.jetty_server_workers);
                httpServer.setKeyStorePath(Config.key_store_path);
                httpServer.setKeyStorePassword(Config.key_store_password);
                httpServer.setKeyStoreType(Config.key_store_type);
                httpServer.setKeyStoreAlias(Config.key_store_alias);
                httpServer.setEnableHttps(Config.enable_https);
                httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
                httpServer.setMinThreads(Config.jetty_threadPool_minThreads);
                httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);
                httpServer.start();
                Env.getCurrentEnv().setHttpReady(true);
            }

            if (options.enableQeService) {
                QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler());
                qeService.start();
            }

            ThreadPoolManager.registerAllThreadPoolMetric();

在这里插入图片描述

Doris 的元数据主要存储4类数据:
用户数据信息。包括数据库、表的 Schema、分片信息等。

各类作业信息。如导入作业,Clone 作业、SchemaChange 作业等。

用户及权限信息。

集群及节点信息

元数据的数据流具体过程如下:
只有 leader FE 可以对元数据进行写操作。写操作在修改 leader 的内存后,会序列化为一条log,按照 key-value 的形式写入 bdbje。其中 key 为连续的整型,作为 log id,value 即为序列化后的操作日志。

日志写入 bdbje 后,bdbje 会根据策略(写多数/全写),将日志复制到其他 non-leader 的 FE 节点。non-leader FE 节点通过对日志回放,修改自身的元数据内存镜像,完成与 leader 节点的元数据同步。

leader 节点的日志条数达到阈值后(默认 10w 条),会启动 checkpoint 线程。checkpoint 会读取已有的 image 文件,和其之后的日志,重新在内存中回放出一份新的元数据镜像副本。然后将该副本写入到磁盘,形成一个新的 image。之所以是重新生成一份镜像副本,而不是将已有镜像写成 image,主要是考虑写 image 加读锁期间,会阻塞写操作。所以每次 checkpoint 会占用双倍内存空间。

image 文件生成后,leader 节点会通知其他 non-leader 节点新的 image 已生成。non-leader 主动通过 http 拉取最新的 image 文件,来更换本地的旧文件。

bdbje 中的日志,在 image 做完后,会定期删除旧的

源码解析
Doris FE启动步骤(只说核心的几个部分):

Doris启动的时候首先去初始化Catalog,并等待Catalog完成

启动QeServer 这个是mysql client连接用的,端口是9030

启动FeServer这个是Thrift Server,主要是FE和BE之间通讯用的

启动HttpServer ,各种rest api接口及前端web界面

这里我们分析的是元数据这块只看Catalog初始化过程中做了什么事情

PaloFe ——> start()
// 初始化Catalog并等待初始化完成
Catalog.getCurrentCatalog().initialize(args);
Catalog.getCurrentCatalog().waitForReady();
Catalog -->initialize()
第一步:获取本节点和Helper节点
getSelfHostPort();
getHelperNodes(args);
第二步:检查和创建元数据目录及文件
第三步:获取集群ID及角色(Observer和Follower)
getClusterIdAndRole();
第四步:首先加载image并回访editlog
this.editLog = new EditLog(nodeName);
loadImage(this.imageDir); // load image file
editLog.open(); // open bdb env
this.globalTransactionMgr.setEditLog(editLog);
this.idGenerator.setEditLog(editLog);
第五步:创建load和导出作业标签清理线程(这是一个MasterDaemon守护线程)
createLabelCleaner()
第六步:创建tnx清理线程
createTxnCleaner();
第七步:启动状态监听线程,这个线程主要是监听Master,Observer、Follower状态转换,及Observer和Follower元数据同步,Leader选举
createStateListener();
listener.start();
Load Job Label清理:createLabelCleaner
//每个label_keep_max_second(默认三天),从idToLoadJob, dbToLoadJobs and dbLabelToLoadJobs删除旧的job,
//包括从ExportMgr删除exportjob, exportJob 默认七天清理一次,控制参数history_job_keep_max_second
//这个线程每个四个小时运行一次,是由label_clean_interval_second参数来控制
public void createLabelCleaner() {
labelCleaner = new MasterDaemon(“LoadLabelCleaner”, Config.label_clean_interval_second * 1000L) {
@Override
protected void runAfterCatalogReady() {
load.removeOldLoadJobs();
loadManager.removeOldLoadJob();
exportMgr.removeOldExportJobs();
}
};
}
事务(tnx)清理线程:createTxnCleaner()
//定期清理过期的事务,默认30秒清理一次,控制参数:transaction_clean_interval_second
//这里清理的是tnx状态是:
//1.已过期:VISIBLE(可见) 或者 ABORTED(终止), 并且 expired(已过期)
//2.已超时:事务状态是:PREPARE, 但是 timeout
//事务状态是:COMMITTED和 VISIBLE状态的不能被清除,只能成功
public void createTxnCleaner() {
txnCleaner = new MasterDaemon(“txnCleaner”, Config.transaction_clean_interval_second) {
@Override
protected void runAfterCatalogReady() {
globalTransactionMgr.removeExpiredAndTimeoutTxns();
}
};
}
FE状态监听器线程 createStateListener()
这个线程主要是监听Master,Observer、Follower状态转换,及Observer和Follower元数据同步,Leader选举

定期检查,默认是100毫秒,参数:STATE_CHANGE_CHECK_INTERVAL_MS


public void createStateListener() {
listener = new Daemon(“stateListener”, STATE_CHANGE_CHECK_INTERVAL_MS) {
@Override
protected synchronized void runOneCycle() {

while (true) {
FrontendNodeType newType = null;
try {
newType = typeTransferQueue.take();
} catch (InterruptedException e) {
LOG.error(“got exception when take FE type from queue”, e);
Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
System.exit(-1);
}
Preconditions.checkNotNull(newType);
LOG.info(“begin to transfer FE type from {} to {}”, feType, newType);
if (feType == newType) {
return;
}

/*
* INIT -> MASTER: transferToMaster
* INIT -> FOLLOWER/OBSERVER: transferToNonMaster
* UNKNOWN -> MASTER: transferToMaster
* UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
* FOLLOWER -> MASTER: transferToMaster
* FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
*/
switch (feType) {
case INIT: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
case UNKNOWN:
break;
default:
break;
}
break;
}
case UNKNOWN: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case FOLLOWER: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case UNKNOWN: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case OBSERVER: {
switch (newType) {
case UNKNOWN: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case MASTER: {
// exit if master changed to any other type
String msg = "transfer FE type from MASTER to " + newType.name() + “. exit”;
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
}
default:
break;
} // end switch formerFeType

feType = newType;
LOG.info(“finished to transfer FE type to {}”, feType);
}
} // end runOneCycle
};

listener.setMetaContext(metaContext);
}
Leader的选举通过:

transferToNonMaster和transferToMaster

元数据同步方法: startMasterOnlyDaemonThreads,这个方法是启动Checkpoint守护线程,由Master定期朝各个Follower和Observer推送image,然后在有节点本地做Image回放,更新自己本节点的元数据,这个线程只在Master节点启动 startNonMasterDaemonThreads 启动其他守护线程在所有FE节点启动,这里包括TabletStatMgr、LabelCleaner、EsRepository、DomainResolver

private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn(“{} to UNKNOWN, still offer read service”, feType.name());
// not set canRead here, leave canRead as what is was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
return;
}

// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
// add helper sockets
if (Config.edit_log_type.equalsIgnoreCase(“BDB”)) {
for (Frontend fe : frontends.values()) {
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
}
}
}

if (replayer == null) {
//创建回放线程
createReplayer();
replayer.start();
}

// ‘isReady’ will be set to true in ‘setCanRead()’ method
fixBugAfterMetadataReplayed(true);

     startNonMasterDaemonThreads();


MetricRepo.init();
}

创建editlog回放守护线程,这里主要是将Master推送的Image日志信息在本地进行回访,写到editlog中

public void createReplayer() {
replayer = new Daemon(“replayer”, REPLAY_INTERVAL_MS) {
@Override
protected void runOneCycle() {
boolean err = false;
boolean hasLog = false;
try {
//进行image回放,重写本地editlog
hasLog = replayJournal(-1);
metaReplayState.setOk();
} catch (InsufficientLogException insufficientLogEx) {
// 从以下成员中复制丢失的日志文件:拥有文件的复制组
LOG.error(“catch insufficient log exception. please restart.”, insufficientLogEx);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(insufficientLogEx, config);
System.exit(-1);
} catch (Throwable e) {
LOG.error(“replayer thread catch an exception when replay journal.”, e);
metaReplayState.setException(e);
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
LOG.error("sleep got exception. ", e);
}
err = true;
}

setCanRead(hasLog, err);
}
};
replayer.setMetaContext(metaContext);
}
日志回放,重写本地editlog


public synchronized boolean replayJournal(long toJournalId) {
long newToJournalId = toJournalId;
if (newToJournalId == -1) {
newToJournalId = getMaxJournalId();
}
if (newToJournalId <= replayedJournalId.get()) {
return false;
}

LOG.info(“replayed journal id is {}, replay to journal id is {}”, replayedJournalId, newToJournalId);
JournalCursor cursor = editLog.read(replayedJournalId.get() + 1, newToJournalId);
if (cursor == null) {
LOG.warn(“failed to get cursor from {} to {}”, replayedJournalId.get() + 1, newToJournalId);
return false;
}

long startTime = System.currentTimeMillis();
boolean hasLog = false;
while (true) {
JournalEntity entity = cursor.next();
if (entity == null) {
break;
}
hasLog = true;
//生成新的editlog
EditLog.loadJournal(this, entity);
replayedJournalId.incrementAndGet();
LOG.debug(“journal {} replayed.”, replayedJournalId);
if (feType != FrontendNodeType.MASTER) {
journalObservable.notifyObservers(replayedJournalId.get());
}
if (MetricRepo.isInit) {
// Metric repo may not init after this replay thread start
MetricRepo.COUNTER_EDIT_LOG_READ.increase(1L);
}
}
long cost = System.currentTimeMillis() - startTime;
if (cost >= 1000) {
LOG.warn(“replay journal cost too much time: {} replayedJournalId: {}”, cost, replayedJournalId);
}

return hasLog;
}
只有角色为 Master 的 FE 才会主动定期生成 image 文件。每次生成完后,都会推送给其他非 Master 角色的 FE。当确认其他所有 FE 都收到这个 image 后,Master FE 会删除 bdbje 中旧的元数据 journal。所以,如果 image 生成失败,或者 image 推送给其他 FE 失败时,都会导致 bdbje 中的数据不断累积。

在Master节点日志中搜索你可以看到下面这个日志,一分钟一次

2021-04-16 08:34:34,554 INFO (leaderCheckpointer|72) [BDBJEJournal.getFinalizedJournalId():410] database names: 52491702
2021-04-16 08:34:34,554 INFO (leaderCheckpointer|72) [Checkpoint.runAfterCatalogReady():81] checkpoint imageVersion 52491701, checkPointVersion 0

CheckPoint线程的启动只在Master Fe节点,在Catalog.startMasterOnlyDaemonThreads方法里启动的

在这里startMasterOnlyDaemonThreads方法里会在Master Fe 节点启动一个 TimePrinter 线程。该线程会定期向 bdbje 中写入一个当前时间的 key-value 条目。其余 non-leader 节点通过回放这条日志,读取日志中记录的时间,和本地时间进行比较,如果发现和本地时间的落后大于指定的阈值(配置项:meta_delay_toleration_second。写入间隔为该配置项的一半),则该节点会处于不可读的状态,当查询或者load等任务落到这节点的时候会报:failed to call frontend service异常。此机制解决了 non-leader 节点在长时间和 leader 失联后,仍然提供过期的元数据服务的问题。

所以这里整个集群是需要做NTP时间同步,保持各个节点时间一致,避免因为时间差异造成的服务不可用

// start all daemon threads only running on Master
private void startMasterOnlyDaemonThreads() {
// start checkpoint thread
checkpointer = new Checkpoint(editLog);
checkpointer.setMetaContext(metaContext);
// set “checkpointThreadId” before the checkpoint thread start, because the thread
// need to check the “checkpointThreadId” when running.
checkpointThreadId = checkpointer.getId();

checkpointer.start();

// time printer
createTimePrinter();
timePrinter.start();

updateDbUsedDataQuotaDaemon.start();
}
CheckPoint线程启动以后会定期向非Master FE推送Image日志信息,默认是一分钟,配置参数:checkpoint_interval_second

具体方法:runAfterCatalogReady

Master FE定期向非Master FE推送image日志信息

删除旧的journals:获取每个非Master节点的当前journal ID。 删除bdb数据库时,不能删除比任何非Master节点的当前journal ID 更新的的db。 否则此滞后节点将永远无法获取已删除的journal。

最后删除旧的image文件

     // push image file to all the other non master nodes
     // DO NOT get other nodes from HaProtocol, because node may not in bdbje replication group yet.
     List<Frontend> allFrontends = Catalog.getServingCatalog().getFrontends(null);
     int successPushed = 0;
     int otherNodesCount = 0;
     if (!allFrontends.isEmpty()) {
         otherNodesCount = allFrontends.size() - 1; // skip master itself
         for (Frontend fe : allFrontends) {
             String host = fe.getHost();
             if (host.equals(Catalog.getServingCatalog().getMasterIp())) {
                 // skip master itself
                 continue;
             }
             int port = Config.http_port;
             
             String url = "http://" + host + ":" + port + "/put?version=" + replayedJournalId
                     + "&port=" + port;
             LOG.info("Put image:{}", url);


try {
MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream());
successPushed++;
} catch (IOException e) {
LOG.error(“Exception when pushing image file. url = {}”, url, e);
}
}

         LOG.info("push image.{} to other nodes. totally {} nodes, push succeed {} nodes",
                  replayedJournalId, otherNodesCount, successPushed);
     }
     
     // Delete old journals
     if (successPushed == otherNodesCount) {
         long minOtherNodesJournalId = Long.MAX_VALUE;
         long deleteVersion = checkPointVersion;
         if (successPushed > 0) {
             for (Frontend fe : allFrontends) {
                 String host = fe.getHost();
                 if (host.equals(Catalog.getServingCatalog().getMasterIp())) {
                     // skip master itself
                     continue;
                 }
                 int port = Config.http_port;
                 URL idURL;
                 HttpURLConnection conn = null;
                 try {
                     /*
                      * get current replayed journal id of each non-master nodes.
                      * when we delete bdb database, we cannot delete db newer than
                      * any non-master node's current replayed journal id. otherwise,
                      * this lagging node can never get the deleted journal.
                      */
                     idURL = new URL("http://" + host + ":" + port + "/journal_id");
                     conn = (HttpURLConnection) idURL.openConnection();
                     conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 1000);
                     conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
                     String idString = conn.getHeaderField("id");
                     long id = Long.parseLong(idString);
                     if (minOtherNodesJournalId > id) {
                         minOtherNodesJournalId = id;
                     }
                 } catch (IOException e) {
                     LOG.error("Exception when getting current replayed journal id. host={}, port={}",
                             host, port, e);
                     minOtherNodesJournalId = 0;
                     break;
                 } finally {
                     if (conn != null) {
                         conn.disconnect();
                     }
                 }
             }
             deleteVersion = Math.min(minOtherNodesJournalId, checkPointVersion);
         }
         //删除旧的Journal
         editLog.deleteJournals(deleteVersion + 1);
         if (MetricRepo.isInit) {
             MetricRepo.COUNTER_IMAGE_PUSH.increase(1L);
         }
         LOG.info("journals <= {} are deleted. image version {}, other nodes min version {}", 
                  deleteVersion, checkPointVersion, minOtherNodesJournalId);
     }
     
     //删除旧的image文件
     MetaCleaner cleaner = new MetaCleaner(Config.meta_dir + "/image");
     try {
         cleaner.clean();
     } catch (IOException e) {
         LOG.error("Master delete old image file fail.", e);
     }

https://new-developer.aliyun.com/article/1124025
https://blog.csdn.net/flyinthesky111/article/details/131281581
https://blog.csdn.net/qq_42200605/article/details/124232478
https://blog.csdn.net/hf200012/article/details/117825649
https://www.jianshu.com/p/de2896715e02

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1047249.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

社区团购新零售搭伙拼团小程序源码(前后端)

社区团购新零售搭伙拼团小程序源码是一款非常实用的电商小程序&#xff0c;它包含了前后端文件&#xff0c; 可以快速地进行部署和使用。该小程序是基于微信小程序开发的&#xff0c;支持社区团购、新零售、搭伙拼团等多种功能。 该小程序具有良好的用户体验&#xff0c;包括…

范数Norm-衡量向量大小的方法

性质 非负性: 范数的值总是非负的,且当且仅当向量全为零时,范数的值为零。 齐次性: 对于任意实数α,有 三角不等式: 对于任意向量x和y,有 常见范数 L1: 向量所有元素绝对值的和,权重稀疏 L2:欧几里得范数,权重平滑 无穷范数:表示向量中最大的元素 为什么使用范…

英飞凌 Tricore 架构中断系统详解

本文以TC3系列MCU为例&#xff0c;先来了解中断源是如何产生的&#xff0c;再看一下CPU是如何处理中断源的。 AURIX TC3XX的中断路由模块 Interrupt Router (IR) 在TC3中&#xff0c;中断既可以被CPU处理&#xff0c;也可以被DMA处理&#xff0c;所以手册中不再把中断称为中断…

vue3硅谷甄选02 | 封装svg组件 - axios二次封装

文章目录 vue3硅谷甄选02功能1&#xff1a;封装svg组件SVG图标配置svg封装成组件svg组件注册为全局组件自定义统一注册全局组件的插件自定义插件的原理插件的使用 app.use(plugin, [options]) 功能2&#xff1a;axios二次封装使用mock插件构造数据axios二次封装api接口统一管理…

【大数据开发技术】实验01-Hadoop安装部署

文章目录 Hadoop安装部署一、实验目标二、实验要求三、实验内容四、实验步骤附&#xff1a;系列文章 Hadoop安装部署 虚拟机数量&#xff1a;3 系统版本&#xff1a;Centos 7.5 Hadoop版本&#xff1a; Apache Hadoop 2.7.3 主节点信息&#xff1a; 操作系统&#xff1a;Cen…

Tomcat 与 JDK 对应版本关系

对应关系 Tomcat版本 jdk版本11.0.x JDK 21及以后10.1.x JDK11及以后10.0.xJDK1.8及以后9.0.x JDK1.8及以后8.5.xJDK1.7及以后8.0.x JDK1.7及以后 查看对应关系方法&#xff1a; 登陆Tomcat官网&#xff1a;Apache Tomcat - Welcome! 结果&#xff1a;

Arthas:Java调试利器使用

Arthas:Java调试利器使用 1. Arthas是什么2. Arthas可以解决什么问题Arthas启动方式1. jar启动2. 在线安装 远程连接命令使用- 退出threadclassloaderscsm watchtrace修改日志级别 1. Arthas是什么 Arthas(阿尔萨斯)是阿里开源的一个Java在线分析诊断工具. 2. Arthas可以解决…

C#(CSharp)入门实践项目(简易回合制游戏)

项目名称 木木夕营救公主 项目介绍 这是一个小游戏&#xff0c;你将扮演一个英雄&#xff08;木木夕&#xff09;&#xff0c;去打败恶龙&#xff0c;拯救出公主&#xff0c;该项目采用回合制战斗模式&#xff0c;由于角色的血量和攻击为随机数&#xff0c;所以需要靠运气才…

YOLOv7改进:CBAM注意力机制

目录 1.介绍 1.1、论文的出发点 1.2、论文的主要工作 1.3、CBAM模块的具体介绍 2.YOLOv7改进 2.1yaml 配置文件如下 2.2common.py配置 2.3yolo.py配置 1.介绍 1.1、论文的出发点 cnn基于其丰富的表征能力&#xff0c;极大地推动了视觉任务的完成&#xff0c;为了提高…

【MySql】3- 实践篇(一)

文章目录 1. 普通索引和唯一索引的选择1.1 查询过程1.2 更新过程1.2.1 change buffer1.2.2 change buffer 的使用场景 1.3 索引选择和实践1.4 change buffer 和 redo log2. MySQL为何有时会选错索引?2.1 优化器的逻辑2.1.1 扫描行数是怎么判断的?2.1.2 重新统计索引信息 2.2 …

一站式吃鸡利器,提升游戏战斗力,助您稳坐鸡王宝座!

各位吃鸡玩家们&#xff0c;听说过绝地求生作图工具吗&#xff1f;想知道如何提高游戏战斗力、分享顶级作战干货、查询装备皮肤库存&#xff1f;还在为游戏账号安全而担心吗&#xff1f;别急&#xff0c;今天就为您介绍一款一站式吃鸡利器&#xff0c;满足您的所有需求&#xf…

【使用工具】IDEA创建类及已有类添加注释-详细操作

1.背景 很多开发好多时候其实不太会给类添加注释&#xff0c;尤其是已经有的类&#xff0c;上网查询&#xff0c;好多文档错误百出&#xff0c;而且不全 2.正文 2.1新建类添加注释 idea给新建类创建注释有两种方式 先写一个简单的模板 /** * description: TODO * autho…

kotlin协程CoroutineScope Dispatchers.IO launch 线程Id

kotlin协程CoroutineScope Dispatchers.IO launch 线程Id import kotlinx.coroutines.*fun main(args: Array<String>) {println("main 线程id:${Thread.currentThread().threadId()}")CoroutineScope(Dispatchers.IO).launch {println("launch 线程id:$…

【JVM】第二篇 JVM内存模型深度剖析与优化

目录 一. JDK体系结构与跨平台特性介绍二. JVM内存模型深度剖析三. 从Jvisualvm来研究下对象内存流转模型四. GC Root与STW机制五. JVM参数设置通用模型一. JDK体系结构与跨平台特性介绍 二. JVM内存模型深度剖析 按照线程是否共享来划分 TLAB(Thread Local Allocation Buffer…

mybatis核心组件

title: “mybatis核心组件” createTime: 2021-12-08T12:19:5708:00 updateTime: 2021-12-08T12:19:5708:00 draft: false author: “ggball” tags: [“mybatis”] categories: [“java”] description: “mybatis核心组件” #mermaid-svg-AYu4pQutsPsK0P5T {font-family:&quo…

stm32 - 初识2

stm32 - 初识2 工程架构点灯程序寄存器方式点灯库函数的方式点灯 工程架构 启动文件 中断向量表&#xff0c;中断服务函数&#xff0c;其他中断等 中断服务函数中的&#xff0c;复位中断是整个程序的入口&#xff0c;调用systeminit&#xff0c;和main函数 点灯程序 寄存器方式…

自适应阈值分割-OTSU

OTSU 在前面固定阈值中选取了一个阈值为127进行阈值分割&#xff0c;那如何知道选的这个阈值效果好不好呢&#xff1f;答案是&#xff1a;不断尝试&#xff0c;所以这种方法在很多文献中都被称为经验阈值。 Otsu阈值法就提供了一种自动高效的二值化方法。Otsu算法也称最大类间…

C++之std::atomic类模板原子操作应用总结(二百三十九)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

JAVA+SpringBoot+VUE工厂车间管理系统(含论文)源码

springboot169基于vue的工厂车间管理系统的设计录像(毕业设计jdz2023) 一、源码描述 JAVASpringBootVUE工厂车间管理系统,包含源码数据库论文等,含MySQL脚本&#xff0c;基于B/S和Web开发的&#xff0c;感兴趣的朋友可以下载看看 二、功能介绍 1、个人中心 2、人员管理 3、设备…

计算机图像处理:图像轮廓

图像轮廓 图像阈值分割主要是针对图片的背景和前景进行分离&#xff0c;而图像轮廓也是图像中非常重要的一个特征信息&#xff0c;通过对图像轮廓的操作&#xff0c;就能获取目标图像的大小、位置、方向等信息。画出图像轮廓的基本思路是&#xff1a;先用阈值分割划分为两类图…