【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信

news2025/1/10 10:54:10

文章目录

  • 1. 集群内部通讯方法概述
  • 2. TaskManager向ResourceManager注册RPC服务
  • 3. JobMaster向ResourceManager申请Slot计算资源

现在我们已经知道Flink中RPC通信框架的底层设计与实现,接下来通过具体的实例了解集群运行时中组件如何基于RPC通信框架构建相互之间的调用关系。

1. 集群内部通讯方法概述

通过RegisteredRpcConnection进行连接注册与通讯(维护心跳等)

当TaskExecutor启动后,会立即向ResourceManager中注册当前TaskManager的信息。同样,JobMaster组件启动后也立即会向ResourceManager注册JobMaster的信息。这些注册操作实际上就是在构建集群中各个组件之间的RPC连接,这里的注册连接在Flink中被称为RegisteredRpcConnection,集群组件之间的RPC通信都会通过创建RegisteredRpcConnection进行,例如获取RpcEndpoint对应的RpcGateway接口以及维护组件之间的心跳连接等。

如下图,集群运行时中各组件的注册连接主要如下三种RegisteredRpcConnection的实现。

  • JobManagerRegisteredRpcConnection:用于管理TaskManager中与JobManager之间的RPC连接。
  • ResourceManagerConnection:用于管理JobManager中与ResourceManager之间的RPC连接。
  • TaskExecutorToResourceManagerConnection:用于管理TaskExecutor中与ResourceManager之间的RPC连接。

如下图再有:

  1. RegisteredRpcConnection提供了generateRegistration()抽象方法,主要用于生成组件之间的RPC连接,每次调用RegisteredRpcConnection.start()方法启动RegisteredRpcConnection时,都会创建新的RetryingRegistration。

不同RegisteredRpcConnection创建的RetryingRegistration也会有所不同,例如在TaskExecutorToResourceManagerConnection中就会创建ResourceManagerRegistration实例。

  1. 调用rpcService.connect(targetAddress, targetType) ,返回RpcGateway的代理对象,通过RpcGateway连接到目标RpcEndpoint上。
  2. 在RetryingRegistration中会提供invokeRegistration()抽象方法,用于实现子类的RPC注册操作。

例如在ResourceManagerRegistration中会实现invokeRegistration()方法,在方法中调用resourceManager.registerTaskExecutor()将TaskExecutor信息注册到ResourceManager中,这里的ResourceManager就是ResourceManagerGateway接口代理类。

  1. 调用onRegistrationSuccess()方法继续后续操作,例如在JobManagerRegisteredRpcConnection中会向jobLeaderListener添加当前的jobId等信息。
  2. 如果当前组件没有成功到注册至目标组件时,会调用onRegistrationFailure()抽象方法继续后续操作,包括连接重连或停止整个RpcEndpoint对应的服务。

在这里插入图片描述

接着以TaskManager向ResourceManager注册RPC服务为例,介绍整个RPC连接的注册过程。
 

2. TaskManager向ResourceManager注册RPC服务

TaskManager向ResourceManager注册RPC服务的过程如图所示。
在这里插入图片描述

  1. TaskExecutor节点正常启动后,调用RpcEndpoint.onStart()方法初始化并启动TaskExecutor组件的内部服务。
  2. 创建监听服务
  1. TaskExecutor调用resourceManagerLeaderRetriever.start()方法,启动ResourceManager组件领导节点的监听服务并传入ResourceManagerLeaderListener,用于监听ResourceManager的领导节点的变化情况。
  2. 当ResourceManagerLeaderListener接收到来自ResourceManager的leaderAddress以及leaderSessionID的信息后,调用notifyOfNewResourceManagerLeader()方法通知TaskExecutor和新的ResourceManagerLeader建立RPC连接。
  1. 创建与ResourceManager组件的RPC网络连接

a. 调用TaskExecutor.reconnectToResourceManager()内部方法,创建与ResourceManager组件之间的RPC网络连接。
b. 在reconnectToResourceManager()方法中会事先调用closeResourceManagerConnection()方法关闭之前的ResourceManager连接,然后依次调用tryConnectToResourceManager()和connectToResourceManager()方法创建与ResourceManager节点的RPC连接。

  1. 创建TaskExecutorRegistration对象

在connectToResourceManager()方法中会创建TaskExecutorRegistration对象,用于存储TaskManager的注册信息,其中包括taskExecutorAddress、resourceId以及dataPort等连接信息,同时还包含hardwareDescription、defaultSlotResourceProfile以及totalResourceProfile等资源描述信息。

  1. 正式建立网络连接

