Flink之Partitioner(分区规则)

news2024/10/6 18:29:30

Flink之Partitioner(分区规则)

方法注释
global()全部发往1个task
broadcast()广播(前面的文章讲解过,这里不做阐述)
forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事
shuffle()随机分配(只是随机,同Spark的shuffle不同)
rebalance()轮询分配,默认机制就是rebalance()
recale()一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask.
partitionCustom自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容)
keyBy()根据数据key的HashCode进行Hash分配
  • global

    global在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global就是将上游的数据全部发往下游的第一个subtask中,也就是说下游设置再多的并行度是没意义的,所以使用global的时候,下游的task的并行度都是1.
    在这里插入图片描述
    这里结合代码看一下:

    public class FlinkPartitioner {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8081);
            // 开启本地WebUI,构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // 添加数据源,Socket
            DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);
            // 转大写,设置并行度为3,且设置数据分区方式为global
            DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();
            // 切分字符串,设置并行度为1
            SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] split = value.split(",");
                    for (String s : split) {
                        out.collect(s);
                    }
                }
            }).setParallelism(1);
    
            //......
            env.execute("Flink Partitioner");
        }
    }
    

    WebUI界面查看代码中upperCaseMapStreamsplitFlatMapStream之间数据的发送方式
    在这里插入图片描述

  • forward

    forward其实就是一对一发送数据,和之前讲解Task的文章中提到的算子之间OneToOne的模式是一样的,就是可以将forward理解为同一个task chain[算子链]中算子之间的数据传输方式,但是使用forward的前提是上下游的算子并行度是一致的也就是上下游的subtask数量保持一致,图解如下:
    在这里插入图片描述

    代码内容如下:

    public class FlinkPartitioner {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8081);
            // 开启本地WebUI,构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // 添加数据源,Socket
            DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);
            // 转大写,设置为forward分区方式
            DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();
            // 切分字符串
            SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] split = value.split(",");
                    for (String s : split) {
                        out.collect(s);
                    }
                }
            }).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.
            // ...
    
            env.execute("Flink Partitioner");
        }
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • shuffle

    通过前面WebUI的图片我们可以看到,从Socket数据源将数据发送到第一个map的时候使用的是默认的rebalance方式,也就是轮询发送的方式,而这里说的shuffle虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3subtask,下游有5subtask,数据源有5条数据,上游中的某一个subtask向下游发送数据时,是随机发送的,下游的5subtask并不是每个都一定能接受到数据,可能有的接收到1条,有的接收到2条,有的接收到3条数据,这就是shuffle发送数据的方式.

    如果说上两个operator并行度一致,上游选择了shuffle发送数据的方式,那么两个operator会绑定成一个task chain么?不会,因为shuffle的数据发送方式就已经导致两个operator不是OneToOne的模式了.
    在这里插入图片描述
    代码示例:

    public class FlinkPartitioner {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8081);
            // 开启本地WebUI,构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // 添加数据源,Socket
            DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);
            // 转大写,设置为shuffle分区方式
            DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();
            // 切分字符串
            SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] split = value.split(",");
                    for (String s : split) {
                        out.collect(s);
                    }
                }
            }).setParallelism(7)
            // ...
    
            env.execute("Flink Partitioner");
        }
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • Rebalance

    rebalance就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
    在这里插入图片描述

  • recale

    关于recale前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.

    Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
    在这里插入图片描述

    当上下游对应的subtask分组后,上下游组内的subtak就会以组内轮询的方式发送数据,如下图:
    在这里插入图片描述

  • keyBy

    keyBy使用的HASH分区方式,实际是hashCode() + murmurHash()的组合方式,这个在源码的KeyGroupRangeAssignment类中是可以看到的,简单来说根据keyhash值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;).

    关于keyBy的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
    在这里插入图片描述

以上就是对Flink中分区规则的讲解.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/886996.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排序算法-冒泡排序(C语言实现)

