Python+大数据-Spark技术栈(三) SparkCore加强

news2024/11/25 22:35:58

Python+大数据-Spark技术栈(三) SparkCore加强

  • 重点:RDD的持久化和Checkpoint
  • 提高拓展知识:Spark内核调度全流程,Spark的Shuffle
  • 练习:热力图统计及电商基础指标统计
  • combineByKey作为面试部分重点,可以作为扩展知识点

Spark算子补充

  • 关联函数补充

  • join为主基础算子

  • # -*- coding: utf-8 -*-
    # Program function:演示join操作
    from pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        print('PySpark join Function Program')
        # TODO:1、创建应用程序入口SparkContext实例对象
        conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
        sc = SparkContext.getOrCreate(conf)
        # TODO: 2、从本地文件系统创建RDD数据集
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # TODO:3、使用join完成联合操作
        print(x.join(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.leftOuterJoin(y).collect())
        print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    
        sc.stop()
    

[掌握]RDD 持久化

为什么使用缓存

  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
  • 使用经验:一次缓存可以多次使用

如何进行缓存?

  • spark中提供cache方法

  • spark中提供persist方法

  • # -*- coding: utf-8 -*-
    # Program function:演示join操作
    from pyspark import SparkConf, SparkContext
    from pyspark.storagelevel import StorageLevel
    import time
    if __name__ == '__main__':
        print('PySpark join Function Program')
        # TODO:1、创建应用程序入口SparkContext实例对象
        conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
        sc = SparkContext.getOrCreate(conf)
        # TODO: 2、从本地文件系统创建RDD数据集
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # TODO:3、使用join完成联合操作
        join_result_rdd = x.join(y)
        print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.leftOuterJoin(y).collect())
        print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
        join_result_rdd.cache()
        # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
        # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
        join_result_rdd.collect()
        # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
        print(join_result_rdd.count())
        time.sleep(600)
        sc.stop()
    
    
  • image-20221109161434726

缓存级别

  • image-20221109161510666
  • image-20221109161543709
  • image-20221109161606911
  • 如何选:
  • 1-首选内存
  • 2-内存放不下,尝试序列化
  • 3-如果算子比较昂贵可以缓存在磁盘中,否则不要直接放入磁盘
  • 4-使用副本机制完成容错性质

释放缓存

  • 后续讲到Spark内存模型中,缓存放在Execution内存模块

  • 如果不在需要缓存的数据,可以释放

  • image-20221109161629699

  • 最近最少使用(LRU)

print(“释放缓存之后,直接从rdd的依赖链重新读取”)
print(join_result_rdd.count())


