Flink1.15源码解析--选举

news2024/10/3 3:01:54

文章目录

  • 一、LeaderContender
  • 二、LeaderElectionService
    • 2.1、LeaderElectionService
    • 2.2、LeaderElectionEventHandler(竞选服务的事件处理类)
  • 三 、LeaderElectionDriver
    • 3.1、LeaderLatchListener

角色说明
LeaderContender(竞选者)需要选主的主体,比如dispatcher、resource manager。当选leader或者回收leader时进行回调
LeaderElectionService(竞选服务)负责选举的服务。a.关联leaderElectionDriver实现,具体驱动实现可以是zk、k8s等;在启动时初始化驱动类;b.实现LeaderElectionEventHandler调用
LeaderElectionDriver(竞选服务驱动)选主具体驱动实现。以ZooKeeperLeaderElectionDriver实现为例,其会实现LeaderLatchListener回调接口,在主节点变化和节点变化时得到监听,然后调用 LeaderElectionEventHandler
LeaderElectionEventHandler(竞选服务的事件处理类)leader竞选回调。被选为Leader会调用LeaderContender的grantLeadership方法进行不同实现主体的后续逻辑,比如DispatcherRunner的grantLeadership后会创建Dispatcher服务,启动DispatcherBootstrap
LeaderRetrievalService(leader监听服务)监听leader的变化,传递给监听者。实现与LeaderElectionService类似,但作用不同。主要是接收Leader信息然后通知监听者LeaderRetrievalListener,监听者
LeaderRetriever(监听者)监听leader的监听者,实现了LeaderRetrievalListener接口。具体实现为RpcGatewayRetriever。RpcGatewayRetriever在回调中可以获取到leader的信息,创建akka的连接,生成aop代理类实例
RpcGatewayRcp网关,实现类通过AOP的方式封装了akka层的细节,可以直接调用实现类方法实现akka通信

一、LeaderContender

其中 LeaderContender 接口主要在 leader 选举中使用,代表了参与leader竞争的角色
其实现类有

  • JobMasterServiceLeadershipRunner
  • ResourceManager
  • DefaultDispatcherRunner
  • WebMonitorEndpoint

该接口中包含了两个重要的方法:

  1.  grantLeadership,表示leader竞选成功的回调方法

  2.  revokeLeadership,表示由leader变为非leader的回调方法

一个 服务需要进行选举, 在启动时,将自身作为竞争者,传递给了 leaderElectionService。

    @Override

    public void start() throws Exception {

        LOG.debug("Start leadership runner for job {}.", getJobID());

        leaderElectionService.start(this);

    }

二、LeaderElectionService

Leader选举服务 ,以其子类 DefaultLeaderElectionService 为例

在这里插入图片描述

2.1、LeaderElectionService

LeaderElectionService主要提供了参与选举竞争的接口调用以及竞争结果的查询接口:
在这里插入图片描述

2.2、LeaderElectionEventHandler(竞选服务的事件处理类)

leader竞选回调。被选为Leader会调用LeaderContender的grantLeadership方法进行不同实现主体的后续逻辑,比如DispatcherRunner的grantLeadership后会创建Dispatcher服务,启动DispatcherBootstrap

在这里插入图片描述

在开始参加 Leader 时(DefaultleaderElectionService::start ),会通过选举驱动器工厂创建一个 leaderElectionDriver,通过这个Driver工厂类,Flink 将基于 zookeeper 的 CuratorFramework 的细节,与 Flink 本身做了解耦

并将自身作为一个 LeaderElectionEventHandler 传入leaderElectionDriver。

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            running = true;
            leaderContender = contender;
            
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
        }
    }

三 、LeaderElectionDriver

LeaderElectionDriver 选主具体驱动实现。以ZooKeeperLeaderElectionDriver实现为例,其会实现 LeaderLatchListener 回调接口,在主节点变化和节点变化时得到监听,然后调用 LeaderElectionEventHandler

以其 子类实现 ZooKeeperLeaderElectionDriver 为例

在这里插入图片描述

通过工厂创建

	// 通过工厂创建 
    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,  // leaderElectionService 对象 作为 leaderEventHandler
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }

ZooKeeperLeaderElectionDriver 封装了 CuratorFramework 作为选举框架

    /**
     * Creates a ZooKeeperLeaderElectionDriver object.
     *
     * @param client Client which is connected to the ZooKeeper quorum
     * @param path ZooKeeper node path for the leader election
     * @param leaderElectionEventHandler Event handler for processing leader change events
     * @param fatalErrorHandler Fatal error handler
     * @param leaderContenderDescription Leader contender description
     */
    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler, // DefaultLeaderElectionService
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);

        running = true;
		// 启动选举 
        leaderLatch.addListener(this); // 添加  LeaderLatchListener
        leaderLatch.start();

        cache.start();

        client.getConnectionStateListenable().addListener(listener);
    }