创建TaskExecutorToResourceManagerConnection实例,正式与ResourceManager建立RPC网络连接,同时调用TaskExecutorToResourceManagerConnection.start()方法启动RPC连接。实际上调用的是RegisteredRpcConnection.start()方法。

  1. 创建新的创建新的Registration与其他组件的RPC连接

在RegisteredRpcConnection中会调用内部方法createNewRegistration()创建新的Registration。而在createNewRegistration()方法中会调用generateRegistration()子类方法,创建与其他组件之间的RPC连接。这里主要调用的是TaskExecutorToResourceManagerConnection.generateRegistration()方法。

  1. 调用RetryingRegistration.startRegistration()方法注册具体的RPC连接,实际上会调用AkkaRpcService.connect()方法创建并获取ResourceManager对应的RpcGateway接口。
  2. 调用ResourceManagerGateway.registerTaskExecutor()方法,最终完成在ResourceManager中注册TaskManager的操作。创建的TaskExecutorRegistration同时会传递给ResourceManager。
  3. 当ResourceManager接收到TaskManager的注册信息后,会在本地维护TaskManager的注册信息,并建立与TaskManager组件之间的心跳连接,至此完成了TaskManager启动后向ResourceManager进行RPC网络连接注册的全部流程。

如代码所示

  • TaskExecutor.connectToResourceManager()方法中首先会创建TaskExecutorRegistration注册信息和TaskExecutorToResourceManagerConnection对象。
  • 然后调用TaskExecutorToResourceManagerConnection.start()方法启动TaskManager和ResourceManager之间的RPC注册连接。
private void connectToResourceManager() {
   assert(resourceManagerAddress != null);
   assert(establishedResourceManagerConnection == null);
   assert(resourceManagerConnection == null);
   log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
   // TaskExecutor注册信息
   final TaskExecutorRegistration taskExecutorRegistration = 
       new TaskExecutorRegistration(
      getAddress(),
      getResourceID(),
      taskManagerLocation.dataPort(),
      hardwareDescription,
      taskManagerConfiguration.getDefaultSlotResourceProfile(),
      taskManagerConfiguration.getTotalResourceProfile()
   );
   resourceManagerConnection =
      new TaskExecutorToResourceManagerConnection(
         log,
         getRpcService(),
         taskManagerConfiguration.getRetryingRegistrationConfiguration(),
         resourceManagerAddress.getAddress(),
         resourceManagerAddress.getResourceManagerId(),
         getMainThreadExecutor(),
         new ResourceManagerRegistrationListener(),
         taskExecutorRegistration);
   resourceManagerConnection.start();
}

接着看RegisteredRpcConnection.start()的代码逻辑,如代码所示。

public void start() {
   checkState(!closed, "The RPC connection is already closed");
   checkState(!isConnected() && pendingRegistration == null, 
              "The RPC connection is already started");
   // 创建RetryingRegistration
   final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
     // 启动RetryingRegistration
   if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
      newRegistration.startRegistration();
   } else {
      // 并行启动后,直接取消当前Registration
      newRegistration.cancel();
   }
}

关注:RetryingRegistration.startRegistration()逻辑。

  1. 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway。
  2. 创建RpcGateway代理类后,就可以连接到指定的RpcEndpoint了。对于rpcService.connect()方法的定义,我们已经在RPC底层原理中介绍过。
  3. 创建RPC连接后,尝试注册操作。
  4. 如果注册失败,则进行Retry操作,除非接收到取消操作的消息。
public void startRegistration() {
        if (canceled) {
            return;
        }
        try {
            final CompletableFuture<G> rpcGatewayFuture;
            // 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway
            if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
                rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
                    targetAddress,
                    fencingToken,
                    targetType.asSubclass(FencedRpcGateway.class));
            } else {
                rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
            }
            // 成功获取网关后,尝试注册操作
            CompletableFuture<Void> rpcGatewayAcceptFuture = 
                rpcGatewayFuture.thenAcceptAsync(
                (G rpcGateway) -> {
                    log.info("Resolved {} address, beginning registration", 
                       targetName);
                    register(rpcGateway, 1, retryingRegistrationConfiguration.
                       getInitialRegistrationTimeoutMillis());
                },
                rpcService.getExecutor());
            // 如果注册失败,则进行Retry操作,除非取消操作
            rpcGatewayAcceptFuture.whenCompleteAsync(
                (Void v, Throwable failure) -> {
                    if (failure != null && !canceled) {
                        final Throwable strippedFailure =
                            ExceptionUtils.stripCompletionException(failure);
                        // 间隔指定时间后再次注册
                        startRegistrationLater(retryingRegistrationConfiguration.
                           getErrorDelayMillis());
                    }
                },
                rpcService.getExecutor());
        }
        catch (Throwable t) {
            completionFuture.completeExceptionally(t);
            cancel();
        }
    }

