1.flink快速入门

news2024/9/24 7:21:39

前言

下图表示的是一个简单的flink-job的计算图,这种图被称为DAG(有向无环图),表示的这个任务的计算逻辑,无论是spark、hive、还是flink都会把用户的计算逻辑转换为这样的DAG,数据的计算按照DAG触发,理论上只要构建出这样一个DAG图,就可以描述清楚用户的计算逻辑,在DAG的基础上,将Node并行化就可以将整个job并行化。

在Flink之前的上一代流式计算框架Apache Storm的hello world如下(节选了一部分):从storm的helloworld代码可以很清楚的看到storm构建dag是依赖用户自己构建,用户将自己脑中的dag图使用代码画出来,line2创建了一个DAG的builder,line4新增了一个节点,line6也新增了一个节点,dag画完了后在line16将DAG生成出来提交到集群执行。从这里可以看出storm构建DAG的逻辑是用户心中有图,自己画出来。

// 实例化TopologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
// 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");
Config config = new Config();
config.setDebug(true);
if (args != null && args.length > 0) {
    config.setNumWorkers(1);
    StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
} else {
    // 这里是本地模式下运行的启动代码。
    config.setMaxTaskParallelism(1);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("simple", config, topologyBuilder.createTopology());
}

再看一下flink的helloworld,代码如下,该代码对应的DAG就是文章开头的图片,下面代码中line3获取一个执行的环境,line6从9999端口读入数据,line7做flatmap,ling15做分组操作,line20对分组的数据做sum聚合,line22执行任务;通过和storm的helloworld的对比,可以很明显的看出flink代码中很难看出DAG的样子,flink专注的并不是用户去画DAG,而是用户表达清楚自己的业务,由flink将DAG画出并执行,这也是flink会将storm慢慢淘汰的原因之一

public class Demo01_hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("localhost", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).sum(1).print();

        env.execute();
    }
}

总结一下:flink目前提供了多种api,包裹flink-stream-api,table/sql-api,python-api,这些api的表象不同,但是底层都是将用户表达的逻辑翻译为DAG部署到集群上

那就从Hello-world开始吧

大数据的hello-word都是从wordcount开始的,这是mapreduce时代的传承,让我们再看一下flink的wordcount

public class Demo01_hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("localhost", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).sum(1).print();

        env.execute();
    }
}

line3从StreamExecutionEnvironment获取了一个执行环境,这个环境在本地就是local的,在yarn上就是yarn的,在k8s上就是k8s的

line4设置任务的并行度,这里遇到了第一个概念:并行度,并行度表示任务的并行个数,比如数据源kafka有2个分区,那么最佳的并行度就是2,因为一个分区只能被一个消费者消费,并行度大于2则多余的消费者消费不到数据

line6设置了数据源为socket,监听9999端口

line7对数据源的数据做flatmap操作,输入是string,输出是tuple2<string,integer>

line15对tuple2<string,integer>做了分组操作,按照string分组,这里涉及了另一个概念shuffle,shuffle就是打乱的意思

line20对分组后的数据tuple2<string,integer>做了sum操作,计算出每一个string的数量

ling22执行任务

下图展示了该任务如何从代码变成可以运行的执行图运行在分布式环境中

可以看到上图中有四张图,编写的代码会经历

StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图,最终提交到集群执行

(1)StreamGraph

a)StreamNode:表示每一个operator,并且携带了这个operator的若干信息

b)StreamEdge:表示streamnode之间的边,边上还携带了标识:rebalance、hash、forward,表示streamnode之间的数据传输方式

c)StreamGraph其实已经很像前言中的dag图了,但是还有些不同 

(2)JobGraph

a)JobVertex:streamgraph中的streamnode如果存在可以优化的情况,比如operator-chain,那么多个streamnode就可以合并为一个jobvertex,operator-chain的条件是streamedge=forward且前后两个streamnode并行度相同

b) IntermediateDataset:jobvertex的产出数据,即若干个operator处理后的结果集

c)JobEdge:数据传输通道,从intermediatedataset传输数据到下游jobvertex

(3)ExecutionGraph

