【Flink】Flink 的八种分区策略(源码解读)

news2025/2/27 18:19:55

Flink 的八种分区策略(源码解读)

  • 1.继承关系图
    • 1.1 接口:ChannelSelector
    • 1.2 抽象类:StreamPartitioner
    • 1.3 继承关系图
  • 2.分区策略
    • 2.1 GlobalPartitioner
    • 2.2 ShufflePartitioner
    • 2.3 BroadcastPartitioner
    • 2.4 RebalancePartitioner
    • 2.5 RescalePartitioner
    • 2.6 ForwardPartitioner
    • 2.7 KeyGroupStreamPartitioner
    • 2.8 CustomPartitionerWrapper

Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

在这里插入图片描述

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

1.继承关系图

1.1 接口:ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).
     */
    void setup(int numberOfChannels);

    /**
     *根据当前的record以及Channel总数,
     *决定应将record发送到下游哪个Channel。
     *不同的分区策略会实现不同的该方法。
     */
    int selectChannel(T record);

    /**
    *是否以广播的形式发送到下游所有的算子实例
     */
    boolean isBroadcast();
}

1.2 抽象类:StreamPartitioner

public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }

    public abstract StreamPartitioner<T> copy();
}

1.3 继承关系图

在这里插入图片描述

2.分区策略

2.1 GlobalPartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

在这里插入图片描述

/**
 * 发送所有的数据到下游算子的第一个task(ID = 0)
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //只返回0,即只发送给下游算子的第一个task
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}

2.2 ShufflePartitioner

随机选择一个下游算子实例进行发送。

在这里插入图片描述

/**
 * 随机的选择一个channel进行发送
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //产生[0,numberOfChannels)伪随机数,随机发送到下游的某个task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}

2.3 BroadcastPartitioner

发送到下游所有的算子实例。

在这里插入图片描述

/**
 * 发送到所有的channel
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    /**
     * Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "BROADCAST";
    }
}

2.4 RebalancePartitioner

通过循环的方式依次发送到下游的 task

在这里插入图片描述

/**
 *通过循环的方式依次发送到下游的task
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        //初始化channel的id,返回[0,numberOfChannels)的伪随机数
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2
        //则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}

2.5 RescalePartitioner

基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}

Flink 中的执行图可以分成四层:StreamGraphJobGraphExecutionGraph物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “”,并不是一个具体的数据结构。

StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitionerRescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)
                DistributionPattern.ALL_TO_ALL,
                resultPartitionType);
        }

2.6 ForwardPartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

在这里插入图片描述

/**
 * 发送到下游对应的第一个task
 * @param <T>
 */
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

if (partitioner instanceof ForwardPartitioner) {
    //如果上下游的并行度不一致,会抛出异常
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}

2.7 KeyGroupStreamPartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