* ![image-20221109161652788](https://img-blog.csdnimg.cn/img_convert/ac49f4fd917006cf788f051a2a9f61e3.png)

何时缓存数据

  • rdd来之不易
  • 经过很长依赖链计算
  • 经过shuffle
  • rdd被使用多次
  • 缓存cache或persist
  • 问题:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质
  • 内存在重启之后没有数据了,磁盘也会数据丢失
  • 注意:缓存会将依赖链进行保存的
  • 如何解决基于cache或persist的存储在易失介质的问题?
  • 引入checkpoint检查点机制
  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制
  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算
  • 什么是元数据?
    • 管理数据的数据
    • 比如,数据大小,位置等都是元数据

[掌握]RDD Checkpoint

  • 为什么有检查点机制?

    • 因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
    • Spark的容错问题?
      • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
  • 如何使用检查点机制?

    • 指定数据保存在哪里?
    • sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
    • 对谁缓存?答案算子
    • rdd1.checkpoint() 斩断依赖关系进行检查点
    • 检查点机制触发方式
    • action算子可以触发
    • 后续的计算过程
    • Spark机制直接从checkpoint中读取数据
    • image-20221109161715112
    • 实验过程还原:
    • image-20210913112326371
    • image-20210913112413321
    • image-20210913112440134
  • 检查点机制那些作用?

    • 将数据和元数据保存在HDFS中
    • 后续执行rdd的计算直接基于checkpoint的rdd
    • 起到了容错的作用
  • 面试题:如何实现Spark的容错?

    • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
    • 2-否则查看checkpoint是否保存数据
    • 3-否则根据依赖关系重建RDD
  • 检查点机制案例

持久化和Checkpoint的区别

  • 存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
  • 生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
  • 依赖关系:缓存保存依赖关系,检查点斩断依赖关系链

案例测试:

先cache在checkpoint测试

  • 1-读取数据文件
  • 2-设置检查点目录
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
  • image-20210913114439275
  • 5-如果对rdd实现unpersist
  • 6-从checkpoint中读取rdd的数据
  • image-20210913114510805
  • 7-通过action可以查看时间
  • image-20210913114535550

[代码实战]SparkCore案例

PySpark实现IP地址查询统计分析

数据认知:

  • image-20221109161854890
  • 日志信息:
  • image-20221109161921273
  • 城市Ip段信息:
  • image-20221109161942522
  • 查看用户在哪个Ip端里面:

需求:

  • 需要将ip地址转化为long类型数据,可以直接使用工具类
  • image-20221109162004893
  • 需要使用long类型ip进行对比查找–折半查找方法
  • image-20210913144725823

步骤:

  • 1-准备Spark的上下文环境
  • image-20221109162044456
  • 2-读取用户所在ip信息的文件,切分后选择下标为1字段就是用户ip
  • 3-读取城市ip段信息,需要获取起始ip的long类型(下标2),结束ip的long类型(下标3),经度(下标13),维度(下标14)
  • image-20221109162102601
  • 4-通过ip地址转化为long类型IP
  • 5-采用折半查找方法寻找ip对应的经纬度
  • 6-根据相同经纬度的数据进行累加统计在进行排序
  • 7-画个图

代码:

  • 
    


# -*- coding: utf-8 -*-

# Program function:实现用户ip的地址查询,实现相同经纬度范围统计

'''

* 1-准备Spark的上下文环境
* 2-读取用户所在ip信息的文件,切分后选择下标为1字段就是用户ip
* 3-读取城市ip段信息,需要获取起始ip的long类型(下标2),结束ip的long类型(下标3),经度(下标13),维度(下标14)
* 4-通过ip地址转化为long类型IP
* 5-采用折半查找方法寻找ip对应的经纬度
* 6-根据相同经纬度的数据进行累加统计在进行排序
* 7-画个图
 '''
 from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession


# 需要拿到用户ip转化为long类型,然后通过二分查找方法在city_ip_rdd查找对应的经度和维度

def ip_transform(ip):
  ips = ip.split(".")  # [223,243,0,0] 32位二进制数
  ip_num = 0
  for i in ips:
      ip_num = int(i) | ip_num << 8
  return ip_num


def binary_search(ip_num, city_rdd_broadcast_value):
  start = 0
  end = len(city_rdd_broadcast_value) - 1

  # (1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息

  # city_rdd_broadcast_value

  while (start <= end):
      mid = int((start + end) / 2)
      # 首先判断是否位于middle的[0]和[1]下标之间
      if (ip_num >= int(city_rdd_broadcast_value[mid][0]) and ip_num <= int(city_rdd_broadcast_value[mid][1])):
          # 指导找到中间位置,返回mid位置=后面的index
          return mid
      # 如果是小于middle[0]起始ip, end=mid
      if (ip_num < int(city_rdd_broadcast_value[mid][0])):
          end = mid
      # 如果是大于middle[1]结束ip, start=mid
      if (ip_num > int(city_rdd_broadcast_value[mid][1])):
          start = mid


def main():
  global city_ip_rdd_broadcast

  # *1 - 准备Spark的上下文环境

  spark = SparkSession.builder.appName("ipCheck").master("local[*]").getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("WARN")  # 直接将log4j放入文件夹中

  # *2 - 读取用户所在ip信息的文件,切分后选择下标为1字段就是用户ip

  # user_rdd读取的是包含有user的ip地址的信息

  user_rdd = sc.textFile(
      "/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ip/20190121000132394251.http.format")
  dest_ip_rdd = user_rdd \
      .map(lambda x: x.split("|")) \
      .map(lambda x: x[1])

  # print(dest_ip_rdd.take(4))#['125.213.100.123', '117.101.215.133', '117.101.222.68', '115.120.36.118']

  # *3 - 读取城市ip段信息,需要获取起始ip的long类型(下标2),结束ip的long类型(下标3),经度(下标13),维度(下标14)

  # .map(lambda x:(x[2],x[3],x[len(x)-2],x[len(x)-1]))

  ip_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ip/ip.txt")
  city_ip_rdd = ip_rdd \
      .map(lambda x: x.split("|")) \
      .map(lambda x: (x[2], x[3], x[13], x[14]))

  # Broadcast a read-only variable to the cluster,下面的代码中使用city.collect将rdd转化为list在进行广播

  city_ip_rdd_broadcast = sc.broadcast(city_ip_rdd.collect())

  # city_ip_rdd是包含有城市的(1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息

  # print(city_ip_rdd.take(5))

  def GetPos(x):
      city_rdd_broadcast_value = city_ip_rdd_broadcast.value

      def getResult(ip):
          # *4 - 通过ip地址转化为long类型IP
          ip_num = ip_transform(ip)
          # *5 - 采用折半查找方法寻找ip对应的经纬度
          # index 获取 (1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息
          index = binary_search(ip_num, city_rdd_broadcast_value)
          return ((city_rdd_broadcast_value[index][2], city_rdd_broadcast_value[index][3]), 1)
   
      # 得到的是((经度,维度),1),下面是python的map函数
      re = map(tuple, [getResult(ip) for ip in x])
      return re

  # *6 - 根据相同经纬度的数据进行累加统计在进行排序

  ip_rdd_map_partitions = dest_ip_rdd.mapPartitions(GetPos)
  result = ip_rdd_map_partitions.reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False)
  print("final sorted result is:")
  print(result.take(5))

  # [(('108.948024', '34.263161'), 1824), (('116.405285', '39.904989'), 1535), (('106.504962', '29.533155'), 400), (('114.502461', '38.045474'), 383), (('106.57434', '29.60658'), 177)]

  # *7 - 画个图

  sc.stop()