a)ExecutionVertex: jobvertex的并行化节点

b)ExecutionJobVertex:jobvertex对应的节点,一一对应

c)IntermediateResultPartition: 表示ExecutionVertex的输出结果,一个ExecutionVertex对应一个IntermediateResultPartition

d)IntermediateResult:和IntermediateDataset一一对应

e)ExecutionEdge:连接IntermediateResultPartition和ExecutionVertex一一对应

(4)物理执行图

a)Task:具体的调度task,封装了operator的操作,包括用户的逻辑

b)ResultPartition:对应IntermediateResultPartition,一一对应

c)ResultSubPartition:是Resultpartition的子分区,他的数量和下游的task有关,如果source算子就一个,所以他的ResultPartition就一个,但是下游有两个flatmap算子,所以这个ResultPartition会分成2个ResultSubPartition,分别给下游两个flatmap算子消费

d)InputChannel:连接ResultSubPartition和下游task算子的数据通道

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/900599.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring源码分析bean的生命周期(下)

doGetBean()执行过程 createBean()执行过程 一、DependsOn注解 spring创建对象之前会判断类上是否加了DependsOn注解&#xff0c;加了会遍历然后会添加到一个map中&#xff0c;spring会先创建DependsOn注解指定的类 二、spring类加载器 在合并BeanDefinition&#xff0c;确定…

centos7.9和redhat6.9 离线升级OpenSSH和openssl (2023年的版本)

升级注意事项&#xff01; 1、多开几个连接窗口&#xff08;xshell&#xff09;&#xff0c;避免升级openssh失败无法再次连接终端&#xff0c;否则要跑机房了。 2、可开启telnet服务、vnc服务、打快照。多几个“保命”的路数。一、centos7.9的信息 [rootnode2 ~]# openssl v…

1391. 检查网格中是否存在有效路径;2502. 设计内存分配器;1638. 统计只差一个字符的子串数目

核心思想&#xff1a;并查集。枚举网格中的块&#xff0c;把能连通的连通在一起&#xff0c;最后看&#xff08;0&#xff0c;0&#xff09;和&#xff08;m-1,n-1&#xff09;是否连通&#xff0c;然后网格中的每个点坐标是二维的&#xff0c;然后通过x*ny转换为一维&#xff…

大数据课程K2——Spark的RDD弹性分布式数据集

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 一、Spark最核心的数据结构——RDD弹性分布式数据集 1. 概述 初学Spark时,把RDD看…

超实用的批量管理工具 pssh 和 window 文件传输工具 pscp

文章目录 一、概述1&#xff09;pssh2&#xff09;pscp 二、pssh 工具安装三、pssh 命令的基本语法四、pscp 工具安装1&#xff09;Windows 上安装2&#xff09;Linux 系统上安装 五、 pscp 命令的基本语法1&#xff09;从 windows 向 linux 传文件2&#xff09;从 linux 传文件…

算法:滑动窗口解决连续区间子数组问题

文章目录 实现原理实现思路典型例题长度最小的子数组无重复字符的最小字串最大连续1的个数III将x减到0的最小操作水果成篮找到字符串中所有字母异位词(哈希表比较优化)对哈希表内元素比较的优化 总结 本篇积累的是滑动窗口的问题&#xff0c;滑动窗口在算法实现中有重要作用&am…

Python可视化在量化交易中的应用(16)_Seaborn热力图

Seaborn中热力图的绘制方法 seaborn中绘制热力图使用的是sns.heatmap()函数&#xff1a; sns.heatmap(data,vmin,vmax,cmap,center,robust,annot,fmt‘.2g’,annot_kws,linewidths0,linecolor‘white’,cbar,cbar_kws,cbar_ax,square,xticklabels‘auto’,yticklabels‘auto’…

systemd:初学者如何理解其中的争议

导读对于什么是 systemd&#xff0c;以及为什么它经常成为 Linux 世界争议的焦点&#xff0c;你可能仍然感到困惑。我将尝试用简单的语言来回答。 在 Linux 世界中&#xff0c;很少有争议能像传统的 System V 初始化 系统&#xff08;通常称为 SysVinit&#xff09;和较新的 s…

