Flink,spark对比

news2024/11/25 3:07:43

三:az 如何调度Spark、Flink,MR 任务
首先,使用java编写一个spark任务,定义一个类,它有main方法,里面写好逻辑,sparkConf 和JavaSparkContext 获取上下文,然后打成一个jar包,创建一个sh文件,使用spark提交任务的spark-submit 命令,指定jar包和对应的类名,和运行的参数,然后在job 文件里面指定sh 脚本,接着dependencies指定好依赖就行。最终打包成一个zip包上传。

如果是提交flink任务呢,也是定义一个类,在main方法里,Flink 流批任务只需要分别使用StreamExecutionEnvironment或者ExecutionEnvironment获取对应的执行环境,然后获取到DataStream 或者DataSet, 然后进行一系列的转换,最终达成一个jar 包,它是使用/bin/flink run 去提交任务的,后面的参数指定和spark 大同小异 ,az 也大同小异

MR 如何提交任务呢,肯定要编写Mapper和Reducer的实现处理类,然后有个主类,获取到Hadoop 的Configuration 的对应环境配置,获取到job 指定输入输出以及Mapper以及reducer类,然后打包成一个jar包,使用hadoop jar xx.jar 提交任务。

四:简单介绍下Flink
那就对比下Flink,Spark,MapReduce
Flink ,大数据分布式处理框架,从流处理开始,打造流批一体的框架,用于对无界和有界的数据流进行有状态计算,提供了诸多高级api供用户开发分布式任务,提供了数据分布,容错机制,资源管理和调度等功能

4.1: 首先从编程模型来看,MR的基础就是一条record,spark 就是RDD,rdd就是一批数据,而Flink 是DataStream 和 DataSet,这两个也是一批数据;从这个最开始的编程模型的输入来看就知道spark以及Flink 比 MR 快,后续的数据转换spark和Flink 都有丰富的算子(transform和collect 算子,flink是operator chain),而MR就很局限了,要自己定义
4.2:从数据流转的介质来看,MR会落盘,就是那个Map阶段的结尾会落盘,涉及到磁盘I/O,比较耗费时间;其实Flink 和Spark 也会进行数据的落盘,但是他们和mr的最大的本质不同就是他们可以把数据放在内存中,最后再落盘,而MR一定会落盘;

4.3:算子方面,flink是dataset api,DataStream API, table api, sql;而spark 是 RDD, DataSet, DataFrame, sparkSql;Flink 的核心引擎是runTime,spark的是SparkCore

五:Flink 和sparkStreaming 的区别
5.1: 一个实时,一个微批
5.2: 一个使用StreamingExecutionEnvironment, 一个使用JavaStreamingContext;
5.3: 一个DataStream, 一个是Dstream 的流数据
5.4: 任务调度来说,一个是会依次创建StreamGraph, JobGraph, ExecutionGraph,JobManager 调度ExecutionGraph;而另一个是 创建DstreamGraph, JobGenerator, 和JobScheduler
5.5: 时间机制方面,一个是有数据时间,摄入时间和处理时间;而sparkStreaming 是只有处理时间
5.6: 容错方面,Flink 有分布式快照,使用两阶段提交协议可以做到只有一次处理,而sparkStreaming 也有checkpoint ,能恢复数据,但是做不到恰好一次处理,可能会重复。

六:Flink 和spark的checkpoint 的异同点
6.1: checkpoint 说白了都是为了持久化数据的,Flink 是保存比如某个数据的状态,说白了就是会动态变化的值,比如用户的订单总额就是用户订单数据的状态,而spark 是保存RDD的数据到hdfs,截断RDD,防止数据异常中断,可以恢复;不过都是把内存中的数据持久化到外部的系统中,这里一般是hdfs,持久化嘛
6.2: checkpoint的触发方式不一样,Flink 的checkpoint 是由jobManager 定时触发的,如果配置了的话;而Spark是需要在代码中手动触发的
6.3: checkpoint 的触发机制不一样,Flink的checkpoint 说白了有两个阶段,预提交阶段和提交阶段,预提交阶段会做三个事,如下所示:
6.3.1: 进行checkpoint, 比如记录了用户1和2的订单金额分别是200和300
6.3.2: 写WAL 日志,就是用户1和2又有新的动作,由增加了订单金额100和50(这个可以认为是状态)
6.3.3: 锁定资源,告诉外部系统,用户1和2的订单总金额分别是300和350,但是让外部系统知道,并不是立马更新
如果上述有任何一步失败,我们都会滚到上个checkpoint,然后接下来就是提交阶段,会做两个事:
6.3.4: 把checkpoint 的状态提交
6.3.5: 外部系统更新对应的订单总金额300和350