if __name__ == '__main__':
  main()

总结:

  • 掌握数据开发的思路,通过ip去城市ip段查找
  • ip的类型转化为ip_long类型,寻找工具类
  • index=binarySearch(ipnum,广播变量)
  • 根据索引得到经度和维度
  • 获取相同经纬度的信息的累加,在排序

反思:

  • 对于一直使用的数据,可以使用广播变量可以避免任务个数较多的时候造成大量网络的传输
  • 可以将之前每个Task会获取一个变量的副本转变为一个executor获取一个变量副本,当前executor的所有task可以共享

通过PySpark实现点击流日志分析

数据认知:

  • 网站点击流,一个用户上网产生用户行为日志,Pv(Page View) Uv(User View) 互联网Pv5:1Uv

#每条数据代表一次访问记录 包含了ip 访问时间 访问的请求方式 访问的地址…信息
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] “GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1” 304 0 “-” “Mozilla/4.0 (compatible;)”




需求:

* 求出Pv:页面访问量,用户每点击一次都会产生一条点击日志,一共计算有多少行的点击日志数据
* 求出Uv:用户访问量,针对Ip地址去重之后得到结果
* 求出TopK:访问的网站求解出用户访问网站的前几名

步骤:

* 1-准备SparkContext的环境
* 2-读取网站日志数据,通过空格分隔符进行分割
* 3-计算Pv,统计有多少行,一行就算做1次Pv
* 4-计算Uv,筛选出ip,统计去重后Ip
* 5-计算topk,筛选出对应业务的topk

