Spark的内核调度

news2024/11/18 17:53:29

目录

概述

RDD的依赖

 DAG和Stage

 DAG执行流程图形成和Stage划分

 Stage内部流程

Spark Shuffle

Spark中shuffle的发展历程

优化前的Hash shuffle

 经过优化后的Hash shuffle

 Sort shuffle

Sort shuffle的普通机制

Job调度流程

Spark RDD并行度


概述

Spark内核调度任务:

1.构建DAG有向无环图

2.划分stage夹断

3.Driver底层的运转

4.分区的划分(线程)

的Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算

RDD的依赖

RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系

Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖

窄依赖:

作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响

特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收

宽依赖:

作用:划分stage的重要依据,宽依赖也叫shuffle依赖

特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收

注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整

算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

 DAG和Stage

DAG:有向无环图,只要描述一段执行任务,从开始一直往下走,不允许出现回调操作

Spark应用程序中,遇到一个Action算子,就会触发一个JOB任务的产生

对于每个JOB的任务,都会产生一个DAG执行流程图,流程图的形成的层级关系如下:

层级关系:

1.一个spark应用程序→遇到一个Action算子,就会触发形成一个JOB任务

2.一个JOB任务只有一个DAG有向无环图

3.一个DAG有向无环图→有多个stage

4.一个stage→有多个Task线程

5.一个RDD→有多个分区

6.一个分区会被一个Task线程所处理

 DAG执行流程图形成和Stage划分

 1.spark应用程序遇到Action算子后,就会触发一个JOB任务的产生,JOB任务就会将它所依赖的算子全部加载进来,形成一个stage

2.接着从action算子从后往前回溯,遇到窄依赖就将算子放在同一个stage中,如果遇到宽依赖,就划分形成新的stage,最后一直到回溯完成

 Stage内部流程

 默认并行度值的确认:

1.使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块数量,defaultminpartition),继续需要知道defaultminpartition的值是多少

2.defaultminpartition=min(spark.default.parallelism,2)取最小值,最终确认spark.default.parallelism的参数值就能最终确认RDD的分区数有多少个

spark.default.parallelism参数值的确认:

1.如果有父RDD,就取父RDD的最大分区数

2.如果没有父RDD,根据集群模式进行取值

        本地模式:机器的最大cpu核数

        Mesos:默认是8

        其它模式:所有执行节点上的核总数或2,以较大者为准

Spark Shuffle

Spark中shuffle的发展历程

1- 在1.1版本以前,Spark采用Hash shuffle (优化前 和 优化后)

2- 在1.1版本的时候,Spark推出了Sort Shuffle

3- 在1.5版本的时候,Spark引入钨丝计划(优化为主)

4- 在1.6版本的时候,将钨丝计划合并到sortShuffle中

5- 在2.0版本的时候,将Hash Shuffle移除,将Hash shuffle方案移植到Sort Shuffle

优化前的Hash shuffle

 存在的问题:

        上游(map端)的每个Task会产生与下游Task个数相等的小文件个数,导致上游有非常多的小文件,下游(reduce端)来拉取文件的时候,会有大量的网络IO和磁盘IO过程,因为要打开和读取多个小文件

 经过优化后的Hash shuffle

优化后的Hash shuffle:

变成了由每个Executor进程产生与下游Task个数相等的小文件数,这样可以大量减少小文件的产生,以及降低下游拉取文件时候的网络IO和磁盘IO过程

 Sort shuffle

 Sort shuffle分成了两种:普通机制和bypass机制,具体使用哪种由spark底层决定

Sort shuffle的普通机制

 普通机制的运行过程:

每个上游task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区,排序,将内存中的数据溢写到磁盘,形成一个个小文件,溢写完成后,将多个小文件合并成一个大的磁盘文件,并且针对每个大的磁盘文件,提供一个索引文件,接着是下游Task根据索引文件来读取相应的数据

Sort shuffle的bypass机制 

bypass机制 :就是在普通机制的基础上,省略了排序的过程

bypass机制的触发条件:

1.上游的RDD数量不能超过100个

2.上游不能对数据进行提前聚合操作(因为提前聚合,需要先进行分组操作,而分组的操作实际上是有排序的操作)

Job调度流程

主要是讨论:在Driver内部,是如何调度任务

1.Driver进程启动后,底层PY4J创建SparkContext顶级对象,在创建该对象的进程中,还会创建另外两个对象,分别是:DAGScheduler和TaskScheduler

        DAGScheduler:DAG调度器,将Job任务形成DAG有向无环图和划分Stage的阶段

        TaskScheduler:Task调度器,将Task线程分配给到具体的Executor执行