如果是spark的checkpoint ,则直接把数据存储到hdfs了,没有啥特殊的。

7:Flink 和Spark的集群规模
Flink on yarn,一般是10台;cpu核数是36;内存是128G;
spark on yarn,是200台,pb级别的数据,cpu 核数是36,内存是128G

8:Flink 和spark, yarn 的集群角色
8.1:说明
Flink 是有client,jobManager 以及taskManger;client 是提交任务的作用,并且接收结果返回;而JobManager 接收提交任务,进行任务调度,故障恢复,容错管理;管理tm;
spark 也是有driver,master 以及 worker,和flink的一一对应,此外还有个executor 和 clusterManager
yarn 则是有ResourceManager(整体资源的管理), NoderManager(管理节点上的资源), ApplicationMaster(一个应用程序的管理者),Container(实际运行程序的容器)以及Client

9:flink 以及 spark 还有Mr 提交任务到yarn上的流程对比
9.1:Flink 提交任务流程如下,Flink 支持三种模式,session 模式,perJob模式和Application 模式,前面两者都相当于spark的yarn-cleint 模式,一个是共享资源,一个独享资源;而Application 模式是相当于spark的yarn-cluster 模式,客户端在yarn上,生产环境使用application模式,如下所示:
在这里插入图片描述
这里的ResourceManager 是flink 自己的,不是yarn的

9.2:spark 在yarn上有yarn-client 模式和yarn-cluster 模式之分,一般我们使用yarn-cluster 模式,这个最主要的点就是driver 是在客户端还是yarn上,这里的applicationMaster 就可以理解为Driver,生产环境如下:
在这里插入图片描述
10. Flink 的TaskSlot
它的目的是为了控制一个taskManager 能运行多少个task,所以对资源进行了分配,划分成不同的slot,一般和cpu是1:1 的关系,所以一个算子分布在不同的taskManger 上面,在一个tm的并行度和slot是一比一的关系,那么全局的并行度就是我们自己设置的并行度了,不过我们在考虑的时候就是考虑单个tm里面的并行度好点;slot 做了内存隔离,没有做cpu的隔离。

11:Flink 和spark的常用算子比较
FLink 独有的算子,keyBy, process, window
spark 独有的,mapPartition, repartition,colease, union ; transformation 和 action 算子

12.Flink 分区策略
GlobalPartitioner; ShufflePartitioner, RebalancePartitioner; RescalePartitioner(根据上下游算子的并行度分发数据), BrodcastPartitioner,ForwardPartitioner(上下游算子并行度一致);KeyGroupStreamPartitioner(Hash分区),CustomPartitioner(自定义分区策略)
Flink的默认分区数就是等于并行度

spark的默认分区数等于cpu的核数,也可以使用repartition,

13:Flink 和Spark的编程流转区别
Flink 流式这边一直返回的会是DataStream, 批返回的是DataSet的数据集
而Spark这边流失返回的会是Dstream以及衍生类的数据集,而批返回的则是RDD以及衍生类的数据集

14: Spark 和Flink 的序列化
为什么这两者都要实现自己的序列化框架呢,因为Java的序列化存储密度低,分布式计算的话内存要用在刀刃上,所以他们实现了自己的序列化框架,Spark 是使用了KyroSerializer 序列化,Flink的序列化的基本类是TypeInfomation.

15: Spark 和flink的反压机制
spark.streaming.backpressure.enabled, sparkStreaming 动态调整,
Flink 手动调整,看并行度,算子处理情况。

16:flink 和spark 数据在内存的抽象
16.1: 就是java对象 --StreamRecord–Buffer–memorySegment–Byte数组
16.2 RDD在缓存到内存之前,partition中record对象实例在堆内other内存区域中的不连续空间中存储。RDD的缓存过程中, 不连续存储空间内的partition被转换为连续存储空间的Block对象,并在Storage内存区域存储,此过程被称作为Unroll(展开)。