代码:

* ```python
 
# -*- coding: utf-8 -*-
# Program function:完成网站访问指标的统计,Pv,Uv,TopK
'''
* 1-准备SparkContext的环境
* 2-读取网站日志数据,通过空格分隔符进行分割
* 3-计算Pv,统计有多少行,一行就算做1次Pv
* 4-计算Uv,筛选出ip,统计去重后Ip
* 5-计算topk,筛选出对应业务的topk
'''
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
 # *1 - 准备SparkContext的环境
 conf = SparkConf().setAppName("click").setMaster("local[*]")
 sc = SparkContext.getOrCreate(conf=conf)
 sc.setLogLevel("WARN")  # 直接将log4j放入文件夹中
 # *2 - 读取网站日志数据,通过空格分隔符进行分割
 file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/click/access.log")
 # *3 - 计算Pv,统计有多少行,一行就算做1次Pv
 rdd_map_rdd = file_rdd.map(lambda line: ("pv", 1))
 print("pv result is:", rdd_map_rdd.reduceByKey(lambda x, y: x + y).collect())  # pv result is: [('pv', 14619)]
 # *4 - 计算Uv,筛选出ip,统计去重后Ip
 # file_rdd_map = file_rdd.map(lambda line: line.split(" ")[0])
 file_rdd_map = file_rdd \
     .map(lambda line: line.split(" ")) \
     .map(lambda x: x[0])
 # print(file_rdd_map.take(5))
 uv_num = file_rdd_map \
     .distinct() \
     .map(lambda line: ("uv", 1))
 print("uvCount:", uv_num.reduceByKey(lambda x, y: x + y).collect())  # uvCount: [('uv', 1051)]
 # *5 - 计算topk,筛选出对应业务的topk,访问网站【10下标】的Topk,"需要使用\"-\"
 re = file_rdd \
     .map(lambda x: x.split(" ")) \
     .filter(lambda line: len(line) > 10) \
     .map(lambda line: (line[10], 1)) \
     .reduceByKey(lambda x, y: x + y) \
     .sortBy(lambda x: x[1], False) \
     .filter(lambda x: x[0] != "\"-\"")
 print(re.take(10))
 re1 = file_rdd \
     .map(lambda x: x.split(" ")) \
     .filter(lambda line: len(line) > 10) \
     .map(lambda line: (line[10], 1)) \
     .groupByKey()\
     .mapValues(sum) \
     .sortBy(lambda x: x[1], False) \
     .filter(lambda x: x[0] != "\"-\"")
 print(re1.take(10))

  • 完毕

总结:

  • 步骤中关键的步骤,对数据的理解,根据数据进行切分,过滤

反思:

  • 作为数据开发工程师,需要对数据有基础的认知,比如age字段如果大于100或小于0过滤

[掌握]共享变量

  • 两种共享变量:累加器,广播变量,就是在driver和executor中变量共享的,在driver定义,在exector执行计算

  • 累加器

    • 原理

      • 在Driver端和exeutor端可以共享Executor执行计算的结果
    • 不使用累加器

      • python本地集合可以直接得到结果
      • 但是在分布式集合中得不到累加的
    • 使用累加器

      • acc=sc.accumulate(10),10是初始值
      • acc.add(num)
      • print(acc.value)通过value获取累加器的值
    • 代码

    • # -*- coding: utf-8 -*-
      # Program function:测试累加器
      # 测试1:python集合的累加器,作为单机版本没有问题
      # 测试2:spark的rdd的集合,直接进行累加操作,不会触发结果到driver的
      # 原因;Driver端定义的变量,在executor执行完毕后没有将结果传递到driver
      # 引出共享变量,Driver和Executor共享变量的改变
      from pyspark import SparkContext, SparkConf
      
      if __name__ == '__main__':
          conf = SparkConf().setAppName("miniPy").setMaster("local[*]")
          sc = SparkContext(conf=conf)
          # 下面是rdd的集合
          l1 = [1, 2, 3, 4, 5]
          l1_textFile = sc.parallelize(l1)
          # 定义累加器
          acc_num = sc.accumulator(10)
      
      
          # 执行函数
          def add_num(x):
              global acc_num
              # acc_num+=x,这里了累加器提供的默认的方法是add方法
              acc_num.add(x)
      
      
          # 执行foreach
          l1_textFile.foreach(add_num)
          # 输出累加器的值
          print(acc_num)  # 25
          print(acc_num.value)  # 获取累加器的值,25
      
      
  • 广播变量

    • 原理
    • image-20221109162208080
    • image-20221109162226914
    • image-20221109162245666
    • 不使用广播变量
    • 使用广播变量
    • image-20221109162318409
    • 1-广播变量不是在每个Task拥有一份变量,而是每个节点的executor一份副本
    • 2-广播变量通过本地的executor从blockmanager中过去driver上面变量的副本(计算资源+计算程序)
    • 代码
  • 实战案例演示:

  • 这里对应的fruit_collect_as_map如果后面的水果种类比较多,

  • 每次的访问量比较大的时候,尽可能将fruit_collect_as_map转化为广播变量,通过广播变量让executor获取driver端定义变量的副本信息

  • 
    


# -*- coding: utf-8 -*-

# Program function:

from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
  conf = SparkConf().setAppName("miniPy").setMaster("local[*]")
  sc = SparkContext(conf=conf)

  # 这里定义rdd

  kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
  print(kvFruit.collect())
  fruit_collect_as_map = kvFruit.collectAsMap()

  # 声明广播变量

  broadcast_value = sc.broadcast(fruit_collect_as_map)

  # print(fruit_collect_as_map)#{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}

  # 通过指定索引来查询对应水果名称

  friut_ids = sc.parallelize([2, 1, 4, 3])

  # 执行查询

  print("执行查询后的结果")

  # ['orange', 'apple', 'grape', 'banana']

  print(friut_ids.map(lambda fruit: fruit_collect_as_map[fruit]).collect())
  print("执行广播变量之后查询后的结果")

  # 使用广播变量的取值

  print(friut_ids.map(lambda fruit: broadcast_value.value[fruit]).collect())

  • 完毕

  • 整合案例:

  • 需求:利用广播变量和累加器完成非字母的表示统计

  • image-20210913171305034
  • 步骤:

  • 1-读取数据

  • 2-切割字符串

  • 3-定义累加器,这里累加器可以计算非字母个数

  • 4-定义广播变量-------------[# !,@,#,]

  • 5-利用自定义函数累加非字母的表示

  • 6-执行统计

  • 7-停止sparkcontext

  • 代码:

-- coding: utf-8 --

Program function:

from pyspark import SparkConf, SparkContext
import re

‘’’



    # *5 - 利用自定义函数累加非字母的表示
    def f(x):
        global acc_count
        listValue = list_broadcast.value
        if x in listValue:
            # acc_count.add(1)
            acc_count += 1
            return 1
        else:
            return 0


    # *6 - 执行统计
    line__filter = file_rdd \
        .filter(lambda line: (len(line.strip()) > 0)) \
        .flatMap(lambda line: re.split("\s+", line)) \
        .filter(f)
    print(line__filter.count())  # 8
    print("list broadcast value is:", acc_count.value)  # 8
      
    # *6 - 执行各个非单词过滤统计
    line__filter_alpha = file_rdd \
        .filter(lambda line: (len(line.strip()) > 0)) \
        .flatMap(lambda line: re.split("\s+", line)) \
        .filter(f)\
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda x, y: x + y)
    print(line__filter_alpha.collect())#[('#', 2), ('!', 2), ('$', 2), ('%', 2)]
    # *7 - 停止sparkcontext

  • 总结:

  • 累加器和广播变量

  • 累加器注意事项:

  • 如果执行多次action操作的化,如果没有缓存会执行多次累加

  • 这里需要在执行累加的action操作之前最好做缓存,不至于累加器的数据是错误的

  • image-20210913174619884
  • 代码

  • 
    

# -*- coding: utf-8 -*-

# Program function:累加器的注意事项 accumulate

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':

  # *1 - 准备SparkContext的环境

  conf = SparkConf().setAppName("click").setMaster("local[*]")
  sc = SparkContext.getOrCreate(conf=conf)
  sc.setLogLevel("WARN")  # 直接将log4j放入文件夹中

  acc = sc.accumulator(0)


  def judge_even(row_data):
      """
      过滤奇数,计数偶数个数
      """
      global acc
      if row_data % 2 == 0:
          acc += 1
          return 1
      else:
          return 0


  a_list = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  even_num = a_list.filter(judge_even)
  even_num.cache()

  # 上述结果需要一个action触发

  print("count value is:", even_num.count())

  # 再次执行输出的时候,得到正确的结果 5

  print("acc value is:{}".format(acc))  # acc value is:5

  # 坑:如果再次执行action操作,会发生什么现象?

  # 解析:由于第二次执行action的操作会重新计算所有rdd的算子,会再次累加5,得到10结果

  print("count value is:", even_num.count())
  print("acc value is:{}".format(acc))  # acc value is:10

  • 完毕

[提高扩展]Spark 内核调度

  • RDD依赖

  • 为什么设计依赖?

    • 1-为了实现Spark的容错,rdd1-rdd2-rdd3-rdd4
    • 2-并行计算,划分依赖、
  • 为什么划分宽窄依赖?

  • 为了加速并行计算

  • 窄依赖可以并行计算,如果是宽依赖无法并行计算

  • 依赖的划分

  • 窄依赖:*父 RDD 与子 RDD 间的分区是一对一的*

  • image-20221109162350169

  • 宽依赖:划分Stage

    • *父 RDD 中的分区可能会被多个子 RDD 分区使用*
    • image-20221109162424704
  • 如何区分宽窄依赖?

    • 比如map。filter,flatMap 窄依赖
    • 比如reduceByKey,groupByKey,宽依赖(shuffle)
    • image-20221109162448117
      • 不能说:一个子RDD依赖于多个父rdd,该种情况无法判断
  • DAG和Stage

  • 如何划分Stage?

  • 根据Shuffle依赖,划分Stage?因为Shuffle前后都以执行并行计算

  • image-20221109162504358

  • 什么是DAG?

    • 有向无环图
    • image-20210913181944764
  • DAG如何划分Stage?

    • 一个Dag就是一个Job,一个Dag是由Action算子进行划分
    • 一个Job下面有很多Stage,根据宽依赖Shuffle依赖划分Stage
  • Job调度流程

  • image-20210913184137146

  • 一个Spark应用程序包括Job、Stage及Task:

l 第一:Job是以Action方法为界,遇到一个Action方法则触发一个Job;一个Job就是dag

l 第二:Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;

l 第三:Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

  • 结合资源获取,以onyarn为例

  • image-20210913184353535

  • 作业:
  • ip地址查询理清楚,写1遍
  • Job调度流程–只需要在宏观层面理解Job调度,更多借助ppt
  • 思考几个问题:
    • 是什么,为什么,怎么用
    • 缓存
    • checkpoint
    • dag
    • 依赖关系

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/4452.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NLP | XLNet :用于语言理解的广义自回归预训练 论文详解

论文&#xff1a;XLNet: Generalized Autoregressive Pretraining for Language Understanding 论文地址&#xff1a;https://proceedings.neurips.cc/paper/2019/file/dc6a7e655d7e5840e66733e9ee67cc69-Paper.pdf 1.介绍 XLNet 是从蓬勃发展的自然语言处理 (NLP) 领域中出…

食用前须知(阅读并同意后在食用其他部分)

昨天刚和计科某数据结构老师聊这个事 让我别写题解了 以后会偷摸的在csdn更&#xff0c;大家千万低调点&#xff0c;严谨点&#xff01;&#xff01;&#xff01; 一般不会当天更了&#xff0c;会拖个一两天&#xff0c;大家先把会的写写&#xff0c;不会的再来看我教程 就算真…

高效率Paper写作需要哪些建议?

高效写Paper最关键的是要多写&#xff0c;写多了&#xff0c;英语行文能力提高&#xff0c;并且知道Paper写作大概的套路&#xff0c;Paper写作效率自然上升。小编为同学们带来一些建议。 The key to writing paper efficiently is to write more.If you write more,improve yo…

ijkplayer项目

ijkplayer项目 环境配置 NDK全称&#xff1a;Native Development Kit。 1、NDK是一系列工具的集合。NDK提供了一系列的工具&#xff0c;帮助开发者快速开发C&#xff08;或C&#xff09;的动态库&#xff0c;并能自动将so和java应用一起打包成apk。这些工具对开发者的帮助是巨…

C++ Reference: Standard C++ Library reference: C Library: cwchar: wmemset

C官网参考链接&#xff1a;https://cplusplus.com/reference/cwchar/wmemset/ 函数 <cwchar> wmemset wchar_t* wmemset (wchar_t* ptr, wchar_t wc, size_t num); 填充宽字符数组 将由ptr指向的宽字符数组的第一个num个元素设置为wc指定的值。 这是memset&#xff08;&…

瑞吉外卖强化(一):缓存优化

瑞吉外卖强化&#xff08;一&#xff09;&#xff1a;缓存优化瑞吉外卖 缓存优化Redis基本操作短信验证码 缓存实现缓存菜品数据SpringCache常用注解瑞吉外卖 缓存优化 Redis基本操作 redisTemplate需要配置类 这里的 需要对其进行 序列化操作 reidsTeplate.opsForValue().s…

HummerRisk 快速入门教程

1、一键部署 1. 部署服务器要求 操作系统要求&#xff1a;任何支持 Docker 的 Linux x64CPU内存要求&#xff1a;最低要求 4C8G&#xff0c;推荐 8C16G部署目录空间&#xff08;默认/opt目录&#xff09;要求&#xff1a; 50G网络要求&#xff1a;可访问互联网&#xff08;如…

Recall:JS EventLoop

有时候一段代码没有达到你想要的效果&#xff0c;可能加上setTimeout就好了 之前对事件循环一知半解&#xff0c;今天重新深入理解一下&#x1f602; 宏任务 JS是单线程的&#xff0c;但是浏览器是多线程的&#xff0c;当 JS 需要执行异步任务时&#xff0c;浏览器会另外启…

企业架构概述及业务架构详解

编辑导语&#xff1a;企业架构可以辅助企业完成业务及IT战略规划&#xff0c;还是企业信息化规划的核心&#xff0c;也有助于个人职业的健康长远发展。本文作者对企业架构的全景以及业务架构设计进行了分析&#xff0c;感兴趣的小伙伴们一起来看一下吧。 1&#xff09;对公司而…

PyTorch 加载 Mask R-CNN 预训练模型并 fine-tuning

目录1 Mask R-CNN 原理(简单版)2 ROI Align3 PyTorch 加载预训练模型1 Mask R-CNN 原理(简单版) Mask R-CNN 是一个实例分割&#xff08;Instance segmentation&#xff09;算法&#xff0c;主要是在目标检测的基础上再进行分割。 Mask R-CNN 算法主要是 Faster R-CNN FCN&…

算法练习题(涉外黄成老师)

1.带锁的门在走廊上有n个带锁的门&#xff0c;从1到n依次编号。最初所有的门都是关着的。我们从门前经过n次&#xff0c;每一次都从1号门开始。在第i次经过时(i1,2,…,n)我们改变i的整数倍号锁的状态:如果门是关的&#xff0c;就打开它;如果门是打开的&#xff0c;就关上它。在…

CEC2015:(二)动态多目标野狗优化算法DMODOA求解DIMP2、dMOP2、dMOP2iso、dMOP2dec(提供Matlab代码)

一、cec2015中测试函数DIMP2、dMOP2、dMOP2iso、dMOP2dec详细信息 CEC2015&#xff1a;动态多目标测试函数之DIMP2、dMOP2、dMOP2iso、dMOP2dec详细信息 二、动态多目标野狗优化算法 多目标野狗优化算法&#xff08;Multi-Objective Dingo Optimization Algorithm&#xff0…

#入坑keychron#你还没一起入坑吗?

经济和科技飞速发展的今天&#xff0c;我们早已不在像从前那样有电脑玩就行&#xff0c;现在的我们追求的是更高的配置、更好的体验&#xff0c;就像从前一碗泡面就是最高的理想&#xff0c;而现在最少都得有根泡面搭档才能勉强接受&#xff0c;连泡面都有搭档&#xff0c;电脑…

web前端期末大作业:旅游网页设计与实现——个人旅游博客(4页)HTML+CSS

&#x1f468;‍&#x1f393;学生HTML静态网页基础水平制作&#x1f469;‍&#x1f393;&#xff0c;页面排版干净简洁。使用HTMLCSS页面布局设计,web大学生网页设计作业源码&#xff0c;这是一个不错的旅游网页制作&#xff0c;画面精明&#xff0c;排版整洁&#xff0c;内容…

【后端】初识HTTP_2

我们学习的HTTP协议&#xff0c;是应用层里面最广泛使用的协议~ 我们主要是学习HTTP的请求响应的报文格式 我们可以借助抓包工具来学习&#xff0c;抓包抓到的是文本格式~~ 根据上节内容 我们大概了解了请求和响应的格式 请求有4部分&#xff1a; &#xff08;1&#xff…

leetcode 51. N皇后 回溯法求解(c++版本)

题目描述 简单来说就给一个N*N的棋盘 棋盘上的每一列每一行以及每一个对角不能出现两个皇后 因此明确以下几点 要找出所有可能的解法也是采用回溯法进行求解&#xff08;具体在下面进行详解&#xff09; 用下面一张示例图来说明回溯法的思路 说白了就是进行搜索&#xff0c;…

java项目-第102期基于ssm的校园二手交易平台-java毕业设计

java项目-第102期基于ssm的校园二手交易平台 【源码请到资源专栏下载】 1、项目简述 Hi&#xff0c;大家好&#xff0c;今天分享的源码是基于ssm的校园二手交易平台。 该交易平台分为两部分&#xff0c;前台和后台。用户在前台进行商品选购以及交易&#xff1b;管理员登录后台可…

python-(6-3-3)爬虫---requests入门(对参数封装)

文章目录一 需求二 分析三 代码四 补充说明一 需求 爬取豆瓣电影的“纪录片”的电影信息数据 二 分析 老规矩&#xff0c;先在网页的“检查”中提取我们需要的信息 如下图所示。在“纪录片”那一页面&#xff0c;选择"network"----“XHR”----“preview”。 我们…

【附源码】Python计算机毕业设计面向社区的购物平台系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

java计算机毕业设计ssm+vue网络考试信息网站

项目介绍 对网络考试系统进行了介绍&#xff0c;包括研究的现状&#xff0c;还有涉及的开发背景&#xff0c;然后还对系统的设计目标进行了论述&#xff0c;还有系统的需求&#xff0c;以及整个的设计方案&#xff0c;对系统的设计以及实现&#xff0c;也都论述的比较细致&…