2.一个saprk程序遇到一个action算子触发产生一个job任务,SparkContext将job任务给到DAG调度器,拿到job任务后,会将job任务形成有向无环图和划分stage阶段,并且确定每个stage有多少个Task线程,会将众多的Task线程放到TaskSet的集合中,DAG调度器将TaskSet集合给到Task调度器

3.Task调度器拿到TaskSet集合以后,将Task分配给到具体的Executor执行,底层是基于SchedulerBackend调度队列来实现的

4.Executor开始执行任务,并且Driver会监控各个Executor的执行状态,知道所有的Executor执行完成,就认为任务运行结束

5.Driver通知Namenote释放资源

Spark RDD并行度

整个Spark应用中,影响并行度的因素有以下两个原因:

        1.资源的并行度:Executor数量和CPU核数以及内存的大小

        2.数据的并行度:Task的线程和分区数量

一般将Task想层数量设置为CPU核数的2-3被,另外每个线程分配3-5GB的内存资源

说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。

import time

from pyspark import SparkConf, SparkContext
import os

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    print("Spark入门案例: WordCount词频统计")

    # 1- 创建SparkContext对象
    conf = SparkConf()\
        .set("spark.default.parallelism", "5")\
        .setAppName('spark_wordcount_demo')\
        .setMaster('local[*]')

    # 设置并行度参数方式一
    # conf.set("spark.default.parallelism", "4")

    sc = SparkContext(conf=conf)

    # 2- 数据输入
    init_rdd = sc.textFile("file:///export/data/gz16_pyspark/01_spark_core/data/content.txt")

    # 3- 数据处理
    flatmap_rdd = init_rdd.flatMap(lambda line: line.split(" "))

    map_rdd = flatmap_rdd.map(lambda word: (word,1))

    # shuffle前分区数
    print("shuffle前分区数",map_rdd.getNumPartitions())

    result = map_rdd.reduceByKey(lambda agg,curr: agg+curr)

    # shuffle后分区数
    print("shuffle后分区数", result.getNumPartitions())

    # 4- 数据输出
    print(result.collect())

    # 5- 释放资源
    sc.stop()

通过parallelize构建得到RDD的分区情况(了解):

from pyspark import SparkConf, SparkContext
import os

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("并行化本地集合创建RDD")

    # 1- 创建SparkContext对象
    conf = SparkConf().setAppName('parallelize_rdd').setMaster('local[1]')

    # 设置并行度参数
    conf.set("spark.default.parallelism", 4)

    sc = SparkContext(conf=conf)

    # 2- 数据输入
    # 并行化本地集合得到RDD
    init_rdd = sc.parallelize([1,2,3,4,5])

    # shuffle前分区数
    print("分区数", init_rdd.getNumPartitions())

    # 3- 数据处理
    # 4- 数据输出
    # 获取分区数
    print(init_rdd.getNumPartitions())

    # 获取具体分区内容
    print(init_rdd.glom().collect())


    # 5- 释放资源
    sc.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1378651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣67. 二进制求和算法

一、【写在前面】 这道题需要,给你两个字符串比如 a "1010", b "1011"答案是:"10101" 然后需要你给出计算结果,那么我们很容易想到两种做法 1. 调库做法:直接转化为整数,然后用内…

python统计分析——小提琴图(sns.violinplot)

参考资料:用python动手学统计学,帮助文档 使用seaborn.violinplot()函数绘制箱线图 sns.violinplot()的做出来的小提琴图比plt.violinplot()更像小提琴。 import numpy as np import pandas as pd from matplotlib import pyplot as plt import seabo…

【自控实验】4. 数字仿真实验

本科课程实验报告,有太多公式和图片了,干脆直接转成图片了 仅分享和记录,不保证全对 使用matlab中的simulink进行仿真 实验内容 线性连续控制系统的数字仿真 根据开环传递函数G(S)的不同,完成两个线性连续控制系统的仿真。 …

MySQL面试题 | 03.精选MySQL面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

【linux】NIO中的FileChannel与mmap

FileChannel是Java NIO库中的一个类,用于对文件进行读写操作。它提供了一种高效的方式来读取、写入和操作文件。 使用FileChannel,你可以执行以下操作: 从文件读取数据到缓冲区(Buffer):你可以使用FileCh…

QT 小组件 列表框以及微调框

.cpp文件 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);QListWidgetItem *pPhone new QListWidgetItem;pPhone->setText("西瓜");pPhone->…

SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测

SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测 目录 SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多…

eclipse ADT安装及abap cds模版创建