// org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch#start
    public void start() throws Exception {
        Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
        this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
            public void run() {
                try {
                    LeaderLatch.this.internalStart();
                } finally {
                    LeaderLatch.this.startTask.set((Object)null);
                }

            }
        }));
    }

3.1、LeaderLatchListener


public interface LeaderLatchListener {
	// 选举成功回调方法
    void isLeader();
	
    void notLeader();
}

回调实现

    @Override
    public void isLeader() {
    	// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start 中  
    	// DefaultLeaderElectionService 将 自己作为 一个 leaderElectionEventHandler
    	// 所以此处选举成功, 通过回调 通知 DefaultLeaderElectionService 选举leader
        leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
    }

    @Override
    public void notLeader() {
        leaderElectionEventHandler.onRevokeLeadership();
    }

DefaultLeaderElectionService 响应选举成功,回调 leaderContender.grantLeadership:

    @Override
    @GuardedBy("lock")
    public void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = newLeaderSessionId;
                clearConfirmedLeaderInformation();

                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }
				// 主要逻辑 leaderContender, 
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }

以 WebMonitorEndpoint 为例

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        log.info(
                "{} was granted leadership with leaderSessionID={}",
                getRestBaseUrl(),
                leaderSessionID);
        leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
    }

参考:
https://www.modb.pro/db/107324
https://blog.csdn.net/qq_44836294/article/details/108022739

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/7201.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 夺命连环11问你能答对几个?

朋友们,先来11个快问快答看看自己能答对8个吗? 1.如何查看一个文件的末尾50行? tail -n 50 file 2.如何查看文件中包含“error”的行 cat file | grep "error" 3.如何查看某端口号是否被占用? netstat -anp | gre…

图片水印怎么加?图片加水印方法分享

相信大家在日常生活中,都会在各个平台上分享自己拍摄的照片吧,但大家在收到网友的好评和点赞的同时,是不是会因为担心图片被别人转发或者是拿去做一些不好的事情而感到烦恼呢?其实要解决这个烦恼很简单,那就是给图片添…

点击化学(Click chemistry) 叠氮-PEG4-NHS/Biotin-PEG-N3/Azid/DBCO-EPG-NHS/DBCO-NH2

点击化学(Click chemistry),也称作链接化学、速配接合组合式化学。在药物开发、生物医用材料优化、生物分子标记与检测等诸多领域中有着较好的应用,已经成为最热门的研究领域之一。点击化学符合绿色化学的12项原则,具有…

stm32 笔记 IO 口点灯实验及 HAL 库使用

GPIO 概述 全称:general purpose intput output,通用输入输出接口。 顾名思义,既可作为输入框也可以作为输入口。 引脚不一定是 GPIO,有些引脚也作为复位或晶振等使用。 GPIO的八种工作模式 输入:浮空输入&#xf…

甘露糖-酰基|mannose-Hydrazide|酰基-PEG-甘露糖

甘露糖-酰基|mannose-Hydrazide|酰基-PEG-甘露糖 酰基(acyl group),是指有机或无机含氧酸去掉一个或多个羟基后剩下的原子团,通式为R-M(O)-。 酰基(acyl group)指的是有机或无机含氧酸去掉羟基后剩下的一价原子团,通式为R-M(O)-。在有机化学…

论文调研

一、论文部分 基于傅里叶文档恢复的鲁棒文档去锐化与识别https://www.semanticscholar.org/paper/Fourier-Document-Restoration-for-Robust-Document-Xue-Tian/64dcd0cac46b936eb413f36b462be3b5b298c75b 1. 由于这篇论文没有给代码,所以在connected papers上查找…

Spring Security 安全框架 (一) 基础操作

1.password 登录密码 在 springboot 项目中 , 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><dependency><groupId>org.springfra…

因特网的组成,边缘之间的通讯方式,数据交换的方式

边缘之间的通讯方式有俩种&#xff1a; 1.客户服务器方式 2.对等方式&#xff08;pear-to-pear&#xff09; 什么是客户端服务器方式&#xff1f; 客户 (client) 和服务器 (server) 都是指通信中所涉及的两个应用进程。 客户-服务器方式所描述的是进程之间服务和被服务的关系…

QT菜单栏,工具栏,状态栏