17: Spark 和Flink以及Hive 调优
都是从三个方面来说,
分别是资源调优,代码性能调优,业务调优
17.1: 对于spark 和Flink 来说,资源调优方面,可以使得单个executor 或者taskManager 可以使用的内存和cpu最大的话就尽量可以配置最大,先说spark;
17.1.1: spark一般调整的就是num-executors ,相当于flink的tm的个数;executro-memory, executor-cores,以及driver-memory 分别相当于tm的内存,tm的slot 个数,jm的内存;spark.default.parallelism 也相当于flink的并行度,spark.storage.memoryFraction 是用来持久化RDD的那部分内存,一般是executor-memory 堆内内存的60%的50%;spark.shuffle.memoryFraction就是用来shuffle的内存,和刚刚的一样,占有堆内内存的60%的50%;所以实际生产看看到底哪个用的多一点,就多给点

17.1.2: 在资源参数这里,hive需要调整的无非也是内存和cpu这方面,如下所示:
mapreduce.map.java.opts, map 阶段的jvm进程的堆内存;
mapreduce.map.memory.mb,map阶段的jvm 进程的堆内存和堆外内存的和;
mapreduce.reduce.java.opts,reduce 阶段的jvm进程的堆内存;
mapreduce.reduce.memory.mb,reduce 阶段的 的jvm 进程的堆内存和堆外内存的和;
mapreduce.map/reduce.cpu.vcores, map 和reduce 阶段可用的cpu 的个数;当给大点

但是hive中的map和reduce 的task的数量取决于总文件的个数和每个文件数的大小,一般是每个文件数的大小起作用,如下所示:
mapred.min/max.split.size,就是可以分割文件的最小和最大文件大小,但是map的task数量还不是由这个决定的,还是由多个因素决定的,看下图
在这里插入图片描述
因为hadoop系统中dfs.block.size 一般是128M,所以如果我们没有设置上述的最小和最大的话,就是默认按照128去分割,如果要提高task数量,要么提高mapred.map.tasks的数量,要么增大mapred.min.split.size 的大小,到256M也可以。

那么reduce的task的数量呢?
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, mapred.reduce.tasks);
所以最直接的办法是通过mapred.reduce.tasks = 10 来设定就可以,当然设定太小了执行时间会长,所以要居中;太大的话则小文件过多,也不好。

17.2: 算子性能调优
17.2.1: spark算子性能调优
spark.sql.adaptive.enabled 默认为false 自适应执行框架的开关
spark.sql.adaptive.skewedJoin.enabled 默认为 false 倾斜处理开关
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task
spark.sql.autoBroadcastJoinThreshold 用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力
用 reduceByKey( ) 和 aggregrateByKey( ) 来取代 groupByKey,因为前者会进行预聚合
操作数据库建义采用foreachPartition( ) ,资源可以的情况下使用mapPartitions 代替map
数据复用使用persist
减少数据碎片使用 coalesce( )进行重分区
spark.shuffle.file.buffer参数是调节map端缓冲区大小,单位是kb,减少磁盘溢写次数;
spark.reducer.maxSizeInFlight 参数是调节shuffle的时候reduce端的缓冲区大小,单位是MB
spark.shuffle.io.maxRetries reduce端拉取重试次数,以及拉取失败等待间隔,spark.shuffle.io.retryWait,单位是s,比如60s
spark.shuffle.sort.bypassMergeThreshold, 如果确实不需要排序操纵,那就调大sortByPass的阈值,调大到400等,默认是200

17.2.2: Hive 性能调优
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 自动合并小文件
set hive.merge.mapredfiles = true; 设置reduce 端对输出文件的合并
set hive.archive.enabled=true; 使用hadoop archive 文件对小文件归档
set hive.mapred.mode=strict 开启严格模式;不允许对分区表查询where不带分区,order by 必须加上limit,不允许笛卡尔积等;
set hive.exec.parallel=true; //打开任务并行执行
set mapred.job.reuse.jvm.num.tasks=10 设置jvm重用
set hive.map.aggr=true; set hive.groupby.skewindata = true; 进行数据负载均衡,数据倾斜优化
set hive.fetch.task.conversion=more; 可以减少不必要的走mapreduce 任务
set hive.auto.convert.join = true; 开启map join