QT设置widget背景图片

首先说方法&#xff0c;在给widget或者frame或者其他任何类型的控件添加背景图时&#xff0c;在样式表中加入如下代码&#xff0c;指定某个控件&#xff0c;设置其背景。 类名 # 控件名 { 填充方式&#xff1a;图片路径 } 例如&#xff1a; QWidget#Widget {border-image: url…

1. 微信小程序开发环境搭建

下载 微信的小程序开发需要使用到微信开发者工具&#xff0c;通过https://developers.weixin.qq.com/miniprogram/dev/devtools/stable.html可以下载 下载完成后 安装

Linux 系统编程拾遗

Linux 系统编程拾遗 进程的创建 进程的创建 fork()、exit()、wait()以及execve()的简介 创建新进程&#xff1a;fork()

人工智能原理(6)

目录 一、机器学习概述 1、学习和机器学习 2、学习系统 3、机器学习发展简史 4、机器学习分类 二、归纳学习 1、归纳学习的基本概念 2、变型空间学习 3、归纳偏置 三、决策树 1、决策树组成 2、决策树的构造算法CLS 3、ID3 4、决策树的偏置 四、基于实例的学习…

嵌入式系统总线-片内总线

1.总线概述 总线是CPU与存储器和设备通信的机制&#xff0c;是计算机各部件之间传送数据、地址和控制信息的公共通道。 2.总线参数 总线宽度&#xff1a;又称总线位宽&#xff0c;指的是总线能同时传送数据的位数。如16位总线就是具有16位数据传送能力。 总线频率&#xff…

apex安装出错:TypeError unsupported operand type(s) for +: “NoneType“ and “str“

Windows 10 环境下安装apex报错&#xff1a;TypeError unsupported operand type(s) for : “NoneType“ and “str“ 1、首先apex不能直接pip install apex安装。 2、具体安装步骤&#xff1a;【python】【深度学习】apex的安装_apex python_愿东大没有食堂的博客-CSDN博客 …

深入竞品:解读竞品分析的艺术与策略

引言&#xff1a;为何竞品分析至关重要&#xff1f; 在当今的产品环境中&#xff0c;市场变得越来越拥挤。每个角落都有新的创业公司试图创造下一个行业的颠覆者&#xff0c;同时也有成熟的巨头在不断地迭代和优化他们的产品。在这样的环境中&#xff0c;不了解您的竞争对手是…

『C语言初阶』第八章 -结构体

前言 今天小羊又来给铁汁们分享关于C语言的结构体&#xff0c;在C语言中&#xff0c;结构体类型属于一种构造类型&#xff08;其他的构造类型还有&#xff1a;数组类型&#xff0c;联合类型&#xff09;&#xff0c;今天我们主要简单了解一下结构体。 一、结构体是什么&#x…

Linux Mint 21.3 计划于 2023 年圣诞节发布

Linux Mint 项目近日公布了基于 Ubuntu 的 Linux Mint 发行版下一个重要版本的一些初步细节&#xff0c;以及备受期待的基于 Debian 的 LMDE 6&#xff08;Linux Mint Debian Edition&#xff09;版本。 近日&#xff0c;Linux Mint 项目负责人克莱门特-勒菲弗&#xff08;Clem…

ECA模块详解

注意&#xff1a;本文代码为自己理解之后实现&#xff0c;与原论文代码原理相同但并不完全一样&#xff0c;主要是输入张量的形状不同&#xff0c;若更想了解原文代码&#xff0c;可以访问&#xff1a;https://blog.csdn.net/weixin_45084253/article/details/124282580 &#…

使用RDP可视化远程桌面连接Linux系统

使用RDP可视化远程桌面连接Linux系统 远程桌面连接Linux安装安装包准备服务器安装xrdp远程连接 远程桌面连接Linux 通常使用SSH来连接服务器&#xff0c;进行命令行操作&#xff0c;但是这次需要远程调试生产环境的内网服务器&#xff0c;进行浏览器访问内网网站&#xff0c;至…