继续了解ResourceManagerRegistration.invokeRegistration()的具体实现。

该方法会创建和ResourceManagerGateway之间的连接以及注册操作
resourceManager会接收来自TaskExecutor的注册信息,并根据taskExecutorRegistration提供的注册信息,将TaskExecutor信息记录在ResourceManager的本地存储中,然后开启TaskExecutor之间的心跳连接。至此,TaskManager能和ResourceManager进行正常的RPC通信了。

protected CompletableFuture<RegistrationResponse> invokeRegistration(
      ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, 
    long timeoutMillis) throws Exception {
   Time timeout = Time.milliseconds(timeoutMillis);
   return resourceManager.registerTaskExecutor(
      taskExecutorRegistration,
      timeout);
}

对于其他组件之间的RpcConnection注册操作,例如TaskManager与JobMaster之间的RPC连接注册,基本上和ResourceManagerRegistration一样,这里暂不介绍。

接下来我们看JobMaster是如何向ResourceManager申请Slot计算资源的。

 

3. JobMaster向ResourceManager申请Slot计算资源

当JobMaster组件启动后,

  • 会(调用JobMaster.startJobMasterServices())启动JobMaster中的内部服务,其中就包括了SlotPool组件。
  • 同时会创建和启动JobMaster与ResourceManager之间的RPC连接ResourceManagerConnection。创建成功后,会执行包括向ResourceManager发送申请Slot计算资源的RPC请求等后续操作。

如代码所示

//从SlotPoolImpl.connectToResourceManager()可以看出,方法中分别遍历
//waitingForResourceManager集合中的PendingRequest,
//然后就每个PendingRequest调用requestSlotFromResourceManager()方法向
//ResourceManager申请PendingRequest中指定的Slot计算资源。
public void connectToResourceManager(
    @Nonnull ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
        for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
            requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        waitingForResourceManager.clear();
}

继续看SlotPoolImpl.requestSlotFromResourceManager()方法的实现,如下代码所示。

  1. 创建AllocationID并将pendingRequest和AllocationID存储在pendingRequests集合中。
  2. 判断pendingRequest是否出现异常或已经分配了其他AllocationID,如果出现异常或已分配则取消当前pendingRequest中的资源分配请求。
  3. 调用resourceManagerGateway.requestSlot()远程RPC方法向ResourceManager申请Slot计算资源,此时会在方法中创建SlotRequest对象,指定申请Slot计算资源的具体参数。
  4. ResourceManager接收到SlotPool发送的SlotRequest请求后,会将SlotRequest转发给SlotManager进行处理,此时如果能正常分配到Slot资源,则会返回Acknowledge信息。
  5. 调用FutureUtils.whenCompleteAsyncIfNotDone()方法执行返回rmResponse CompletableFuture的对象,此时如果Slot资源申请过程出现异常,则调用slotRequestToResourceManager-Failed()方法进行处理。
private void requestSlotFromResourceManager(
            final ResourceManagerGateway resourceManagerGateway,
            final PendingRequest pendingRequest) {
        checkNotNull(resourceManagerGateway);
        checkNotNull(pendingRequest);
        log.info("Requesting new slot [{}] and profile {} from resource manager.", 
                 pendingRequest.getSlotRequestId(), pendingRequest.
                    getResourceProfile());
        final AllocationID allocationId = new AllocationID();
        pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId,
                            pendingRequest);
        pendingRequest.getAllocatedSlotFuture().whenComplete(
            (AllocatedSlot allocatedSlot, Throwable throwable) -> {
                if (throwable != null 
                    || !allocationId.equals(allocatedSlot.getAllocationId())) {
                    resourceManagerGateway.cancelSlotRequest(allocationId);
                }
            });
        CompletableFuture<Acknowledge> rmResponse = 
            resourceManagerGateway.requestSlot(
            jobMasterId,
            new SlotRequest(jobId, allocationId, 
                            pendingRequest.getResourceProfile(), jobManagerAddress),
            rpcTimeout);
        FutureUtils.whenCompleteAsyncIfNotDone(
            rmResponse,
            componentMainThreadExecutor,
            (Acknowledge ignored, Throwable failure) -> {
                if (failure != null) {
                    slotRequestToResourceManagerFailed(pendingRequest.
                                                     getSlotRequestId(), failure);
                }
            });
}