在这里插入图片描述

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/**
 * 根据key的分组索引选择发送到相对应的下游subtask
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        //调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
...

    /**
     * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
     * 即该key发送到哪一个task
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     *根据key分配一个分组id(keyGroupId)
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        //获取key的hashcode
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * 根据key分配一个分组id(keyGroupId),
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {

        //与maxParallelism取余,获取keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    //计算分区index,即该key group应该发送到下游的哪一个算子实例
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
...

2.8 CustomPartitionerWrapper

通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
		//实现Partitioner接口,重写partition方法
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "CUSTOM";
    }
}

比如:

public class CustomPartitioner implements Partitioner<String> {
      // key: 根据key的值来分区
      // numPartitions: 下游算子并行度
      @Override
      public int partition(String key, int numPartitions) {
         return key.length() % numPartitions;//在此处定义分区策略
      }
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1504561.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机APP测试——如何进行安装、卸载、运行?

手机APP测试——主要针对的是安卓( Android )和苹果IOS两大主流操作系统,主要考虑的就是功能性、兼容性、稳定性、易用性、性能等测试&#xff0c;今天先来讲讲如何进行安装、卸载、运行的内容。 一、App安装 1、点击运行APP安装包,检测安装包是否正常; . 2、进入[安装向导]…

Java17 --- SpringCloud之OpenFeign

目录 一、OpenFeign实现服务调用 1.1、创建openfeign微服务 二、Openfeign超时控制 2.1、全局默认配置 2.2、单个微服务配置 三、重试机制 四、替换openfeign默认的HttpClient 五、请求响应压缩 六、日志打印 一、OpenFeign实现服务调用 1.1、创建openfeign微服…

LLM长上下文外推方法

现在的LLM都集中在卷上下文长度了&#xff0c;最新的Claude3已经支持200K的上下文&#xff0c;见&#xff1a;cost-context。下面是一些提升LLM长度外推能力的方法总结&#xff1a; 数据工程 符尧大佬的最新工作&#xff1a;Data Engineering for Scaling Language Models to …

[虚拟机保护逆向] [HGAME 2023 week4]vm

[虚拟机保护逆向] [HGAME 2023 week4]vm 虚拟机逆向的注意点&#xff1a;具体每个函数的功能&#xff0c;和其对应的硬件编码的*长度* 和 *含义*&#xff0c;都分析出来后就可以编写脚本将题目的opcode转化位vm实际执行的指令 &#xff1a;分析完成函数功能后就可以编写脚本输出…

c++ primer plus 笔记 第十六章 string类和标准模板库

string类 string自动调整大小的功能&#xff1a; string字符串是怎么占用内存空间的&#xff1f; 前景&#xff1a; 如果只给string字符串分配string字符串大小的空间&#xff0c;当一个string字符串附加到另一个string字符串上&#xff0c;这个string字符串是以占用…

Spring web开发(入门)

1、我们在执行程序时&#xff0c;运行的需要是这个界面 2、简单的web接口&#xff08;127.0.0.1表示本机IP&#xff09; package com.example.demo;import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestCont…

代码学习记录15

随想录日记part15 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.09 主要内容&#xff1a;今天的主要内容是二叉树的第四部分&#xff0c;主要涉及平衡二叉树的建立&#xff1b;二叉树的路径查找&#xff1b;左叶子之和&#xff1b;找树左下角的值&#xff…

考研复习C语言初阶(4)+标记和BFS展开的扫雷游戏

目录 1. 一维数组的创建和初始化。 1.1 数组的创建 1.2 数组的初始化 1.3 一维数组的使用 1.4 一维数组在内存中的存储 2. 二维数组的创建和初始化 2.1 二维数组的创建 2.2 二维数组的初始化 2.3 二维数组的使用 2.4 二维数组在内存中的存储 3. 数组越界 4. 冒泡…

3.DOM-事件进阶(事件对象、事件委托)

环境对象this 环境对象本质上是一个关键字 this this所在的代码区域不同&#xff0c;代表的含义不同 全局作用域中的this 全局作用域中this代表window对象 局部作用域中的this 在局部作用域中(函数中)this代表window对象 原因是函数调用的时候简写了&#xff0c;函数完整写…

Go语言数据结构(二)堆/优先队列

文章目录 1. container中定义的heap2. heap的使用示例3. 刷lc应用堆的示例 更多内容以及其他Go常用数据结构的实现在这里&#xff0c;感谢Star&#xff1a;https://github.com/acezsq/Data_Structure_Golang 1. container中定义的heap 在golang中的"container/heap"…

[数据集][目标检测]变电站缺陷检测数据集VOC+YOLO格式8307张17类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;8307 标注数量(xml文件个数)&#xff1a;8307 标注数量(txt文件个数)&#xff1a;8307 标注…

Java8 CompletableFuture异步编程-进阶篇

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Java全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 前言 我们在前面文章讲解了CompletableFuture这个异步编程类的基本用法&#xff0c;…

【操作系统概念】第11章:文件系统实现

文章目录 0.前言11.1 文件系统结构11.2 文件系统实现11.2.1 虚拟文件系统 11.3 分配方法11.3.1 连续分配11.3.2 链接分配11.3. 3 索引分配 11.5 空闲空间管理11.5.1 位图/位向量11.5.2 链表11.5.3 组 0.前言 正如第10章所述&#xff0c;文件系统提供了机制&#xff0c;以在线存…

【数据分享】2000-2022年全国1km分辨率的逐年PM2.5栅格数据(免费获取)

PM2.5作为最主要的空气质量指标&#xff0c;在我们日常研究中非常常用&#xff01;之前我们给大家分享了2013-2022年全国范围逐日的PM2.5栅格数据&#xff08;可查看之前的文章获悉详情&#xff09;&#xff01; 本次我们给大家带来的是2000-2022年全国范围的逐年的PM2.5栅格数…

树莓派4B Ubuntu20.04 Python3.9安装ROS踩坑记录

问题描述 在使用sudo apt-get update命令更新时发现无法引入apt-pkg,使用python3 -c "import apt_pkg"发现无法引入&#xff0c;应该是因为&#xff1a;20.04的系统默认python是3.8&#xff0c;但是我换成了3.9所以没有编译文件&#xff0c;于是使用sudo update-alte…

K8S - 在任意node里执行kubectl 命令

当我们初步安装玩k8s &#xff08;master 带 2 nodes&#xff09; 时 正常来讲kubectl 只能在master node 里运行 当我们尝试在某个 node 节点来执行时&#xff0c; 通常会遇到下面错误 看起来像是访问某个服务器的8080 端口失败了。 原因 原因很简单 , 因为k8s的各个组建&…

思科网络中如何配置标准ACL协议

一、什么是标准ACL协议&#xff1f;有什么作用及配置方法&#xff1f; &#xff08;1&#xff09;标准ACL&#xff08;Access Control List&#xff09;协议是一种用于控制网络设备上数据流进出的协议。标准ACL基于源IP地址来过滤数据流&#xff0c;可以允许或拒绝特定IP地址范…

微信私信短剧机器人源码

本源码仅提供参考&#xff0c;有能力的继续开发 接口为api调用 云端同步 https://ys.110t.cn/api/ajax.php?actyingshilist 影视搜索 https://ys.110t.cn/api/ajax.php?actsearch&name剧名 每日更新 https://ys.110t.cn/api/ajax.php?actDaily 反馈接口 https://ys.11…

机器学习-pytorch1(持续更新)

上一节我们学习了机器学习的线性模型和非线性模型的机器学习基础知识&#xff0c;这一节主要将公式变为代码。 代码编写网站&#xff1a;https://colab.research.google.com/drive 学习课程链接&#xff1a;ML 2022 Spring 1、Load Data&#xff08;读取数据&#xff09; 这…

Chain of Verification(验证链、CoVe)—理解与实现

原文地址&#xff1a;Chain of Verification (CoVe) — Understanding & Implementation 2023 年 10 月 9 日 GitHub 存储库 介绍 在处理大型语言模型&#xff08;LLM&#xff09;时&#xff0c;一个重大挑战&#xff0c;特别是在事实问答中&#xff0c;是幻觉问题。当答案…