【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

news2024/11/15 21:48:49

文章目录

    • 零. RpcService服务概述
    • 1. AkkaRpcService的创建和初始化
    • 2.通过AkkaRpcService初始化RpcServer
    • 3. ResourceManager中RPC服务的启动
    • 4. 实现相互通讯能力

零. RpcService服务概述

RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。

 

RpcService主要包含如下两个重要方法。

  1. startServer():用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装,启动完成后,RpcEndpoint中的RpcServer就能够对外提供服务了。
  2. connect():用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法,建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。

例如在JobMaster组件中创建与ResourceManager组件之间的RPC连接时。此时可以通过Akka发送消息到ResourceManager的RpcServer中,这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。

 

1. AkkaRpcService的创建和初始化

在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中,会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService,接着启动RpcServer。

例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。

创建AkkaRpcService主要包括如下步骤。

  1. 在ClusterEntrypoint中创建RpcService。
  2. 启动ActorSystem服务。
  3. 创建RobustActorSystem。RobustActorSystem实际上是对Akka的ActorSystem进行了封装和拓展,相比于原生Akka
    ActorSystem,RobustActorSystem包含了UncaughtExceptionHandler组件,能够对ActorSystem抛出的异常进行处理。
  4. 使用RobustActorSystem创建AkkaRpcService实例。
  5. 将AkkaRpcService返回到ClusterEntrypoint中,用于启动集群中各个RpcEndpoint组件服务

在这里插入图片描述

 

2.通过AkkaRpcService初始化RpcServer

在集群运行时中创建了共用的AkkaRpcService服务,相当于创建了Akka系统中的ActorSystem,接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。(AkkaRpcService服务作为共用的rpc服务,启动其他各个组件的RpcServer实例?)

 
这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中,执行了RpcServer的初始化

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
   this.rpcService = checkNotNull(rpcService, "rpcService");
   this.endpointId = checkNotNull(endpointId, "endpointId");
   // 初始化RpcEndpoint中的RpcServer
   this.rpcServer = rpcService.startServer(this);
   this.mainThreadExecutor = new MainThreadExecutor(rpcServer, 
   this::validateRunsInMainThread);
}

具体看下rpcService.startServer(this) 启动rpcServer的逻辑

  1. ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。
  2. 启动RpcEndpoint对应的RPC服务,首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来,例如JobMaster组件会抽取JobMasterGateway接口定义。
  3. 创建InvocationHandler代理类,根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。
  4. 根据RpcEndpoint是否为FencedRpcEndpoint,InvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。

FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。
AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。

  1. 创建好InvocationHandler代理类后,通过反射的方式(Proxy.newProxyInstance())创建代理类。创建的代理类会被转换为RpcServer实例,再返回给RpcEndpoint使用。

在RpcServer创建的过程中可以看出,实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类,此时实现的组件就能够获取到RpcServer服务,且通过RpcServer代理了所有的RpcGateways接口,提供了本地方法调用和远程方法调用两种模式。

@Override  
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {  
    checkNotNull(rpcEndpoint, "rpc endpoint");  
  
    final SupervisorActor.ActorRegistration actorRegistration =  
            registerAkkaRpcActor(rpcEndpoint);  
    final ActorRef actorRef = actorRegistration.getActorRef();  
    final CompletableFuture<Void> actorTerminationFuture =  
            actorRegistration.getTerminationFuture();  
    //启动RpcEndpoint对应的RPC服务
    LOG.info(  
            "Starting RPC endpoint for {} at {} .",  
            rpcEndpoint.getClass().getName(),  
            actorRef.path());  
  
    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);  
    final String hostname;  
    Option<String> host = actorRef.path().address().host();  
    if (host.isEmpty()) {  
        hostname = "localhost";  
    } else {  
        hostname = host.get();  
    }  
    //解析EpcEndpoint实现的所有RpcGateway接口
    Set<Class<?>> implementedRpcGateways =  
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));  
    //额外添加RpcServer和AkkaBasedEndpoint类
    implementedRpcGateways.add(RpcServer.class);  
    implementedRpcGateways.add(AkkaBasedEndpoint.class);  
  
    final InvocationHandler akkaInvocationHandler;  
    //根据是否是FencedRpcEndpoint创建不同的动态代理对象
    if (rpcEndpoint instanceof FencedRpcEndpoint) {  
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler  
        akkaInvocationHandler =  
                new FencedAkkaInvocationHandler<>(  
                        akkaAddress,  
                        hostname,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        actorTerminationFuture,  
                        ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,  
                        captureAskCallstacks);  
  
        implementedRpcGateways.add(FencedMainThreadExecutable.class);  
    } else {  
        akkaInvocationHandler =  
                new AkkaInvocationHandler(  
                        akkaAddress,  
                        hostname,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        actorTerminationFuture,  
                        captureAskCallstacks);  
    }  
  
    // Rather than using the System ClassLoader directly, we derive the ClassLoader  
    // from this class . That works better in cases where Flink runs embedded and all Flink    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader    ClassLoader classLoader = getClass().getClassLoader();  
  
    @SuppressWarnings("unchecked")  
    RpcServer server =  
            (RpcServer)  
                    Proxy.newProxyInstance(  
                            classLoader,  
                            implementedRpcGateways.toArray(  
                                    new Class<?>[implementedRpcGateways.size()]),  
                            akkaInvocationHandler);  
  
    return server;  
}

 