17.2.3: Flink 性能调优
算子方面暂无,主要是资源和倾斜方面,要改代码

17.3: 业务代码调优
最典型的问题,数据倾斜怎么办?
hive只能是自己可以通过刚刚那个skew_in_data 去均衡,那么flink 和spark呢?
17.3.1: spark和flink 数据倾斜处理
17.3.1.1: 碰到大量空值的或者就是某个大量值的,加上随机字符串,均匀shuffle
17.3…1.2: 把聚合的步骤往前放,放到hive或者mapreudce 里面去做
17.3.1.3: 过滤掉少数导致倾斜的key
17.3.1.4: 提高shuffle操作的并行度,增加并行处理能力
17.3.1.5: 两阶段聚合,局部聚合+全局聚合,对于倾斜的key打上随机浅醉,聚合后再去掉再聚合,这个适合聚合算子,不适合join
17.3.1.6: Reduce join 换成MapJoin
17.3.1.7: 倾斜key 拆分join,打上随机前缀,然后后续不倾斜的扩容和它join,最终过滤掉前缀得到正确结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1903913.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习之网络构建

目标 选择合适的神经网络 卷积神经网络(CNN):我们处理图片、视频一般选择CNN 循环神经网络(RNN):我们处理时序数据一般选择RNN 超参数的设置 为什么训练的模型的错误率居高不下 如何调测出最优的超参数 …

Node.js介绍 , 安装与使用

1.Node.js 1 什么是Node.js 官网:https://nodejs.org/zh-cn/ 中文学习网:http://nodejs.cn/learn1.Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。Node.js 使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。 2.前端的底层 html…

C++、QT企业管理系统

目录 一、项目介绍 二、项目展示 三、源码获取 一、项目介绍 人事端: 1、【产品中心】产品案列、新闻动态的发布; 2、【员工管理】新增、修改、删除、搜索功能;合同以图片的方式上传 3、【考勤总览】根据日期显示所有员工上班、下班时间…

下载nvm 管理多个node版本并切换

nvm管理多个node版本并切换 安装nvm时不能安装任何node版本(先删除node再安装nvm),保证系统无任何node.js残留 1. 卸载node 控制面板中卸载nodejs 删除以下文件夹: C:\Program Files (x86)\Nodejs C:\Program Files\Nodejs C…

聚类分析方法(一)