简介&#x1f600; 冒泡排序是一种简单但效率较低的排序算法。它重复地扫描待排序元素列表&#xff0c;比较相邻的两个元素&#xff0c;并将顺序错误的元素交换位置&#xff0c;直到整个列表排序完成。 实现&#x1f9d0; 以下内容为本人原创&#xff0c;经过自己整理得出&am…

Python遍历多个子文件夹并基于文件名特征将文件复制到不同的目标文件夹

本文介绍基于Python语言&#xff0c;遍历一个大文件夹中大量的子文件夹&#xff0c;并将每一个子文件夹中大量的文件&#xff0c;按照每一个文件的文件名称的特点与差异&#xff0c;自动创建多个目标文件夹&#xff0c;并将指定文件复制到不同的目标文件夹中的方法。 首先&…

【云原生】【k8s】Kubernetes+EFK构建日志分析安装部署

目录 EFK安装部署 一、环境准备&#xff08;所有主机&#xff09; 1、主机初始化配置 2、配置主机名并绑定hosts&#xff0c;不同主机名称不同 3、主机配置初始化 4、部署docker环境 二、部署kubernetes集群 1、组件介绍 2、配置阿里云yum源 3、安装kubelet kubeadm …

《零基础7天入门Arduino物联网-05》电路基础知识下

配套视频课程&#xff1a;《零基础学Arduino物联网&#xff0c;入门到进阶》 配套课件资料获取&#xff1a;微联实验室 配套学习套件购买&#xff1a;淘宝搜索店铺【微联实验室】 电阻、电容、电感和二极管 电阻 电阻用于电路中&#xff0c;其主要功能在于控制电流的流动和阻…

开源低代码平台Openblocks

网友 HankMeng 想看低代码工具&#xff0c;正好手上有一个&#xff1b; 什么是 Openblocks &#xff1f; Openblocks 是一个开发人员友好的开源低代码平台&#xff0c;可在几分钟内构建内部应用程序。 传统上&#xff0c;构建内部应用程序需要复杂的前端和后端交互&#xff0c;…

元宇宙展厅:打造新型“人货场”发展趋势

在过去几年&#xff0c;元宇宙的话题被多次提起&#xff0c;随着VR技术的不断发展&#xff0c;元宇宙营销逐步出圈&#xff0c;通过沉浸式、互动式的虚拟环境&#xff0c;为消费者打造更加身临其境的体验。 元宇宙展厅构建新型“人货场”关系&#xff0c;将商品搬进元宇宙空间是…

安卓纯代码布局开发游戏二:Android Studio开发环境搭建

1.Android Studio下载&#xff1a; Download Android Studio & App Tools - Android Developers 2.安装 安装过程非常简单&#xff0c;找到下载包&#xff0c;一直点Next即可。 3.下载Android SDK 第一次进入Android Studio默认会先下载Android SDK,笔者下载的Android SDK存…

SpringBoot集成KoTime

koTime是一个开源免费的springboot项目性能分析工具&#xff0c;通过追踪方法调用链路以及对应的运行时长快速定位性能瓶颈&#xff0c;除此之外&#xff0c;代码热更新、异常检测都可以有&#xff01; 一.添加KoTime依赖&#xff1a; <dependency><groupId>cn.l…

【深度学习--RNN 循环神经网络--附LSTM情感文本分类】

deep learning 系列 --RNN 循环神经网络 什么是序列模型 包括了RNN LSTM GRU等网络模型&#xff0c;主要用途是自然语言处理、语音识别等方面&#xff0c;比如生成乐曲&#xff0c;音频转换为文字&#xff0c;文本情感分类&#xff0c;机器翻译等等 标准模型的缺陷 以往的标…

Windows上使用FFmpeg实现本地视频推送模拟海康协议rtsp视频流

场景 Nginx搭建RTMP服务器FFmpeg实现海康威视摄像头预览&#xff1a; Nginx搭建RTMP服务器FFmpeg实现海康威视摄像头预览_nginx rtmp 海康摄像头_霸道流氓气质的博客-CSDN博客 上面记录的是使用FFmpeg拉取海康协议摄像头的rtsp流并推流到流媒体服务器。 如果在其它业务场景…

