深度了解flink(十) JobManager(4) ResourceManager HA

news2025/1/16 17:53:08

ResourceManager(ZK模式)的高可用启动流程

ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create

public DispatcherResourceManagerComponent create(
            Configuration configuration,
            ResourceID resourceId,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            MetricRegistry metricRegistry,
            ExecutionGraphInfoStore executionGraphInfoStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            Collection<FailureEnricher> failureEnrichers,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        //resourcemanager的选举服务
        LeaderRetrievalService resourceManagerRetrievalService = null;
        ResourceManagerService resourceManagerService = null;

        try {
            //resourceManager leader 获取服务
            resourceManagerRetrievalService =
                    highAvailabilityServices.getResourceManagerLeaderRetriever();
            //LeaderGatewayRetriever实现了LeaderRetrievalListener接口
            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));

            
            resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            resourceId,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            delegationTokenManager,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);

            //启动resourceManagerService start方法会调用具体的选举方法
            resourceManagerService.start();
            //启动etrievalService服务的start方法,
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

            return new DispatcherResourceManagerComponent(
                    dispatcherRunner,
                    resourceManagerService,
                    dispatcherLeaderRetrievalService,
                    resourceManagerRetrievalService,
                    webMonitorEndpoint,
                    fatalErrorHandler,
                    dispatcherOperationCaches);
        } catch (Exception exception) {
            //省略无关代码
        }
    }

初始化LeaderRetrievalServer

resourceManagerRetrievalService =
                    highAvailabilityServices.getResourceManagerLeaderRetriever();

getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法

 @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return createLeaderRetrievalService(getLeaderPathForResourceManager());
    }

createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices

    @Override
    protected LeaderRetrievalService createLeaderRetrievalService(String componentId) {
        // Maybe use a single service for leader retrieval
        return ZooKeeperUtils.createLeaderRetrievalService(
                curatorFrameworkWrapper.asCuratorFramework(),
                ZooKeeperUtils.getLeaderPath(componentId),
                configuration);
    }

继续方法跟进

public static DefaultLeaderRetrievalService createLeaderRetrievalService(
        final CuratorFramework client, final String path, final Configuration configuration) {
    return new DefaultLeaderRetrievalService(
            createLeaderRetrievalDriverFactory(client, path, configuration));

最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory

初始化LeaderRetrievalListener

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));

RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddressLeaderRetrievalListener#notifyLeaderAddress做了实现

    @Override
    public void notifyNewLeaderAddress(
            CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
        final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);

        final CompletableFuture<T> oldGatewayFuture =
                atomicGatewayFuture.getAndSet(newGatewayFuture);

        newGatewayFuture.whenComplete(
                (t, throwable) -> {
                    if (throwable != null) {
                        oldGatewayFuture.completeExceptionally(throwable);
                    } else {
                        oldGatewayFuture.complete(t);
                    }
                });
    }

创建ResourceManagerService

 resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            resourceId,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            delegationTokenManager,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);

ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动