文章目录 1.前提2.安装3.创建cds模版 abap cds 常用语法 https://blog.csdn.net/weixin_49198221/article/details/135531478?spm1001.2014.3001.5501 1.前提 需要了解版本关系: **1.eclipse:**2023-06 (4.28), 2023-09 (4.29), 2023-12 (4.30) 2.Windows: ​ 1.Windows …

java多线程(并发)夯实之路-线程池深入浅出

线程池 Thread Pool:线程池,存放可以重复使用的线程(消费者) Blocking Queue:阻塞队列,存放等待执行的任务(生产者) poll方法(有时限地获取任务)相对take注…

【每日小bug】——mybatis-plus拼接sql空格报错,根据时间聚合查询

mybatis-plus拼接sql报错 复制报错sql语句到navicat,字段之间缺少空格,补上就可以了 聚合sql 根据时间 json接收JsonFormat(timezone "GMT8", pattern "yyyy-MM-dd")DateTimeFormat(pattern "yyyy-MM-dd")private Date startTim…

如何为数据保护加上“安全锁”?

伴随着数字经济的日趋活跃,数据安全和隐私保护成为了各国政府和企业都十分重视的问题,纷纷加强了数据安全防护。但实际上,近几年数据泄露问题接连不断,虽然没有造成严重的后果,但也足以证明目前数据安全防护的紧迫性。…

使用 Github、Hugo 搭建个人博客

Hugo 静态网站构建手册:https://jimmysong.io/hugo-handbook/ 关键字:开源 博客 框架 1、GitHub Pages 官网:https://pages.github.com/ 文档:https://docs.github.com/zh Github Pages 简介 Websites for you and your project…

京东年度数据报告-2023全年度游戏本十大热门品牌销量(销额)榜单

同笔记本市场类似,2023年度游戏本市场的整体销售也呈下滑态势。根据鲸参谋电商数据分析平台的相关数据显示,京东平台上游戏本的年度销量累计超过350万,同比下滑约6%;销售额将近270亿,同比下滑约11%。 鲸参谋综合了京东…

2.idea查看不到git的提交文件

(1)查看日志 使用idea工具查看git提交日志,如下:项目名上右击,选择git->Show History (2)预期结果 (3)实际结果 只能看见此次提交的commit id,看不见所修…

leetcode 动态规划(爬楼梯、零钱兑换、完全平方数)

70. 爬楼梯&#xff08;进阶版&#xff09; 卡码网&#xff1a;57. 爬楼梯(opens new window) 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬至多m (1 < m < n)个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 注意&#xff1a;给定 n 是一个正…

用单片机设计PLC电路图

自记&#xff1a; 见另一篇文章&#xff0c;MOS驱动差了一个充电电容&#xff0c;栅极电容充电会有问题&#xff1b; 光耦用的直插&#xff0c;但板子用的贴片&#xff0c;此文档仅供参考 基本列出了PCB板情况&#xff0c;基础元器件&#xff0c;部分连接&#xff0c;原理等…

大厂设计师都在用Figma中文替代

设计原型别再只知道 Figma 了&#xff0c;现在百万设计师都在用 Figma 的中文替代——即时设计。即时设计是国内第一款基于 Web 的 UI 设计工具&#xff0c;它的出现的弥补了很多 Figma 在国内使用的局限性&#xff0c;凭借本土化的优势&#xff0c;免费使用的版本、丰富免费的…

【Leetcode】2696. 删除子串后的字符串最小长度

文章目录 题目思路代码 题目 2696. 删除子串后的字符串最小长度 思路 计算通过删除字符串中的 “AB” 和 “CD” 子串后&#xff0c;可获得的最终字符串的最小长度。 主要思路是使用一个栈来模拟字符串的处理过程&#xff0c;每次遍历字符串时&#xff0c;如果当前字符和栈…

Openstack组件glance对接swift

2、glance对接swift &#xff08;1&#xff09;可直接在数据库中查看镜像存放的位置、状态、id等信息 &#xff08;2&#xff09;修改glance-api的配置文件&#xff0c;实现对接swift存储&#xff08;配置文件在/etc/glance/glance-api.conf&#xff0c;建议先拷贝一份&#x…

野牛物联网-阿里云配置流程

1、 概述&#xff1a; 本文围绕阿里云物联网平台&#xff0c;实现设备上云、设备上报消息、云端订阅设备消息、云端下发指令到设备等服务&#xff0c;以野牛物联网YNK-MN316设备接入物联网平台为例&#xff0c;介绍设备如何接入物联网平台&#xff0c;向平台上报消息等。帮助您…