Flink的六种物理分区策略

news2024/9/23 1:24:06

文章目录

    • 物理分区(Physical Partitioning)
      • 🍕1.随机分区(shuffle)
      • 🍔2.轮询分区(Round-Robin)
      • 🍟3. 重缩放分区(rescale)
      • 🌭4.广播(broadcast)
      • 🥙5.全局分区
      • 🫔6.自定义分区

😃😃😃😃😃

githubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithub

更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

物理分区(Physical Partitioning)

  为了同keyBy相区别,我们把这些操作统称为“物理分区”操作。物理分区与keyBy另一大区别在于,keyBy之后得到的是一个KeyedStream,而物理分区之后结果仍是DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。

🍕1.随机分区(shuffle)

  最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

​   随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。=

image-20221117105745301

package com.fang.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./prod?id=1", 1000L),
                new Event("Li", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3200L),
                new Event("Marry", "./home", 1200L),
                new Event("Bob", "./prod?id=3", 110L),
                new Event("Anna", "./home", 3550L),
                new Event("Li", "./prod?id=4", 3210L)
        );

        //经洗牌后打印输出,并行度为 4
        stream.shuffle().print("shuffle").setParallelism(4);

        env.execute();
    }
}

image-20221117104400843

🍔2.轮询分区(Round-Robin)

  轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance 使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

image-20221117105732840

package com.fang.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./prod?id=1", 1000L),
                new Event("Li", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3200L),
                new Event("Marry", "./home", 1200L),
                new Event("Bob", "./prod?id=3", 110L),
                new Event("Anna", "./home", 3550L),
                new Event("Li", "./prod?id=4", 3210L)
        );



        // 经轮询重分区后打印输出,并行度为 4
        stream.rebalance().print("rebalance").setParallelism(4);
        //stream.print("rebalance").setParallelism(4);//默认使用
        env.execute();
    }
}

image-20221117104328853

🍟3. 重缩放分区(rescale)

  重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin 算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。也就是说,“发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale 的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

image-20221117105759339

package com.fang.chapter05;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ShuffleTest2 {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.addSource(new RichParallelSourceFunction<Integer>() {

            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                for (int i = 0; i < 8; i++) {
                    // 将奇数发送到索引为 1 的并行子任务
                    // 将偶数发送到索引为 0 的并行子任务
                    if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        sourceContext.collect(i + 1);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2).rescale().print().setParallelism(4);

        env.execute();
    }
}

image-20221117104953515

🌭4.广播(broadcast)

  这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

package com.fang.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L)
        );
        // 经广播后打印输出,并行度为 4
        stream.broadcast().print("broadcast").setParallelism(4);
        env.execute();

    }
}

image-20221117104723132

🥙5.全局分区

  全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

package com.fang.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L)
        );
        // 经广播后打印输出,并行度为 4
        stream.global().print().setParallelism(4);
        env.execute();

    }
}

image-20221117104857452

🫔6.自定义分区

  当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
  在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector

package com.fang.chapter05;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .print().setParallelism(2);

        env.execute();

    }
}

image-20221117104038383

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/12593.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习入门(三十八)计算性能——多GPU训练

深度学习入门&#xff08;三十八&#xff09;计算性能——多GPU训练前言计算性能——多GPU训练课件多GPU并行数据并行VS模型并行数据并行总结教材1 问题拆分2 数据并行性3 简单网络4 数据同步5 数据分发6 训练7 小结多GPU的简洁实现1简单网络2 网络初始化3 训练4 小结前言 核心…

因子特征工程:alphalens库深度解析

原创文章第107篇&#xff0c;专注“个人成长与财富自由、世界运作的逻辑&#xff0c; AI量化投资”。 前面的文章我们把数据&#xff0c;因子定制&#xff0c;自动标注的功能都准备好了&#xff0c;今天继续因子分析&#xff0c;分析的框架当然还是alphalens。 星球有一期研报…

Babel插件指南

Babel插件指南 文章目录Babel插件指南Babel简介AST(Abstract syntax tree)简介ESTree AST NodeVisitors&#xff08;访问者&#xff09;Babel APIbabylonbabel-traversebabel generator项目中实践引用自定义的babel插件新增插件js文件确定要实现的功能&#xff0c;编译成AST进行…

绘制花朵-第13届蓝桥杯Scratch选拔赛真题精选

[导读]&#xff1a;超平老师计划推出Scratch蓝桥杯真题解析100讲&#xff0c;这是超平老师解读Scratch蓝桥真题系列的第78讲。 蓝桥杯选拔赛每一届都要举行4~5次&#xff0c;和省赛、国赛相比&#xff0c;题目要简单不少&#xff0c;再加上篇幅有限&#xff0c;因此我精挑细选…

OpenYurt v1.1.0: 新增 DaemonSet 的 OTA 和 Auto 升级策略

作者&#xff1a;昌蒲、侯雪城 边缘计算云原生平台、CNCF SandBox 项目 - OpenYurt [1 ] &#xff0c;近期发布了 v1.1.0 版本。 OpenYurt 作为边缘云原生领域的开源项目&#xff0c;采用云管边的云边一体化架构&#xff0c;致力于解决云原生落地边缘计算场景的痛点问题。针对…

扩充antd的Icon图标库

一、功能介绍 项目中有个菜单图标支持配置的功能&#xff0c;如下 二、遇到的问题 上面的图标都是antdIcon组件自带的&#xff0c;只需要给Icon传不同的type就可以显示出来不同的图标&#xff0c;但是我现在需要将自己的图标也放到这个里面&#xff0c;而且实现通过传个type…

asp.net+sqlserver个人简历生成系统C#项目