创建LeaderElection
    private ResourceManagerServiceImpl(
            ResourceManagerFactory<?> resourceManagerFactory,
            ResourceManagerProcessContext rmProcessContext)
            throws Exception {
        //省略其他方法
        //LeaderElection进行了初始化
        this.leaderElection =
                rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
    
LeaderElection选举
 resourceManagerService.start();

跳转到start方法

  public void start() throws Exception {
        synchronized (lock) {
            if (running) {
                LOG.debug("Resource manager service has already started.");
                return;
            }
            running = true;
        }

        LOG.info("Starting resource manager service.");
        
        leaderElection.startLeaderElection(this);
    }

leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,

    @Override
    public void startLeaderElection(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull(contender);
        parentService.register(componentId, contender);
    }

开始对参选者进行注册

protected void register(String componentId, LeaderContender contender) throws Exception {
        checkNotNull(componentId, "componentId must not be null.");
        checkNotNull(contender, "Contender must not be null.");

        synchronized (lock) {
           
            if (leaderElectionDriver == null) {
                createLeaderElectionDriver();
            }

        //省略无关代码
    }

register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建

 @Override
    public ZooKeeperLeaderElectionDriver create(
            LeaderElectionDriver.Listener leaderElectionListener) throws Exception {
        return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);
    }
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(
            CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
            throws Exception {
        //参数校验
        this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
        this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);

        //使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。
        this.leaderLatchPath =
                ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
        //使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。    
        this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
        //使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化
        this.treeCache =
                ZooKeeperUtils.createTreeCache(
                        curatorFramework,
                        "/",
                        new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());

        treeCache
                .getListenable()
                .addListener(
                        (client, event) -> {
                            switch (event.getType()) {
                                case NODE_ADDED:
                                case NODE_UPDATED:
                                    Preconditions.checkNotNull(
                                            event.getData(),
                                            "The ZooKeeper event data must not be null.");
                                    handleChangedLeaderInformation(event.getData());
                                    break;
                                case NODE_REMOVED:
                                    Preconditions.checkNotNull(
                                            event.getData(),
                                            "The ZooKeeper event data must not be null.");
                                    handleRemovedLeaderInformation(event.getData().getPath());
                                    break;
                            }
                        });

        leaderLatch.addListener(this);
        curatorFramework.getConnectionStateListenable().addListener(listener);
        leaderLatch.start();
        treeCache.start();
    }
ZooKeeperLeaderElectionDriver.handleStateChange

状态变化时候根据不同状态打印日志

private void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case CONNECTED:
                LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
                break;
            case SUSPENDED:
                LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
                break;
            case RECONNECTED:
                LOG.info(
                        "Connection to ZooKeeper was reconnected. Leader election can be restarted.");
                break;
            case LOST:
                // Maybe we have to throw an exception here to terminate the JobManager
                LOG.warn(
                        "Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");
                break;
        }
    }
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
    private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {
        if (!leaderContenderRegistry.containsKey(componentId)) {
            LOG.debug(
                    "The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                    sessionID,
                    leaderElectionDriver);
            return;
        } else if (!sessionID.equals(issuedLeaderSessionID)) {
            LOG.debug(
                    "An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",
                    sessionID,
                    issuedLeaderSessionID);
            return;
        }

        Preconditions.checkState(
                !confirmedLeaderInformation.hasLeaderInformation(componentId),
                "The leadership should have been granted while not having the leadership acquired.");

        LOG.debug(
                "Granting leadership to the contender registered under component '{}' with session ID {}.",
                componentId,
                issuedLeaderSessionID);

        leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2231925.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系统编程——信号的基本概念(信号产生于处理、可靠信号、可重入函数、SIGCHLD)

一、什么是信号 1、信号的定义 信号是UNIX和Linux系统响应某些条件而产生的一个事件&#xff0c;接收到该信号的进程会相应地采取一些行动。信号是软中断&#xff0c;通常信号是由一个错误产生的。但它们还可以作为进程间通信或修改行为的一种方式&#xff0c;明确地由一个进程…

节省50%人工录入时间!免费开源AI工具让法律文件数据提取更高效

法律行业痛点&#xff1a;处理大量的合同、诉讼材料和财务报告等文件是一项繁琐且耗时的工作。这些文件中的表格常包含关键信息&#xff0c;如费用清单、时效统计和条款列表等&#xff0c;手动录入和整理这些数据不仅效率低下&#xff0c;而且容易出错。表格识别技术&#xff0…

单智能体carla强化学习实战工程介绍

有三个工程&#xff1a; Ray_Carla: 因为有的论文用多进程训练强化学习&#xff0c;包括ray分布式框架等&#xff0c;这里直接放了一个ray框架的示例代码&#xff0c;是用sac搭建的&#xff0c;obs没用图像&#xff0c;是数值状态向量值&#xff08;速度那些&#xff09;。 …

消息队列面试——打破沙锅问到底

消息队列的面试连环炮 前言 你用过消息队列么&#xff1f;说说你们项目里是怎么用消息队列的&#xff1f; 我们有一个订单系统&#xff0c;订单系统会每次下一个新订单的时候&#xff0c;就会发送一条消息到ActiveMQ里面去&#xff0c;后台有一个库存系统&#xff0c;负责获取…

第02章 MySQL环境搭建

一、MySQL的卸载 如果安装mysql时出现问题&#xff0c;则需要将mysql卸载干净再重新安装。如果卸载不干净&#xff0c;仍然会报错安装不成功。 步骤1&#xff1a;停止MySQL服务 在卸载之前&#xff0c;先停止MySQL8.0的服务。按键盘上的“Ctrl Alt Delete”组合键&#xff0…

向量数据库指南》——解锁多模态RAG应用,引领智能问答新时代!

多模态 RAG 应用:解锁智能问答的新维度 在当今这个信息爆炸的时代,我们每天都需要处理海量的数据,这些数据以多种形式存在,包括文本、图像、音频和视频等。随着人工智能技术的飞速发展,尤其是大型语言模型(LLM)的广泛应用,我们越来越依赖于这些智能系统来理解和回应我…

【MySQL 保姆级教学】 复合查询--超级详细(10)

复合查询 1. 复合查询的作用2. 创建将进行操作的表2.1 员工表 emp2.2 部门表 dept2.3 薪资等级表 3. 基本查询回顾4. 多表查询4.1 多表查询的定义4.2 笛卡尔积4.3 内连接 inner join4.4 交叉连接 cross join4.5 左外连接 left join4.6 右外连接 right join4.7 自连接 5. 子查询…

飞桨首创 FlashMask :加速大模型灵活注意力掩码计算,长序列训练的利器

在 Transformer 类大模型训练任务中&#xff0c;注意力掩码&#xff08;Attention Mask&#xff09;一方面带来了大量的冗余计算&#xff0c;另一方面因其 O ( N 2 ) O(N^2) O(N2)巨大的存储占用导致难以实现长序列场景的高效训练&#xff08;其中 N N N为序列长度&#xff09;…

高电压、真差分信号采集的SAR ADC驱动电路设计

1 简介 本设计展示了一种用于驱动高压 SAR ADC 以实现高压全差分信号数据采集的解决方案。该差分信号可能具有广泛的共模电压范围&#xff0c;具体取决于放大器的电源和输入信号振幅。使用一个通用高压精密放大器来执行差分到单端信号转换&#xff0c;并以最高吞吐量驱动 10V 的…

在VS Code中操作MySQL数据库

【基础篇】 【小白专用24.5.26 已验证】VSCode下载和安装与配置PHP开发环境&#xff08;详细版&#xff09;_vscode php-CSDN博客 ~~~~~~~~~~~~~~~~~~~~~~~~~ 在VS Code中下载插件 Prettier SQL VSCode 和 MySQL : 随后在VS Code中点击Database图标 在连接界面输入MySQL数据库…

Java唯一键实现方案

数据唯一性 1、生成UUID1.1 代码中实现1.2 数据库中实现优点缺点 2、数据库递增主键优点 3、数据库递增序列3.1 创建序列3.2 使用序列优点缺点 在Java项目开发中&#xff0c;对数据的唯一性要求&#xff0c;业务数据入库的时候保持单表只有一条记录&#xff0c;因此对记录中要求…

【MySQL】可重复读级别下基于Next Key Lock解决幻读

昨天读到了一篇文章[1]&#xff0c;里面讲&#xff0c;面试官说mysql的可重复读级别下有解决幻读的方式&#xff0c;最后公布了答案&#xff0c;是在sql后面加for update。这么说倒是没错&#xff0c;但是这种问法给我一种奇怪的感觉&#xff0c;因为for update无论在哪个隔离级…

vscode通过.vscode/launch.json 内置php服务启动thinkphp 应用后无法加载路由解决方法

我们在使用vscode的 .vscode/launch.json Launch built-in server and debug 启动thinkphp应用后默认是未加载thinkphp的路由文件的&#xff0c; 这个就导致了&#xff0c;某些thinkphp的一些url路由无法访问的情况&#xff0c; 如http://0.0.0.0:8000/api/auth.admin/info这…

【canal 中间件】canal 实时监听 binlog

文章目录 一、安装 MySQL1.1 启动 mysql 服务器1.2 开启 Binlog 写入功能1.2.1创建 binlog 配置文件1.2.2 修改配置文件权限1.2.3 挂载配置文件1.2.4 检测 binlog 配置是否成功 1.3 创建账户并授权 二、安装 canal2.1 安装 canal-admin(可选)2.1.1 启动 canal-admin 容器2.1.2 …

在阿里云快速启动Umami玩转网页分析

阿里云计算巢提供了Umami快速部署能力&#xff0c;使用者不需要自己下载代码&#xff0c;不需要自己安装复杂的依赖&#xff0c;不需要了解底层技术&#xff0c;只需要在控制台图形界面点击几下鼠标就可以快速部署并启动Umami&#xff0c;非技术同学也能轻松搞定。 什么是Umam…

【模型学习之路】手写+分析bert

手写分析bert 目录 前言 架构 embeddings Bertmodel 预训练任务 MLM NSP Bert 后话 netron可视化 code2flow可视化 fine tuning 前言 Attention is all you need! 读本文前&#xff0c;建议至少看懂【模型学习之路】手写分析Transformer-CSDN博客。 毕竟Bert是tr…

stm32移植LVGL(LVGL 8.2.0)

目录 1.下载LVGL源码 2.修改LVGL文件夹 (1)文件夹 examples 改动 (2)文件夹 demos 改动 3.最终LVGL文件夹内容 4.软件Keil配置、添加头文件 5.程序配置 6.其它配置参考链接 1.下载LVGL源码 LVGL源码地址&#xff1a;https://github.com/lvgl/lvgl 2.修改LVGL文件夹…

海南华志亿星电子商务有限公司电商服务的璀璨新星

在这个全民直播、短视频带货风起云涌的时代&#xff0c;抖音电商以其独特的魅力成为了众多商家竞相追逐的蓝海市场。而在这片波澜壮阔的商海中&#xff0c;海南华志亿星电子商务有限公司犹如一颗璀璨的新星&#xff0c;以其专业的服务、创新的策略&#xff0c;为无数商家照亮了…

动手学深度学习66 使用注意力机制的seq2seq

1. 使用注意力机制的seq2seq key value等价 是一个东西 第i个词rnn的输出 根据加权的不同&#xff0c;解码器前面用编码器前面的输出&#xff0c;到后面用后面的输出。 2. code 核心代码: context 怎么算 embedding没变&#xff0c;Decoder加了attention层 class Seq2SeqAt…

高校大数据实训平台介绍

高校大数据实验室架构 具体实训平台介绍 编程实训平台 1、大数据开发实训平台 大数据开发实训平台是面向实训课和课后训练的编程实训平台&#xff0c;平台底层基于Docker技术&#xff0c;采用容器云部署方案&#xff0c;预装大数据相关课程教学所需的实训环境…