1. 菜单栏 用代码来实现菜单栏&#xff1a; 头文件&#xff1a;mainwindow.h #ifndef MAINWINDOW_H #define MAINWINDOW_H ​ #include <QMainWindow> ​ QT_BEGIN_NAMESPACE namespace Ui { class MainWindow; } QT_END_NAMESPACE ​ class MainWindow : public QMai…

如何用JavaScript完美地区分双击和单击事件

通过一个悬浮球交互功能的案例来阐述问题&#xff0c;以及解决办法。 实现效果 类似微信里的悬浮窗效果&#xff0c;苹果手机的悬浮球功能效果 1.可以点击拖动&#xff0c;然后吸附在窗口边缘2.点击悬浮球&#xff0c;可以跳转界面&#xff0c;或者更改悬浮球的形态准备 1.移…

C++ 语法基础课2 —— printf 语句与判断结构

文章目录1. printf 输出格式(#include<cstdio>)1.1 int、float、double、char 等类型的输出格式1.2 所有输出的变量均可包含在一个字符串中1.2.1 练习11.2.2 练习21.3 扩展功能2. if 语句2.1 基本 if-else 语句2.1.1 练习12.1.2 练习22.1.3 练习42.2 常用比较运算符2.3 i…

金融业数字化聚焦容器云,全闪存为什么是点睛之笔?

文|螳螂观察 作者|李永华 刻板、保守、小心翼翼…… 很多人对金融业尤其是银行在数字化创新方面的印象&#xff0c;都是如此。 这个印象到底对不对&#xff1f; 答案可能是&#xff0c;既对&#xff0c;又不对。 对的地方在于&#xff0c;出于合规等要求&#xff0c;一个…

[附源码]计算机毕业设计JAVA点餐系统

[附源码]计算机毕业设计JAVA点餐系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven…

采集平台-大数据平台数据采集系统

随着社会的发展。人们也是越来越离不开互联网&#xff0c;今天小编就给大家盘点一下免费的采集平台&#xff0c;只需要点几下鼠标就能轻松爬取数据&#xff0c;不管是导出excel还是自动发布到网站都支持。详细参考图片一、二、三、四&#xff01; 企业人员 通过爬取动态网页数…

android 静默升级 卸载功能实现

一、近期需要实现一个apk静默升级卸载自启动功能&#xff0c;首先需要获取系统root权限才能执行静默升级&#xff0c;下面不墨迹直接上代码. 首先是MainActivity 页面 package com.example.tiaoshiapkjingmo;import androidx.appcompat.app.AppCompatActivity; import okhttp3.…

如何利用 promise 影响代码的执行顺序?

如何利用 promise 影响代码的执行顺序&#xff1f; 我们写代码的时候&#xff0c;经常会遇到这样的场景。2个不同组件&#xff0c;它们的生命周期本身都是相互独立&#xff0c;毫无关联的&#xff0c;但是它们调用的接口&#xff0c;存在相互依赖的情况。 我举个例子&#xf…

windows中Mysql数据库的安装和卸载

以安装包“mysql-5.7.35-win32.zip”为例&#xff0c;推荐安装5.7最新版本 一、安装 1、根据自己操作系统版本下载32位或64位的安装包&#xff0c;也可去官网下载&#xff0c;建议下载如下图压缩包 将下载的解压包解压&#xff0c;目录结构如下&#xff1a; 2、新建文件“my.…

IC工程师入门必学,Verilog零基础入门教程

近年来&#xff0c;IC行业发展火热&#xff0c;薪资待遇高&#xff0c;发展前景好&#xff0c;所以成了很多人转行的首选。但IC行业入行门槛高&#xff0c;需要具备相关的知识技能。比如工程师必须至少掌握一种HDL语言。 一般在HDL语言中&#xff0c;Verilog相对来说更加易学、…

负载均衡架构设计技巧

负载均衡算法 轮询&随机 基本原理 轮询&#xff1a;将请求依次发给服务器 随机&#xff1a;将请求随机发给服务器 适用场景 通用&#xff0c;无状态的负载均衡 优缺点 实现简单不会判断服务器状态&#xff0c;除非服务器连接丢失 问题场景 某个服务器当前因为触发…

Java面向对象---尚硅谷Java入门视频学习

1.类和对象 1.1创建过程 对象是将内存地址赋值给了变量&#xff0c;所以变量其实引用了内存中的对象&#xff0c;所以称之为引用变量&#xff0c;而变量的类型&#xff08;即类&#xff09;称之为引用数据类型。 堆&#xff08;Heap&#xff09;&#xff0c;此内存区域的唯…