Flink 高可用原理

news2024/9/25 6:23:33

Flink 高可用原理

Flink JobManager 高可用 加强了 Flink 集群防止 JobManager 故障的能力。 此特性确保了 Flink 集群将始终持续执行你提交的作业。

JobManager 高可用一般概念是指,在任何时候都有 一个领导者 JobManager,如果领导者出现故障,则有多个备用 JobManager 来接管领导。这保证了 不存在单点故障,只要有备用 JobManager 担任领导者,程序就可以继续运行。

Flink 的高可用服务封装了所需的服务,使一切可以正常工作:

  • 领导者选举:从 n 个候选者中选出一个领导者
  • 服务发现:检索当前领导者的地址
  • 状态持久化:继承程序恢复作业所需的持久化状态(JobGraphs、用户代码jar、已完成的检查点)

为了恢复提交的作业,Flink 会持久化元数据和 job 组件。高可用数据将一直保存,直到相应的作业执行成功、被取消或最终失败。当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。

Flink 组件的高可用通过高可用服务选举组件实现的 LeaderContender 实现主从节点的选举,当节点成为 Leader 节点后再启动相关的组件服务,所以使用高可用服务的组件类型是可以通过 LeaderContender 的子类来查看的。

客户端通过监听高可用服务中节点地址信息的变化,再做出相关改变。最常见的 RestClusterClient,它在创建时就会同时创建LeaderRetrievalListener,当它去请求相关组件时,都会从 LeaderRetrievalListener 中获取监听的地址信息再对服务端进行请求。

需要选举的组件作为 LeaderContender 被注入到 LeaderElectionService 中,LeaderElectionService 将作为选举者参加高可用Leader 的选举。

Flink 将在启动时创建 HighAvailabilityServices,HighAvailabilityServices 可以快速的创建 LeaderElectionService 和 LeaderRetrievalService,HighAvailabilityServices 的好处在于所有的选举服务和组件可以共享同一个高可用服务客户端,以及一些资源的清理和关闭。

Zookeeper 高可用

选举和监听

Flink 的 Zookeeper 高可用是通过 Zookeeper 的 LeaderLatch 来保证的,每个节点组件注册一个 LeaderLatch,Zookeeper 内部将会选举节点成为 Leader。

在高可用服务中, Flink 中的 LeaderElectionDriver 将用于与高可用服务通信,LeaderElectionDriver 一般作为竞选组件参加选举,例如当使用 Zookeeper 高可用服务时中,将使用 ZooKeeperLeaderElectionDriver 实现 LeaderElectionDriver 与 curator 框架中的 LeaderLatchListener,它在创建时同时会创建 CuratorFramework 的 LeaderLatch 参加选举。

LeaderElectionDriver 中有个 LeaderElectionDriver.Listener, 它一般用于监听 LeaderElectionDriver 是否成功成为 Leader。在开启高可用的 Flink 中,DefaultLeaderElectionService 一般作为 LeaderElectionDriver.Listener 的实现类,同时也是 LeaderElectionService 的实现类,LeaderElectionService 前面也讲过了,用于调用 LeaderContender 高可用组件实现类。