ppt中线材相交接的地方,如何绘画

ppt中线材相交接的地方&#xff1a; 在ppt中绘画线材相互交接的地方&#xff1a; 1.1绘图工具中的“弧形” 1.2小技巧 “弧形”工具点一下&#xff0c;在ppt中如下 1.3拖动活动点进行调整图形 1.4绘画圆弧 1.5调整“圆弧”的大小&#xff0c;鼠标放在“黄色点”位置&#xf…

爬虫逆向实战(十七)--某某丁简历登录

一、数据接口分析 主页地址&#xff1a;某某丁简历 1、抓包 通过抓包可以发现数据接口是submit 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现有一个enPassword加密参数 请求头是否加密&#xff1f; 通过查看请求头可以发现有一个To…

React 高阶组件(HOC)

React 高阶组件(HOC) 高阶组件不是 React API 的一部分&#xff0c;而是一种用来复用组件逻辑而衍生出来的一种技术。 什么是高阶组件 高阶组件就是一个函数&#xff0c;且该函数接受一个组件作为参数&#xff0c;并返回一个新的组件。基本上&#xff0c;这是从 React 的组成…

部署MES管理系统首先要解决什么问题

随着制造业市场竞争的加剧&#xff0c;企业需要更加高效、灵活的生产运营&#xff0c;以提高产品质量和降低成本。在这种情况下&#xff0c;MES管理系统解决方案成为许多企业的选择。然而&#xff0c;在部署MES管理系统之前&#xff0c;必须首先解决一些关键问题&#xff0c;以…

ReactNative进阶(三十四):ipa Archive 阶段报错error: Multiple commands produce问题修复及思考

文章目录 一、前言二、问题描述三、问题解决四、拓展阅读五、拓展阅读 一、前言 在应用RN开发跨平台APP阶段&#xff0c;从git中拉取项目&#xff0c;应用Jenkins进行组包时&#xff0c;发现最终生成的ipa安装包版本号始终与项目中设置的版本号不一致。 二、问题描述 经过仔…

常见排序集锦-C语言实现数据结构

目录 排序的概念 常见排序集锦 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 hoare 挖坑法 前后指针法 非递归 7.归并排序 非递归 排序实现接口 算法复杂度与稳定性分析 排序的概念 排序 &#xff1a;所谓排序&#xff0c;就是使一串记录&#…

PHP 报修管理系统mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP 报修管理系统 是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 下载地址&#xff1a; https://download.csdn.net/download/qq_41221322/88209950 视…

一次网络不通 “争吵” 引发的思考

为啥争吵&#xff0c;吵什么&#xff1f; "你到底在说什么啊&#xff0c;我 K8s 的 ecs 节点要访问 clb 的地址不通和本地网卡有什么关系..." 气愤语气都从电话那头传了过来&#xff0c;这时电话两端都沉默了。过了好一会传来地铁小姐姐甜美的播报声打断了刚刚的沉寂…

【校招VIP】java语言考点之List和扩容

考点介绍&#xff1a; List是最基础的考点&#xff0c;但是很多同学拿不到满分。本专题从两种实现子类的比较&#xff0c;到比较复杂的数组扩容进行分析。 『java语言考点之List和扩容』相关题目及解析内容可点击文章末尾链接查看&#xff01;一、考点题目 1、以下关于集合类…

仪表板展示 | DataEase看中国:2023年中国电影市场分析

背景介绍 随着《消失的她》、《变形金刚&#xff1a;超能勇士崛起》、《蜘蛛侠&#xff1a;纵横宇宙》、《我爱你》等国内外影片的上映&#xff0c;2023年上半年的电影市场也接近尾声。据国家电影专资办初步统计&#xff0c;上半年全国城市院线票房达262亿元&#xff0c;已经超…