3. ResourceManager中RPC服务的启动

RpcServer在RpcEndpoint的构造器中完成初始化后,接下来就是启动RpcEndpoint和RpcServer,这里以ResourceManager为例进行说明。

在启动集群时,看下如何启动ResourceManager的RPC服务的。如下调用链

ClusterEntrypoint.startCluster->runCluster
->dispatcherResourceManagerComponentFactory.create
->resourceManager.start();
=>
public final void start() {  
    rpcServer.start();  
}

继续探索RPC服务是如何启动的

首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例,此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法,启动ResourceManager所在RpcEndpoint中的RpcServer,如下。

在这里插入图片描述

  1. 调用ResourceManager.start()方法,此时会调用RpcEndpoint.start()父方法,启动ResourceManager组件的RpcServer。
  2. 通过动态代理AkkaInvocationHandler.invoke()方法执行流程,发现调用的是StartStoppable.start()方法,此时会直接调用AkkaInvocationHandler.start()本地方法。
  3. 在AkkaInvocationHandler.start()方法中,实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息,表明当前Actor实例可以正常启动并接收来自远端的RPC请求。
  4. AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。
  5. 将AkkaRpcActor中的状态更新为StartedState,此时ResourceManager的RpcServer启动完成,ResourceManager组件能够接收来自其他组件的RPC请求。

在flink1.12中省略了AkkaInvocationHandler的干预。

经过以上步骤,指定组件的RpcEndpoint节点就正常启动,此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中,处理本地和远程提交的RPC请求

 

4. 实现相互通讯能力

当AkkaRpcService启动RpcEndpoint中的RpcServer后,RpcEndpoint组件仅能对外提供处理RPC请求的能力,RpcEndpoint组件需要在启动后向其他组件注册自己的RpcEndpoint信息,并完成组件之间的RpcConnection注册,才能相互访问和通信。而创建RPC连接需要调用RpcService.connect()方法。

如代码所示,在AkkaRpcService.connect()方法中,完成了RpcConnection对象的创建。

@Override  
public <C extends RpcGateway> CompletableFuture<C> connect(  
        final String address, final Class<C> clazz) {  
  
    return connectInternal(  
            address,  
            clazz,  
            (ActorRef actorRef) -> {  
                Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);  
  
                return new AkkaInvocationHandler(  
                        addressHostname.f0,  
                        addressHostname.f1,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        null,  
                        captureAskCallstacks);  
            });  
}

具体看AkkaRpcService.connectInternal()方法逻辑。

  1. 获取ActorRef引用对象。
  2. 调用Patterns.ask()方法,向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息,确保连接的RpcEndpoint节点正常,如果成功,则RpcEndpoint会返回HandshakeSuccessMessage消息。
  3. 调用invocationHandlerFactory创建invocationHandler动态代理类,此时可以看到传递的接口列表为new Class<?>[]{clazz},也就是当前RpcEndpoint需要访问的RpcGateway接口。例如JobMaster访问ResourceManager时,这里就是ResourceManagerGateway接口。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(  
        final String address,  
        final Class<C> clazz,  
        Function<ActorRef, InvocationHandler> invocationHandlerFactory) {  
    checkState(!stopped, "RpcService is stopped");  
  
    LOG.debug(  
            "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",  
            address,  
            clazz.getName());  
        
    //获取actorRef实例  
    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);  
    //进行handshake操作,确保需要连接的RpcEndpoint节点正常  
    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =  
            actorRefFuture.thenCompose(  
                    (ActorRef actorRef) ->  
                            FutureUtils.toJava( 
                            //调用Patterns.ask()方法,向actorRef对应的
                            //RpcEndpoint节点发送RemoteHandshakeMessage消息,
                            //确保连接的RpcEndpoint节点正常,如果成功,则
                            //RpcEndpoint会返回HandshakeSuccessMessage消息。 
                                    Patterns.ask(  
                                                    actorRef,  
                                                    new RemoteHandshakeMessage(  
                                                            clazz, getVersion()),  
                                                    configuration.getTimeout().toMilliseconds())  
                                            .<HandshakeSuccessMessage>mapTo(  
                                                    ClassTag$.MODULE$  
                                                            .<HandshakeSuccessMessage>apply(  
                                                                    HandshakeSuccessMessage  
                                                                            .class))));  
    //创建RPC动态代理类  
    return actorRefFuture.thenCombineAsync(  
            handshakeFuture,  
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {  
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);  
  
                // Rather than using the System ClassLoader directly, we derive the ClassLoader  
                // from this class . That works better in cases where Flink runs embedded and                // all Flink                // code is loaded dynamically (for example from an OSGI bundle) through a custom                // ClassLoader                ClassLoader classLoader = getClass().getClassLoader();  
  
                @SuppressWarnings("unchecked")  
                C proxy =  
                        (C)  
                                Proxy.newProxyInstance(  
                                        classLoader, new Class<?>[] {clazz}, invocationHandler);  
  
                return proxy;  
            },  
            actorSystem.dispatcher());  
}