具体的高可用组件实现类有:

  • WebMonitorEndpoint:Rest 服务器
    • MiniDispatcherRestEndpoint
    • DispatcherRestEndpoint
  • JobMasterServiceLeadershipRunner: 用于启动 JobManager,具体可见[ JobManager创建和启动解析](# JobManager创建和启动解析)
  • ResourceManagerServiceImpl:用于启动 ResourceManager,具体可见[ResourceManager创建和启动解析](# ResourceManager创建和启动解析)
  • DefaultDispatcherRunner:用于启动 Dispatcher,具体可见[ Dispatcher 创建和启动解析](# Dispatcher 创建和启动解析)

每个高可用组件都将在 Zookeeper 上注册相应的节点进行 Leader 竞选,相关的组件路径如下。

/flink
       +/cluster_id_1/leader/latch
       |            |       /resource_manager/connection_info
       |            |       /dispatcher/connection_info
       |            |       /rest_server/connection_info
       |            |       /job-id-1/connection_info
       |            |       /job-id-2/connection_info
       |            |
       |            |
       |            +jobgraphs/job-id-1
       |            |         /job-id-2
       |            +jobs/job-id-1/checkpoints/latest
       |                 |                    /latest-1
       |                 |                    /latest-2
       |                 |        /checkpoint_id_counter

ZooKeeperMultipleComponentLeaderElectionHaServices 在启动时创建,作为管理整个 JobManager 高可用的组件对象。

下面解析组件选举 Leader 和 Leader 监听的整个源码流程:

组件选举流程:

  • 组件开始选举 Leader。

    • 创建Leader竞争者。

      new LeaderContender

    • 通过 leaderName 从高可用服务中获取 DefaultLeaderElectionService 选举服务,leaderName 最后会作为 Zookeeper 节点路径。

      ZooKeeperMultipleComponentLeaderElectionHaServices#createLeaderElectionService

      • 获取或创建单例的LeaderElectionService

        ZooKeeperMultipleComponentLeaderElectionHaServices#getOrInitializeSingleLeaderElectionService

        new DefaultMultipleComponentLeaderElectionService

    • DefaultLeaderElectionService 为 LeaderContender 竞争 Leader

      DefaultLeaderElectionService#start(LeaderContender)

    • DefaultLeaderElectionService 启动和判断是否为 Leader 时都会开始竞争 Leader

      DefaultLeaderElectionService#onGrantLeadership

      • 内部的 LeaderContender 开始竞争 Leader,这里使用 ResourceManagerServiceImpl 来举例

        LeaderContender#onGrantLeadership

        • 当前节点的 ResourceManagerServiceImpl 成为 Leader

          ResourceManagerServiceImpl#grantLeadership

          • 创建 ResourceManager

            ResourceManagerServiceImpl#createResourceManager

          • 启动ResourceManager

            ResourceManagerServiceImpl#startResourceManagerIfIsLeader

          • 如果启动完后没有问题,那么确定ResourceManager为Leader

            DefaultLeaderElectionService#confirmLeadership

            • 验证当前 ResourceManagerServiceImpl 是否为Leader

            • 如果是 Leader,则创建 LeaderInformation Leader 信息,并写入 Zookeeper 中

              MultipleComponentLeaderElectionDriverAdapter#writeLeaderInformation

              • 发布 LeaderInformation

                DefaultMultipleComponentLeaderElectionService#publishLeaderInformation

                ZooKeeperMultipleComponentLeaderElectionDriver#publishLeaderInformation

                • 生成Zookeeper连接信息路径,ResourceManager的信息就是resource_manager/connection_info,默认为ZooKeeperUtils#getComponentPath/connection_info

                  ZooKeeperUtils#generateConnectionInformationPath

                • 将Leader信息写到 Zookeeper 节点中

                  ZooKeeperUtils#writeLeaderInformationToZooKeeper

JobManager Leader 监听流程

  • 根据 high-availability.cluster-id 创建 ClientHighAvailabilityServices

    ClientHighAvailabilityServicesFactory#create

    • 根据 Configuration 创建 Zookeeper 的 CuratorFramework,大部分高可用的参数都在这里面使用到

      ZooKeeperUtils#startCuratorFramework

    • 创建ZooKeeperClientHAServices

      ZooKeeperClientHAServices#init

      • 缓存创建的 CuratorFramework 和 Configuration,用于创建 LeaderRetrievalService
  • 通过高可用服务创建 Leader 监听服务 LeaderRetrievalService

    ClientHighAvailabilityServices#getClusterRestEndpointLeaderRetriever

    • 调用 ZookeeperUtils 创建 DefaultLeaderRetrievalService

      ZooKeeperUtils#createLeaderRetrievalService

      • 创建 ZooKeeperLeaderRetrievalDriverFactory

        ZooKeeperUtils#createLeaderRetrievalDriverFactory

      • 使用 ZooKeeperLeaderRetrievalDriverFactory 创建 DefaultLeaderRetrievalService

  • Leader 节点监听服务注册需要的监听器

    LeaderRetrievalService#start

    • 使用 ZooKeeperLeaderRetrievalDriverFactory 创建 ZooKeeperLeaderRetrievalDriver

      ZooKeeperLeaderRetrievalDriver#ZooKeeperLeaderRetrievalDriver

      • 缓存 LeaderRetrievalService

      • 创建 Zookeeper 的 TreeCache

        • 使用 ZooKeeperLeaderRetrievalDriverFactory 中的 CuratorFramework 和监听路径创建 TreeCache

          ZooKeeperUtils#createTreeCache

        • TreeCache 添加 ZooKeeperLeaderRetrievalDriver 的 retrieveLeaderInformationFromZooKeeper 作为 Leader 节点数据变化后的回调方法,回调时会解析数据,然后会将数据再发送给 LeaderRetrievalService

          ZooKeeperUtils#createTreeCacheListener

          TreeCache#getListenable#addListener

          ZooKeeperLeaderRetrievalDriver#retrieveLeaderInformationFromZooKeeper

      • 启动 TreeCache

        TreeCache#start

      • 将自身的 handleStateChange 方法注册为 CuratorFramework 连接状态改变后的回调方法

        CuratorFramework#getConnectionStateListenable#addListener

        ZooKeeperLeaderRetrievalDriver#handleStateChange

  • Zookeeper Leader 节点变化后通知 ZooKeeperLeaderRetrievalDriver 节点地址改变

    ZooKeeperLeaderRetrievalDriver#retrieveLeaderInformationFromZooKeeper

    • 获取监听节点的当前信息

      TreeCache#getCurrentData

    • 如果存在信息,那么读取信息中的新地址和 SessionId

      LeaderRetrievalEventHandler#notifyLeaderAddress

    • 将地址和 SessionId 或者空信息发送给 LeaderRetrievalEventHandler (LeaderRetrievalService)

      LeaderRetrievalEventHandler#notifyLeaderAddress

      • LeaderRetrievalService 再回调 LeaderRetrievalListener 的 notifyLeaderAddress 方法,将地址和 SessionId 传输过去

        LeaderRetrievalListener#notifyLeaderAddress

        LeaderRetrievalListener 就是各个需要监听 JobManager 地址的客户端监听器

        客户端有: ResourceManagerLeaderListener、LeaderRetriever、JobLeaderIdListener、JobManagerLeaderListener、LeaderConnectingAddressListener 和 LeaderInformationListener

JobManager 结束清理数据

JobManaager 结束后需要清理的 Zookeeper 数据主要有 JobGraph、Job Checkpoint 和 无用的节点

JobGraphStore、CheckpointRecovery

  • 集群停止,停止高可用并清理高可用信息

    HighAvailabilityServices#closeWithOptionalClean

    • 清理高可用信息

      HighAvailabilityServices#cleanupAllData

      • 删除当前 JobManager 在 Zookeeper 上创建的节点以及子节点,也就是 /KaTeX parse error: Expected 'EOF', got '}' at position 38: …eeper.path.root}̲/{high-availability.cluster-id} 节点

        curatorFramework#delete#idempotent#deletingChildrenIfNeeded#forPath("/");

      • 清理无用的 Zookeeper 节点路径

        AbstractHaServices#internalCleanup

        • 获取 NameSpace, 一般为/$high-availability.zookeeper.path.root}
        • 开始循环删除 Namespace 节点以及全部空的父节点,如果节点非空就无法删除
      • 清理 BlobStoreService

        BlobStoreService#cleanupAllData

    • 关闭服务和客户端

      HighAvailabilityServices#close

参考

官方Flink 高可用概念:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/ha/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2162778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

失踪人口回归(明天开始继续更新学习内容)

从明天开始继续更新个人学习经验及收获,可能会直接从C入门开始,总结一下C在C的基础上增加的新语法。这篇就当作水贴算了,大家别点赞,留点赞给明天的文章,哈哈 我是安姐的修沟...........

html TAB、table生成

1. 代码 <!DOCTYPE html> <head> <meta charset"UTF-8"> <title>Dynamic Tabs with Table Data</title> <style> /* 简单的样式 */ .tab-content { display: none; border: 10px solid #ccc; padding: 30px; mar…

二叉树的基本概念(上)

文章目录 &#x1f34a;自我介绍&#x1f34a;简介&#x1f34a;树的定义树中的专业术语树的分类 &#x1f34a;二叉树的特性讲解 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以&#xff1a;点赞关注评论收藏&#xff08;一键四连&#xff09;哦~ &#x1f34a;自我介…

【Redis入门到精通六】在Spring Boot中集成Redis(含配置和操作演示)

目录 Spring Boot中集成Redis 1.项目创建和环境配置 2.基本操作演示 Spring Boot中集成Redis Spring社区也自定义了一套Redis的客户端&#xff0c;与jedis的操作方式有所差异&#xff0c;Spring中把每个类型的操作都单独封装了起来。下面就让我来带大家了解如何在Spring Boot…

JavaScript动态数据可视化

一、引言 在前端开发中&#xff0c;JavaScript无疑是最核心的技术之一。它能够处理各种交互逻辑&#xff0c;实现复杂的功能。本文将通过一个动态数据可视化的案例&#xff0c;展示如何使用JavaScript实现复杂功能。动态数据可视化能够将大量数据以直观、生动的方式呈现&#…

从 Tesla 的 TTPoE 看资源和算法

特斯拉的 ttpoe 出来有一段时间了&#xff0c;不出所料网上一如既往的一堆 pr 文&#xff0c;大多转译自 演讲 ppt 和 Replacing TCP for Low Latency Applications&#xff0c;看了不下 20 篇中文介绍&#xff0c;基本都是上面这篇文章里的内容&#xff0c;车轱辘话颠来倒去。…

Python网络爬虫获取Wallhaven壁纸图片(源码)

** 话不多说&#xff0c;直接附源码&#xff0c;可运行&#xff01; ** import requests from lxml import etree from fake_useragent import UserAgent import timeclass wallhaven(object):def __init__(self):# yellow# self.url "https://wallhaven.cc/search?co…

【C++篇】手撕 C++ string 类:从零实现到深入剖析的模拟之路

文章目录 C string 类的模拟实现&#xff1a;从构造到高级操作前言第一章&#xff1a;为什么要手写 C string 类&#xff1f;1.1 理由与价值 第二章&#xff1a;实现一个简单的 string 类2.1 基本构造与析构2.1.1 示例代码&#xff1a;基础的 string 类实现2.1.2 解读代码 2.2 …

电池快充协议芯片

1&#xff1a;18650充电快充规则 电池知识 | 东莞市恒帝电子科技有限公司 (heldee.com) 锂电池快速充电知识【钜大锂电】 (juda.cn)18 锂电池和18650锂电池能不能快速充电&#xff0c;四种充电方式讲解 | 东莞市恒帝电子科技有限公司 (heldee.com) 2&#xff1a;国产厂家 …

数据库课程 CMU15-445 2023 Fall Project-2 Extendible Hash Index

0 实验结果 tips:完成项目的前提不需要一定看视频 1 数据结构&#xff1a;扩展哈希 解释下这张图&#xff1a; 图中header的最大深度2&#xff0c;directory最大深度2&#xff0c;桶的容量2。 最开始的时候只有一个header。 插入第一个数据&#xff0c;假设这个数据对应的哈希…

安谋科技发布全新自研“玲珑”多媒体处理器

当前&#xff0c;受视频直播、AR/VR、智驾智舱等新兴应用场景和使用人群的飞速增长&#xff0c;视频编解码及显示处理领域呈现出旺盛需求&#xff0c;进而带动了下游设备数量不断攀升。以智能汽车为例&#xff0c;根据盖世汽车研究院的产业报告显示&#xff0c;预计2025年国内车…

【GeekBand】C++设计模式笔记4_Strategy_策略模式

1. “组件协作”模式 现代软件专业分工之后的第一个结果是“框架与应用程序的划分”&#xff0c;“组件协作”模式通过晚期绑定&#xff0c;来实现框架与应用程序之间的松耦合&#xff0c;是二者之间协作时常用的模式。典型模式 Template MethodStrategyObserver / Event 2.…

如何确定SAP 某些凭证或者单号的号码编码范围的 OBJECT 是什么?

在SAP的运维或者项目实施中&#xff0c;有时会如何确定SAP 某些凭证或者单号的号码 OBJECT 是什么&#xff1f; 一般一下常用的可以通过事务代码 例如&#xff1a; XDN1 Create Number Ranges for Customer Accounts&#xff0c;定义客户编码FBN1查看维护会计凭证号范围 我…

【项目】多设计模式下的同步异步日志系统

文章目录 项目介绍开发环境核心技术日志系统介绍为什么需要日志系统日志系统技术实现同步写日志异步写日志 相关技术知识补充不定参函数不定参宏函数的使用C中不定参函数的使用C中不定参函数的使用 设计模式单例模式工厂模式建造者模式代理模式 日志系统框架设计模块划分日志等…

springboot+阿里云物联网教程

需求背景 最近有一个项目,需要用到阿里云物联网,不是MQ。发现使用原来EMQX的代码去连接阿里云MQTT直接报错,试了很多种方案都不行。最终还是把错误分析和教程都整理一下。 需要注意的是,阿里云物联网平台和MQ不一样。方向别走偏了。 概念描述 EMQX和阿里云MQTT有什么区别…

springboot整合openfeign

文章目录 准备一、引入必要依赖二、写一个feign client并暴露到注册中心2.1 client2.2 开启Feign客户端功能 三、别的服务引入IProductClient并调用方法3.1 建一个order-service&#xff0c;引入IProductClient所在模块3.2 注入IProductClient&#xff0c;并调用方法 四、启动服…

Github优质项目推荐-第一期

文章目录 Github优质项目推荐一、【free-for-dev】&#xff0c;88.4k stars二、【linux-command】&#xff0c;31.5k stars三、【system-design-primer】&#xff0c;270k stars四、【GitHub-Chinese-Top-Charts】&#xff0c;99.1k stars五、【Docker-OSX】&#xff0c;46k st…

分布式计算框架

进入Scala模式 终端里输入Scala 创建一个新的Scala文件 vim 文件名.scala 复制粘贴代码 ctrlshift c/v 使用vim 先进入插入模式&#xff0c;可以通过按i键来实现&#xff0c;然后粘贴代码&#xff0c;完成后按Esc键退出插入模式&#xff0c;保存并退出可以通过输入:wq然后按…

HarmonyOS开发之利用TextPicker实现日期选择框只有【年】

效果图&#xff1a; 一&#xff1a;实现年份数组 function generateYearArray(startYear, endYear) {const yearArray [];for (let year startYear; year < endYear; year) {yearArray.push(year年);}return yearArray; } 二&#xff0c;设置年份区间&#xff08;1995,2…