目录 一、聚类分析原理(一)聚类分析概述(二)聚类的数学定义(三)簇的常见类型(四)聚类框架及性能要求(五)簇的距离 二、划分聚类算法(一&#xff0…

【matlab】状态空间模型与传递函数模型的建立与转换

目录 SISO系统 MIMO系统 状态空间模型 状态空间模型到传递函数模型的转换 传递函数模型到状态空间模型的转换 (1) 转换函数ss() (2) 规范形转换函数canon() (3) 常微分方程(传递函数)转换为状态空间模型函数dif2ss() 状态空间模型的变换 特征值、特征向量与广义特征向量的计算…

进程控制-wait和waitpid进程回收

wait 阻塞函数 函数作用: 1. 阻塞并等待子进程退出 2. 回收子进程残留资源 3. 获取子进程结束状态(退出原因) pid_t wait(int *wstatus); 返回值: ‐1 : 回收失败,已经没有子进程了 >0 : 回收子进程对应的…

线上问题---反思与回顾

线上问题一:麦哲伦2.0 人群配置不生效 发现背景:产品发现三层模型部分计划个别测试计划圈选人群特征与数仓统计数据的人群不一致,向值班人员反馈 根因定位: (1)用户配置三层模型计划时,配置单…

文心一言 VS 讯飞星火 VS chatgpt (297)-- 算法导论22.1 1题

一、给定有向图的邻接链表,需要多长时间才能计算出每个结点的出度(发出的边的条数)?多长时间才能计算出每个结点的入度(进入的边的条数)?如果要写代码,请用go语言。 文心一言: 计算出度 对于有向图的邻接链表表示&a…

rufus-4.5 制作 Clonezilla(再生龙)启动盘报syslinux-6.04下载错误

1、官网下载rufus 官网下载rufus-4.5,下载地址:https://rufus.ie/downloads/ 2、下载再生龙(Clonezilla) 下载最新版本: Clonezilla live 版本: 3.1.2-22:https://sourceforge.net/projects/clonezill…

在 Docker 容器中运行 Vite 开发环境,有这两个问题要注意

容器化开发给我们带来了很多便捷,但是在开发环境下也有一些问题要注意,如果不解决这些问题,你的开发体验不会很好。 容器启动正常,却无法访问 我们用 Docker 启动一个 Vite Vue3 项目的开发环境后,发现端口日志一切…

Springboot使用WebSocket发送消息

1. 创建springboot项目&#xff0c;引入spring-boot-starter-websocket依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>完整项目依赖 <?xml ver…

【游戏客户端】大话版本slg玩法正式上线~~

【游戏客户端】制作率土之滨Like玩法 大家好&#xff0c;我是Lampard家杰~~ 好久好久没有更新博客了&#xff0c;有不少大佬都在后台私信我催更&#xff0c;但是很悲伤这段时间都忙的不行QAQ 那在忙什么呢&#xff1f;就是在制作一个SLG类的玩法【帮派纷争】啦 &#xff0c;布…

RNN、LSTM与GRU循环神经网络的深度探索与实战

循环神经网络RNN、LSTM、GRU 一、引言1.1 序列数据的迷宫探索者&#xff1a;循环神经网络&#xff08;RNN&#xff09;概览1.2 深度探索的阶梯&#xff1a;LSTM与GRU的崛起1.3 撰写本博客的目的与意义 二、循环神经网络&#xff08;RNN&#xff09;基础2.1 定义与原理2.1.1 RNN…

java 闭锁(CountDownLatch)

闭锁&#xff08;CountDownLatch&#xff09;是Java中的一个同步辅助类&#xff0c;用于协调多个线程之间的协作。它允许一个或多个线程等待&#xff0c;直到在其他线程中执行的一组操作完成。闭锁非常适用于需要等待一组事件发生之后再执行某些操作的场景。 import java.uti…

【Java】垃圾回收学习笔记(二):分代假说与垃圾回收算法

文章目录 0. 分代收集理论分代假说分代GC定义 1. 垃圾回收算法1.1 标记清除&#xff08;Mark-Sweep&#xff09;算法优点缺点 1.2 标记复制算法优点缺点为什么是8:1:1&#xff1f; 1.3 标记整理算法优点缺点 2. 是否移动&#xff1f; 0. 分代收集理论 分代假说 现在多数JVM G…

Python编程学习笔记(2)--- 列表简介

1、列表是什么 列表由一系列按特定顺序排列的元素组成。可以创建包含字母表中所有字母、数字、0~9或所有家庭成员姓名的列表&#xff1b;也可以将任何东西加入列表中&#xff0c;其中的元素之间可以没有任何关系。列表通常包含多个元素&#xff0c;因此给列表指定一个表示复数…

python库(6):Pygments库

1 Pygments介绍 在软件开发和文档编写中&#xff0c;代码的可读性是至关重要的一环。无论是在博客文章、技术文档还是教程中&#xff0c;通过代码高亮可以使程序代码更加清晰和易于理解。而在Python世界中&#xff0c;Pygments库就是这样一个强大的工具&#xff0c;它能够将各…

1.2 如何让机器说人话?万字长文回顾自然语言处理(NLP)的前世今生 —— 《带你自学大语言模型》系列

本系列目录 《带你自学大语言模型》系列部分目录及计划&#xff0c;完整版目录见&#xff1a;带你自学大语言模型系列 —— 前言 第一部分 走进大语言模型&#xff08;科普向&#xff09; 第一章 走进大语言模型 1.1 从图灵机到GPT&#xff0c;人工智能经历了什么&#xff1…

Powershell 获取电脑保存的所有wifi密码

一. 知识点 netsh wlan show profiles 用于显示计算机上已保存的无线网络配置文件 Measure-Object 用于统计数量 [PSCustomObject]{ } 用于创建Powershell对象 [math]::Round 四舍五入 Write-Progress 显示进度条 二. 代码 只能获取中文Windows操作系统的wifi密码如果想获取…