从以上实例可以看出,集群运行时中各个组件之间都是基于RPC通信框架相互访问的。RpcEndpoint组件会创建与其他RpcEndpoint之间的RegisteredRpcConnection,并通过RpcGateway接口的动态代理类与其他组件进行通信。

需要注意的是,Flink把Akka作为RPC底层的通信框架,但没有使用Akka其他丰富的监督功能,并且未来有去掉Akka依赖的可能。

 
参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1464949.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows10和Ubuntu22.04双系统安装

概要&#xff1a; 本篇演示Windows10和Ubuntu22.04双系统的安装&#xff0c;先安装Windows10&#xff0c;再安装Ubuntu22.04。 先安装Ubuntu22.04&#xff0c;后安装Windows10见&#xff1a; Ubuntu22.04和Windows10双系统安装-CSDN博客 一、说明 1、电脑 笔者的电脑品牌…

备战蓝桥杯—— 双指针技巧巧答链表2

对于单链表相关的问题&#xff0c;双指针技巧是一种非常广泛且有效的解决方法。以下是一些常见问题以及使用双指针技巧解决&#xff1a; 合并两个有序链表&#xff1a; 使用两个指针分别指向两个链表的头部&#xff0c;逐一比较节点的值&#xff0c;将较小的节点链接到结果链表…

解决Maven爆红以及解决 Idea 卡在 Resolving问题

关于 Idea 卡在 Resolving&#xff08;前提是Maven的setting.xml中配置好了阿里云和仓库&#xff09; 参考文章https://blog.csdn.net/jiangyu1013/article/details/95042611 解决Maven爆红参考文章https://devpress.csdn.net/beijing/656d993b76f0791b6eca7bb0.html?dp_toke…

使用k-近邻算法改进约会网站的配对效果(kNN)

目录 谷歌笔记本&#xff08;可选&#xff09; 准备数据&#xff1a;从文本文件中解析数据 编写算法&#xff1a;编写kNN算法 分析数据&#xff1a;使用Matplotlib创建散点图 准备数据&#xff1a;归一化数值 测试算法&#xff1a;作为完整程序验证分类器 使用算法&…

跨城租赁再复用 | 保利经典款展厅珠江之畔云游湘江之滨盛大启幕

2023年5月1日 由优积出品的 长沙保利-梅溪天珺营销中心 唯美亮相&#xff0c;举城共鉴&#xff0c;不负一城期待 盛大开放&#xff01; 优积科技可拆装售楼部 首次服务湖南项目 保利梅溪天珺与君说 &#xff0c;赞51 ▲点击观看展厅开放盛况 长沙保利梅溪天珺售楼处是从佛山…

硬件驱动为什么要有WHQL数字签名?

为了保证驱动程序的安全性&#xff0c;避免用户下载到不利于系统稳定和安全的驱动程序&#xff0c;驱动程序签名被设立出来。最初这一过程由代码签名证书来完成&#xff0c;现在取而代之的则是需要对驱动程序做WHQL认证。本文将详细介绍硬件驱动为什么要有WHQL数字签名的相关内…

如何用GPT高效地处理文本、文献查阅、PPT编辑、编程、绘图和论文写作?

原文链接&#xff1a;如何用GPT高效地处理文本、文献查阅、PPT编辑、编程、绘图和论文写作?https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247594986&idx4&sn970f9ba75998f2dd9fa5707d1611a6cc&chksmfa82320dcdf5bb1bdf58c20686d4eb209770e68253ed90d…

中建七局领导一行莅临优积科技考察交流

7月17日&#xff0c;中建七局二公司副总经理、总工程师张体浪、基础设施分公司副总经理兼城建事业部总经理陈才正、BIM技术中心总经理完颜健飞等一行7人来访优积科技&#xff0c;公司CEO刘其东携团队成员热情接待了来访领导。 会上双方对我司预制快建可循环售楼部、绿色科技展厅…

pclpy 可视化点云(多窗口可视化、单窗口多点云可视化)

pclpy 可视化点云&#xff08;多窗口可视化、单窗口多点云可视化&#xff09; 一、算法原理二、代码三、结果1.多窗口可视化结果2.单窗口多点云可视化 四、相关数据五、问题与解决方案1.问题2.解决 一、算法原理 原理看一下代码写的很仔细的。。目前在同一个窗口最多建立2个窗…

2个wordpress优化SEO主题模板