目 录 1 项目来源 1 1.1 项目背景 1 1.2目的和意义 1 1.3研究成果 2 2 系统开发环境 3 2.1 Visual Studio.NET开发平台 3 2.2 ASP.NET 2.0开发技术 3 2.3 ADO.NET数据访问技术 4 2.4 Microsoft SQL Server简介 4 2.5 B/S结构 5 3 需求分析 6…

服务端Skynet(五)——如何搭建一个实例

服务端Skynet(五)——如何搭建一个实例 文章目录服务端Skynet(五)——如何搭建一个实例1、配置文件2、服务消息分发与回应(call/send)3、通信(server/client)4、Mysql连接1、配置文件 ​ 搭建一个实例 主要看 config 文件的设置&#xff0c;如下&#xff1a; --config inclu…

RK3399驱动开发 | 15 - RTC实时时钟芯片HYM8563S调试(基于linux5.4.32内核)

文章目录 一、Linux RTC设备驱动框架二、HYM8563实时时钟芯片1. 简介2. 引脚图3. 连接原理图三、设备驱动调试1. 设备树节点描述2. 使能内核驱动3. 测试四、hym8563驱动实现分析1. i2c设备驱动框架2. rtc设备注册流程3. 通过i2c驱动操作硬件一、Linux RTC设备驱动框架 Linux内…

宝塔防火墙必要的快速操作指令

重新启动、禁止固定ip等 重启firewall-cmd --reload 禁止固定ip&#xff1a;firewall-cmd --permanent --add-rich-rulerule family"ipv4" source address"192.168.1.1" reject 取消富规则&#xff1a;firewall-cmd --list-rich-rules 删除富规则&#…

Java#9(文字格斗游戏和对象数组练习)

目录 一.文字格斗游戏 二.对象数组 三.键盘录入练习 四.复杂对象数组练习 题目要求: 一.文字格斗游戏 Role类的代码 package Game;import java.util.Random;public class Role {String name;int blood;public Role() {}public Role(String name, int blood) {this.name na…

Node.js 流 Stream【详解】

什么是流&#xff1f; 流是一种将整体数据分割成多个小块依次进行处理的方式。 举个形象的例子&#xff1a; 山上有1000颗拳头大的小石子&#xff0c;需要搬下山。 传统的处理方式&#xff1a;安排一辆大卡车&#xff0c;一次性将石子全部运下山。流的处理方式&#xff1a;修…

Nginx制作下载站点

nginx使用的是模块ngx_http_autoindex_module来实现的&#xff0c;该模块处理以斜杠(“/”)结尾的请求&#xff0c;并生成目录列表。 nginx编译的时候会自动加载该模块&#xff0c;但是该模块默认是关闭的&#xff0c;使用下来指令来完成对应的配置 autoindex 启用或禁用目录…

医疗器械许可证怎么办理

医疗器械经营许可证申请条件 1.有两个与业务规模和业务范围相适应的质量管理机构或大专以上学历的质量管理人员。质量管理人员应具有国家认可的相关专业资格或职称&#xff1b; 2.具有与经营规模和范围相适应的相对独立的经营场所&#xff1b; 3.具备与经营规模和经营范围相…

解读OpenShift的逻辑架构和技术架构

01 OpenShift的逻辑架构 OpenShift的逻辑架构图如图2-6所示。 ▲图2-6 OpenShift逻辑架构 图2-6中的关键组件介绍如下。 底层基础设施:OpenShift可以运行在公有云(AWS、Azure、Google等)、私有云(OpenStack)、虚拟机(vSphere、RHV、红帽KVM)、X86、IBM Power/Z服务器上。…

跨域及cors解决跨域

1.什么是跨域 出于浏览器的同源策略限制。同源策略&#xff08;Sameoriginpolicy&#xff09;是一种约定&#xff0c;它是浏览器最核心也最基本的安全功能&#xff0c;如果缺少了同源策略&#xff0c;则浏览器的正常功能可能都会受到影响。可以说Web是构建在同源策略基础之上的…

DJ11 8086系列处理器(第二节课)

目录 一、8088CPU的系统总线 1. 最小模式 2. 最大模式 二、8086/8088 CPU 的功能结构 1. 8086/8088 CPU 的内部结构 2. 8086/8088 CPU 的内部寄存器 1&#xff09;通用寄存器 2&#xff09;段寄存器 3&#xff09;控制寄存器 三、8086/8088 CPU 的存储器组织 1. 物…

超级账本Fabric的世界状态操作与账本操作

在 Hyperledger Fabric 中&#xff0c;账本由两个不同但相关的部分组成 - 世界状态和区块链。 世界状态&#xff1a; 一个数据库&#xff0c;其中存储了一组帐本状态的当前值的缓存。世界状态使程序可以轻松地直接访问状态的当前值&#xff0c;而不必通过遍历整个交易日志来计…

PROTAC与抗体偶联药物的结合

PROTAC 的靶点真核生物的蛋白降解途径主要分为溶酶体途径、泛素蛋白酶体途径、胞液蛋白酶水解途径和线粒体蛋白酶途径等四种 (图1)。其中&#xff0c;PROTAC 所依赖的蛋白酶体途径主要针对细胞周期蛋白、转录因子、细胞表面受体以及胞内变性蛋白等进行降解。 图 1. 不同蛋白降…

《安富莱嵌入式周报》第291期:分分钟设计数字芯片,单片机版JS,神经网络DSP,microPLC,FatFS升级至V0.15,微软Arm64 VS正式版发布

往期周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 视频版&#xff1a; https://www.bilibili.com/video/BV1Dd4y1b74x 《安富莱嵌入式周报》第291期&#xff1a;分分…