经过以上步骤,实现了创建RpcEndpoint组件之间的RPC连接,此时集群RPC组件之间可以进行相互访问,例如JobMaster可以向ResourceManager发送Slot资源请求。
RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1467851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL-Labs靶场“46-50”关通关教程

君衍. 一、四十六关 ORDER BY数字型注入1、源码分析2、rand()盲注3、if语句盲注4、时间盲注5、报错注入6、Limit注入 二、四十七关 ORDER BY单引号报错注入1、源码分析2、报错注入3、时间盲注 三、四十八关 ODRER BY数字型盲注1、源码分析2、rand()盲注3、if语句盲注4、时间盲注…

2.openEuler概述及安装指南(二)

openEuler OECA认证辅导&#xff0c;标红的文字为学习重点和考点。 如果需要做实验&#xff0c;建议下载麒麟信安、银河麒麟、统信等具有图形化的操作系统&#xff0c;其安装与openeuler基本一致。 1.安装过程及配置 使用光盘引导安装&#xff1a; 此处以光盘安装为例介绍安装…

c语言的数据结构:找环状链表入口处

一起<(&#xffe3;︶&#xffe3;)↗[GO!] 1.如何判断一个链表是否有环 思路:设定两个快慢指针fast和slow,fast每次走两个结点,slow每次走一个节点 如果fast指针遇到了Null,那么这个链表没有环,如果fast和slow可以相遇,则代表这个链表有环 代码如下 N:fast先进环,slow后…

【Azure 架构师学习笔记】- Azure Databricks (8) --UC架构简介

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Databricks】系列。 接上文 【Azure 架构师学习笔记】- Azure Databricks (7) --Unity Catalog(UC) 基本概念和组件 前言 UC 简单来说&#xff0c;就是管理两样东西&#xff1a;用户和元存储。 用户管理 所有Databri…

从新手到高手:用NumPy学习网站打造你的数据处理超能力!

介绍&#xff1a;NumPy是一个用于数值计算的Python库&#xff0c;特别擅长处理多维数组和矩阵。以下是对NumPy的详细介绍&#xff1a; 起源和发展&#xff1a;NumPy由Travis Oliphant在2005年创建&#xff0c;它是基于原来的Numeric模块和Numarray模块发展而来的。它的大部分代…

上网行为监控软件能够看到聊天内容吗

随着信息技术的不断发展&#xff0c;上网行为监控软件在企业网络安全管理中扮演着越来越重要的角色。 这类软件主要用于监控员工的上网行为&#xff0c;以确保工作效率和网络安全。 而在这其中&#xff0c;域智盾软件作为一款知名的上网行为监控软件&#xff0c;其功能和使用…

靡语IT:JavaScript数组

目录 1.数组&#xff1a;Array 2.Array.length 3.数组的声明(创建)方法 4.数组去重 5.数组遍历 6.类数组对象 1.数组&#xff1a;Array 数组对象的作用是&#xff1a;使用单独的变量名来存储一系列的值。 参数 参数 size 是期望的数组元素个数。返回的数组&#xff0…

SpringBoot3+Vue3 基础知识(持续更新中~)

bean 把方法的返回结果注入到ioc中 1: 2: 3: 组合注解封装 实战篇&#xff1a; 解析token&#xff1a; 统一携带token&#xff1a; 驼峰命名与下划线命名转换&#xff1a; NotEmpty!!! mybatis&#xff1a; PageHelper设置后&#xff0c;会将pageNum,和pageSize自己拼接…

动态绑定样式,uniapp,用三元运算动态绑定多个class类样式,动态绑定的样式可以和原始样式共存

介绍 | uni-app官网 vue、uniapp中动态添加绑定style、class 9种方法实现_vue style动态绑定-CSDN博客 uniapp使用三元运算符动态绑定元素的style样式_uniapp style动态绑定-CSDN博客 对象写法,可以写多个class类 class类的名字&#xff1a;判断条件&#xff0c;最后结果只有…

【k8s资源调度-StatefulSet】

1、部署对象StatefulSet资源&#xff08;无状态应用&#xff09; StatefulSet针对的是有状态应用&#xff0c;有状态应用会对我们的当前pod的网络、文件系统等有关联。 2、配置文件如下 StatefulSet资源的配置文件粗略如下&#xff0c;如下的配置信息包含了数据卷&#xff0c;…

MobaXterm连接VirtualBox虚拟机

目录 1.下载MobaXterm 2.获取连接配置 3.mobaXterm连接虚拟机 4.更好的方案 1.下载MobaXterm 据说MobaXtrem是远程终端的超级全能神器,官网下载地址&#xff1a;MobaXterm free Xserver and tabbed SSH client for Windows 选择适合你的版本&#xff1a;一个是Home Editi…

【Docker】初学者 Docker 基础操作指南:从拉取镜像到运行、停止、删除容器

在现代软件开发和部署中&#xff0c;容器化技术已经成为一种常见的方式&#xff0c;它能够提供一种轻量级、可移植和可扩展的应用程序打包和部署解决方案。Docker 是目前最流行的容器化平台之一&#xff0c;它提供了一整套工具和技术&#xff0c;使得容器的创建、运行和管理变得…

实现外网手机或者电脑随时随地远程访问家里的电脑主机(linux为例)

文章目录 一、背景概要二、安装配置花生壳软件(linux版本)三、手机端(外网)验证连接四、安装ubuntu20server版系统遇到的问题记录 一、背景概要 由于经常在遇到某些问题的时候&#xff0c;针对某一个场景的理解&#xff0c;需要借助于自己的电脑去编译(aosp/linux/qemu)代码查…

【MySQL】数据类型(常见类型)-- 详解

一、数据类型分类 二、数值类型 1、tinyint 类型 在 MySQL 中&#xff0c;整型可以指定是有符号的和无符号的&#xff0c;默认是有符号的。 有符号&#xff1a; 插入数据越界测试&#xff1a; 在 MySQL 表中建立属性列时&#xff0c;我们可以发现列名称在前&#xff0c;类型在…

使用python查看官网是否发布新的内容

目录 前言 第一章、python介绍和使用pip install下载包 1.python介绍 2.使用vscode编写python 3.pip install的使用 第二章、查看官网是否发布新的内容 第三章、代码实现 目录结构 代码实现 check_new_news.py files.py news.py main.py file.txt 运行演示 前言 也…

【QT 5 +Linux下软件生成+qt软件生成使用工具+学习他人文章+第一篇:使用linuxdeployqt软件生成】

【QT 5 Linux下软件生成qt软件生成使用工具学习他人文章第一篇&#xff1a;使用linuxdeployqt软件生成】 1、前言2、实验环境3、自我学习总结-本篇总结1、新手的疑问&#xff0c;做这件事的目的2、了解工具&#xff1a;linuxdeployqt工具3、解决相关使用过程中问题 4、参照文章…

python[6]

类和对象 面向对象编程–说白就是让对象干活 创建类&#xff1a;class 类名&#xff1a; 创建类对象 对象名 类名&#xff08;&#xff09; 构造方法 1、构造方法的名称是__init__ 2、构造方法的作用&#xff1f; 构建类对象的时候会自动运行 构建类对象的传参会传递给构造…

高级RAG:揭秘PDF解析

原文地址&#xff1a;https://pub.towardsai.net/advanced-rag-02-unveiling-pdf-parsing-b84ae866344e 2024 年 2 月 3 日 附加内容&#xff1a;揭秘PDF解析&#xff1a;如何从科学pdf论文中提取公式 对于RAG&#xff0c;从文档中提取信息是一个不可避免的场景。确保从源头…

python:读 Freeplane.mm文件,使用 xml.etree 生成测试案例.csv文件

Freeplane 是一款基于 Java 开源软件&#xff0c;继承 Freemind 的思维导图工具软件&#xff0c;它扩展了知识管理功能。 读取 Freeplane.mm文件&#xff0c;使用 xml.etree 生成测试案例.csv文件 编写 fpmm_etree_csv.py 如下 #-*- coding: UTF-8 -*- """ …

AWS安全组是什么?有什么用?

最近看到小伙伴在问&#xff0c;AWS安全组是什么&#xff1f;有什么用&#xff1f;今天我们大家就来简单聊聊&#xff0c;仅供参考哦&#xff01; AWS安全组是什么&#xff1f;有什么用&#xff1f; 【回答】&#xff1a;AWS安全组是一种虚拟防火墙&#xff0c;用于控制进出…