SEO优化wordpress主题 简洁的SEO优化wordpress主题&#xff0c;效果好不好&#xff0c;结果会告诉你&#xff0c;适合SEO公司使用的主题。 https://www.jianzhanpress.com/?p2804 SEO优化海外WordPress主题 简洁的SEO优化海外服务商WordPress主题&#xff0c;为中国制造202…

1、WEB攻防-通用漏洞SQL注入MYSQL跨库ACCESS偏移

用途&#xff1a;个人学习笔记&#xff0c;欢迎指正&#xff01; 前言&#xff1a; 为了网站和数据库的安全性&#xff0c;MYSQL 内置有 ROOT 最高用户&#xff0c;划分等级&#xff0c;每个用户对应管理一个数据库&#xff0c;这样保证无不关联&#xff0c;从而不会影响到其他…

性能测试工具-locust

简介 Locust是基于python语言的性能测试框架。 Locust支持分布式部署&#xff0c;单实例并发线程数可达1000&#xff0c;界面简约。 locust基于事件。 Locust的缺点 1、无可视化脚本编写功能&#xff0c;需要基于Python语言和locust框架进行脚本编写&#xff0c;纯代码编写…

新版Java面试专题视频教程——多线程篇①

新版Java面试专题视频教程——多线程篇① Java多线程相关面试题 0. 问题汇总0.1 线程的基础知识0.2 线程中并发安全 1.线程的基础知识1.1 线程和进程的区别&#xff1f;1.2 并行和并发有什么区别&#xff1f;1.3 创建线程的四种方式1.4 runnabl…

LeetCode 2583.二叉树中的第 K 大层和:层序遍历 + 排序

【LetMeFly】2583.二叉树中的第 K 大层和&#xff1a;层序遍历 排序 力扣题目链接&#xff1a;https://leetcode.cn/problems/kth-largest-sum-in-a-binary-tree/ 给你一棵二叉树的根节点 root 和一个正整数 k 。 树中的 层和 是指 同一层 上节点值的总和。 返回树中第 k …

Codeforces Round 927 (Div. 3)

F. Feed Cats 题目大意 给一长度为的数轴&#xff0c;个区间在数轴上选取一些点作为特殊点在满足个区间中&#xff0c;每个区间内只能有一个特殊点问最多能选多少个特殊点 解题思路 对于每个点有放或不放两种状态考虑表示位置可能放或不放的最优结果若不放&#xff0c;若放…

制造业客户数据安全解决方案(终端安全/文件加密/介质管理等)

针对前文制造业客户数据安全解决方案&#xff08;数据防泄密需求分析&#xff09;提到的泄密风险&#xff0c;本文详细介绍一套完整、合理的解决方案&#xff0c;通过该方案构建公司数据安全防护边界&#xff0c;自动加密、全方位保护数据安全。 PC端&#xff1a;https://isite…

《VitePress 简易速速上手小册》第10章 维护与更新(2024 最新版)

文章目录 10.1 博客的日常维护10.1.1 基础知识点解析10.1.2 重点案例&#xff1a;内容更新策略10.1.3 拓展案例 1&#xff1a;性能优化实践10.1.4 拓展案例 2&#xff1a;备份和安全性策略 10.2 VitePress 版本更新与迁移10.2.1 基础知识点解析10.2.2 重点案例&#xff1a;平稳…

基于 Spring Boot, Spring Cloud 构建微服务架构企业级开发平台

结尾有链接 基于SpringBoot2/SpringSecurity/SpringSession/SpringSocial/SpringSecurityOAuth2构建的互联网应用基础框架&#xff0c;包含认证中心、消息通知、安全中心和用户中心。支持Session/JWT/OAuth2认证模式&#xff0c;支持账号密码/短信验证码/社会化登录等登录模式…

ONLYOFFICE 桌面应用程序 v8.0 发布:全新 RTL 界面、本地主题、Moodle 集成等你期待的功能来了!

目录 &#x1f4d8; 前言 &#x1f4df; 一、什么是 ONLYOFFICE 桌面编辑器&#xff1f; &#x1f4df; 二、ONLYOFFICE 8.0版本新增了那些特别的实用模块&#xff1f; 2.1. 可填写的 PDF 表单 2.2. 双向文本 2.3. 电子表格中的新增功能 单变量求解&#xff1a;…

vue组件渲染过程

前言 一个组件渲染到页面&#xff0c;修改data触发更新&#xff08;数据驱动视图&#xff09;其背后原理是什么&#xff0c;需要掌握哪些点考察对流程了解的全面程度 回顾三大核心知识点 响应式&#xff1a;监听data属性getter、setter&#xff08;包括数组&#xff09; 模板…