spark的学习-03

news2024/11/14 3:17:23

RDD的创建的两种方式:

方式一:并行化一个已存在的集合

方法:parallelize 并行的意思

将一个集合转换为RDD

方式二:读取外部共享存储系统

方法:textFile、wholeTextFile、newAPIHadoopRDD等

读取外部存储系统的数据转换为RDD

RDD的五大特征:

  1. 每个RDD 都由一系列的分区构成

  2. RDD 的转换操作本质上就是对RDD所有分区的并行转换

  3. 每个RDD 都会保存与其他RDD之间的依赖关系:血链机制或者血脉机制

  4. 如果是二元组【KV】类型的RDD,在Shuffle过程中可以自定义分区器,默认是hash分区(hash值取模进行分区)

  5. 可选的,Spark程序运行时,Task的分配可以指定实现本地优先计算:最优计算位置

 
 

RDD的五大特性分别是什么?

a. 每个RDD都可以由多个分区构成

b. 对RDD转换处理本质上是对RDD所有分区的并行转换处理

c. 对每个RDD都会保留与其他RDD之间的依赖关系:血脉机制

d. 可选的,对于KV结构的RDD,在经过Shuffle时,可以干预分区规则,默认是Hash分区

e. 可选的,Spark分配Task时会优先本地计算,尽量将Task分配到数据所在的节点

转换算子:

map:
# map:
list1 = [1, 2, 3, 4, 5]
# 目标是求出集合中各个元素的 3 次方
listRdd = sc.parallelize(list1)
mapRdd = listRdd.map(lambda x: math.pow(x, 3))
mapRdd.foreach(lambda x: print(x))  # foreach是触发算子

flatMap:
# flatMap:
# 目标是根据/切割,得到每个歌名
fileRdd = sc.textFile("../../datas/wordcount/songs.txt")
flatMapRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatMapRdd.foreach(lambda x:print(x))

filter:

过滤算子

# filter :
# 目标是过滤掉不符合的文本
fileRdd2 = sc.textFile("../../datas/wordcount/songs2.txt")
filterRdd = fileRdd2.filter(lambda line: re.split("\s",line)[2] != '-1' and len(re.split("\s",line)) == 4)
filterRdd.foreach(lambda x: print(x))

union:

联合

list2 = [1,2,3,4,5,6,7,8]
list3 = [6,7,8,9,10]
rdd1 = sc.parallelize(list2)
rdd2 = sc.parallelize(list3)

rdd3 = rdd1.union(rdd2)

rdd3.foreach(lambda x: print(x))   # 1 2 3 4 5 6 7 8 6 7 8 9 10
distinct:

去重

rdd4 = rdd3.distinct()
rdd4.foreach(lambda x: print(x))  # 1 2 3 4 5 6 7 8 9 10
分组聚合算子 groupByKey 以及 reduceByKey:

groupByKey只根据key进行分组,但不聚合 reduceByKey根据key进行分组,且进行聚合 (必须进行shuffle,可以指定分区的数量和规则) groupByKey转换算子,只对 KV键值对的RDD 起作用

rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd6 = rdd5.groupByKey()  # ("word",List[10,5])
rdd6.foreach(lambda x: print(x[0], *x[1]))   

rdd7 = rdd5.reduceByKey(lambda total, num: total + num)
rdd7.foreach(print)

重分区算子:repartition、coalesce :

二者都可以将分区变大变小

repartition必须经过shuffle 因为底层代码中 shuffle = True,可以将分区变小或者变大

而coalesce 可以选择经过不经过shuffle,默认情况下不经过,在默认情况下,只能将分区变小,不能将分区变大。假如shuffle=True,也可以将分区变大。

使用repartition更改分区的数量:
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
# 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions())  # getNumPartitions() 获取分区的数量 返回值不是RDD,所以不是转换算子,是触发算子   # 2
# 使用 repartition 将 分区数量改为4 或 1
changeRdd = rdd.repartition(4)  # 经过shuffle过程,将分区数量更改为4
print(changeRdd.getNumPartitions())  # 现在就将rdd 的分区更改为4了   # 4
# 还可以更改为1 (缩小分区)
print(rdd.repartition(1).getNumPartitions())   # 1
使用coalesce 更改分区的数量:

将小分区变为大分区,必须进行shuffle过程 在coalesce的中默认shuffle=Flase,所以我们需要手动更改为True

changeRdd2 = rdd.coalesce(4,shuffle=True)  #
print(changeRdd2.getNumPartitions())  # 4

将大分区改为小分区,在coalesce中可以不进行shuffle过程,所以不需要改为True

print(rdd.coalesce(1).getNumPartitions())  # 1 
排序算子:sortBy、sortByKey:
 fileRdd = sc.textFile("../../datas/c.txt")
    #fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)

    # sortByKey  对KV类型的RDD进行排序
    rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
    #rdd5.sortByKey(ascending=False).foreach(print)

    # 假如你想根据value排序,怎么办?
    rdd5.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print)  
    # ascending=False降序排序

触发算子:

常见的触发算子:count、foreach、take
# 较为常见的触发算子
# count  foreach  saveAsTextFile
# count
list1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(list1,2)
print(rdd1.count())  #9

rdd1.foreach(lambda x: print(x))

print(rdd1.take(3))  # [1 2 3]

其他触发算子:

first、take:
# first: 返回RDD集合中的第一个元素
print(rdd1.first())  # 1 

print(rdd1.take(3))  # [1 2 3]
collect:

我们在上面sortBy案例中写到了collect,如果不collect就直接打印结果的话,出来的是各个分区中排序的结果,并不是全局的(sortBy是全局排序的,只不过我们之前有分区,只在分区中排序)

想看到全局的排序,可以直接将分区数量更改为1,或者直接使用collect收集

reduce:

我们在上面的案例中也使用到了reduceByKey转换算子,这个和上面的差不多,只不过reduce只进行聚合,而不需要根据key分组什么的,因为就没有key

print(rdd1.reduce(lambda sum, num:sum + num))  # 45
top 和 takeOrdered:

先对RDD中的所有元素进行升序排序,top返回最大的几个元素、takeOrdered返回最小的几个元素

都不经过shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量

list2 = [2,1,5,79,435,33,576]
rdd2 = sc.parallelize(list2)
print(rdd2.top(3))  # [576, 435, 79]
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd2.takeOrdered(3))  # [1, 2, 5]

join 方面的算子:

join leftOuterJoin rightOuterJoin fullOuterJoin 都为转换算子

join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作

join:
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
                                numSlices=2)
rdd_singer_music = sc.parallelize(
    [("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
     ("动力火车", "当")], numSlices=2)

# join
joinRdd = rdd_singer_age.join(rdd_singer_music).foreach(lambda x : print(x))   
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))
leftOuterJoin:

和sql中的leftjoin一样,左边的值全出,右边的值有的就显示,没有就显示null

rightOuterJoin 同理

leftJoinRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music).foreach(lambda x:print(x))
#('周杰伦', (43, '青花瓷'))
#('蔡依林', (41, '日不落'))
#('陈升', (63, None))
#('陈奕迅', (47, '孤勇者'))
#('林子祥', (74, '男儿当自强'))
fullOuterJoin:
fullJoinRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music).foreach(lambda x: print(x)) 
# ('动力火车', (None, '当'))
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈升', (63, None))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))

分区算子 mapPartitions -- 转换算子 foreachParition -- 触发算子

mapPartitions:
input_rdd = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices=2)
# 使用mapPartitions:对每个分区进行处理
def map_partition(part):
    rs = [i * 2 for i in part]
    return rs

# 每个分区会调用一次:将这个分区的数据放入内存,性能比map更好,优化型算子,注意更容易出现内存溢出
map_part_rdd = input_rdd.mapPartitions(lambda part: map_partition(part))

foreachParition:

- 优点:性能快、节省外部连接资源 - 缺点:如果单个分区的数据量较大,容易出现内存溢出 - 场景: -数据量不是特别大,需要提高性能【将整个分区的数据放入内存】 -需要构建外部资源时【基于每个分区构建一份资源】

def save_part_to_mysql(part):
    # 构建MySQL连接
    for i in part:
        # 利用MySQL连接将结果写入MySQL
        print(i)

# 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))

Spark的容错机制:(重点)

1、RDD容错机制:persist持久化机制

其中有三个算子: cache 、 persist 、 unpersist

cache:
# 功能:将RDD缓存在内存中
# 本质其实底层还是调用的 persist ,但是只缓存在内存中,如果内存不足的话,缓存就会失败
语法:cache()
persist :

  与cache不同的是,persist 可以自己指定缓存的方式(级别)

# 将RDD缓存在磁盘中
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)

# 将RDD缓存在内存中
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

# 将RDD优先缓存在内存中,如果内存不足,就缓存在磁盘中
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

# 使用堆外内存
StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)

# 使用序列化
StorageLevel.MEMORY_AND_DISK_DESER = StorageLevel(True, True, False, True)

常用的有

MEMORY_AND_DISK_2   -- 先缓存内存,如果内存不足就缓存在磁盘
MEMORY_AND_DISK_DESER   -- 使用序列化
unpersist :

功能就是将缓存释放出去

  • unpersist(blocking=True):等释放完再继续下一步 (默认为False)

  • 场景:明确RDD已经不再使用,后续还有很多的代码需要执行,将RDD的数据从缓存中释放,避免占用资源

  • 注意:如果不释放,这个Spark程序结束,也会释放这个程序中的所有内存

总体代码演示:

 # step3: 保存结果
  # 对RDD进行缓存
  rs_rdd.cache() # 只缓存在内存中
  rs_rdd.persist(StorageLevel.MEMORY_AND_DISK)
    # 打印结果:构建RDD
  rs_rdd.foreach(lambda x: print(x))
    # 打印第一行:重新构建一遍
  print(rs_rdd.first())
    # 统计行数:重新构建一遍
    print(rs_rdd.count())

    # todo:3-关闭SparkContext
    time.sleep(10000)
  # 如果这个RDD明确后续代码中不会再被使用,一定要释放缓存
    rs_rdd.unpersist(blocking=True)

# unpersist(blocking=True):等RDD释放完再继续下一步
# blocking = True:阻塞

2、checkPoint 检查点

checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况

# 创建sc对象
conf = SparkConf().setMaster("local[2]").setAppName("第一个pysparkDemo")
sc = SparkContext(conf=conf)

fileRdd = sc.textFile("../../datas/wordcount/sogou.tsv")
mapRdd = (fileRdd.filter(lambda line: len(re.split("\s+", line)) == 6) \
          .map(lambda line: (re.split("\s+", line)[0], re.split("\s+", line)[1], re.split("\s+", line)[2][1:-1])))

sc.setCheckpointDir("../datas/chk/chk1")

mapRdd.checkpoint()
# checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况

print(mapRdd.count())

time.sleep(100)

sc.stop()

容错机制面试题:

RDD的cache、persist持久化机制和checkpoint检查点机制有什么区别?

  • 存储位置

    • persist:将RDD缓存在内存或者磁盘中

    • chk:将RDD的数据存储在文件系统磁盘中

  • 生命周期

    • persist:当代码中遇到了unpersist或者程序结束,缓存就会被自动清理

    • chk:检查点的数据是不会被自动清理的,只能手动删除

  • 存储内容

    • persist:会保留RDD的血脉关系,如果缓存丢失,可以通过血脉进行恢复

    • chk:会斩断RDD的血脉关系,不会保留RDD的血脉关系的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

axios平替!用浏览器自带的fetch处理AJAX(兼容表单/JSON/文件上传)

fetch 是啥? fetch 函数是 JavaScript 中用于发送网络请求的内置 API,可以替代传统的 XMLHttpRequest。它可以发送 HTTP 请求(如 GET、POST 等),并返回一个 Promise,从而简化异步操作 基本用法 /* 下面是…

Linux(CentOS)安装 Nginx

CentOS版本:CentOS 7 Nginx版本:1.24.0 有两种安装方式 一、通过 yum 安装 需要 root 权限,普通用户使用 sudo 进行命令操作 参考:https://nginx.org/en/linux_packages.html#RHEL 1、安装依赖 sudo yum install yum-utils 2…

[原创]手把手教学之前端0基础到就业——day11( Javascript )

文章目录 day11(Javascript)01Javascript①Javascript是什么②JavaScript组成③ Javascript的书写位置1. 行内式 (不推荐)2 . 内部位置使用 ( 内嵌式 )3. 外部位置使用 ( 外链式 ) 02变量1. 什么是变量2. 定义变量及赋值3. 注意事项4. 命名规范 03输入和输出1) 输出形式12) 输出…

【C++笔记】C++三大特性之继承

【C笔记】C三大特性之继承 🔥个人主页:大白的编程日记 🔥专栏:C笔记 文章目录 【C笔记】C三大特性之继承前言一.继承的概念及定义1.1 继承的概念1.2继承的定义1.3继承基类成员访问方式的变化1.4继承类模板 二.基类和派生类间的转…

Colorful/七彩虹iGame G-ONE Plus 12代处理器 Win11原厂OEM系统 带COLORFUL一键还原

安装完毕自带原厂驱动和预装软件以及一键恢复功能,自动重建COLORFUL RECOVERY功能,恢复到新机开箱状态。 【格式】:iso 【系统类型】:Windows11 原厂系统下载网址:http://www.bioxt.cn 注意:安装系统会…

【LeetCode】分发糖果 解题报告

135. 分发糖果 - 题目链接 n个孩子站成一排。给你一个整数数组ratings表示每个孩子的评分。 你需要按照以下要求,给这些孩子分发糖果: 每个孩子至少分配到1个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果,…

ArcGIS从Excel表格文件导入XY数据并定义坐标系与投影的方法

本文介绍在ArcMap软件中,从Excel表格文件中批量导入坐标点数据,将其保存为.shp矢量格式,并定义坐标系、转为投影坐标系的方法。 已知我们有一个Excel表格文件(可以是.xls、.xlsx、.csv等多种不同的表格文件格式)&#…

爬虫 - 二手交易电商平台数据采集 (一)

背景: 近期有一个需求需要采集某电商网站平台的商品数据进行分析。因此,我计划先用Python实现一个简单的版本,以快速测试技术的实现可能性,再用PHP实现一个更完整的版本。文章中涉及的技术仅为学习和测试用途,请勿用于商业或非法用…

「C/C++」C++标准库 之 #include<iostream> 标准输入输出

✨博客主页何曾参静谧的博客📌文章专栏「C/C」C/C程序设计📚全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

AI出图,在家装行业杀疯了!

家装行业作为一个庞大的产业,长期以来面临着诸多难题,而随着AIGC技术的蓬勃发展,AI促进家装设计行业迎来了新的春天。 在传统家装设计流程中,相信大家对“设计环节充满了繁琐与复杂”有着非常深刻的体验,设计师需要花…

MySQL核心业务大表归档过程

记录一下2年前的MySQL大表的归档,当时刚到公司,发现MySQL的业务核心库,超过亿条的有7张表,最大的表有9亿多条,有37张表超过5百万条,部分表行数如下: 在测试的MySQL环境 : pt-archiv…

深度学习——权重初始化、评估指标、梯度消失和梯度爆炸

文章目录 🌺深度学习面试八股汇总🌺权重初始化零初始化 (Zero Initialization)随机初始化 (Random Initialization)Xavier 初始化(Glorot 初始化)He 初始化正交初始化(Orthogonal Initialization)预训练模型…

Proteus中数码管动态扫描显示不全(已解决)

文章目录 前言解决方法后记 前言 我是直接把以前写的 51 数码管程序复制过来的,当时看的郭天祥的视频,先送段选,消隐后送位选,最后来个 1ms 的延时。 代码在 Proteus 中数码管静态是可以的,动态显示出了问题——显示…

简单说一下Pinia 和 Vuex的区别

Pinia 和 Vuex 是 Vue.js 生态系统中两种状态管理库,它们都用于管理复杂应用的状态。尽管它们的目标相似,但在设计和使用上有许多不同之处。以下是 Pinia 和 Vuex 的主要区别: 1. 设计理念 Vuex 集中式存储:Vuex 采用单一的集中…

个人博客静态样式部署

首页部分 views/layout/layoutwave.vue <script setup> import router from /router; import { ref, onMounted, onUnmounted } from vue import /assets/img/icons/font_p81061dps7l/iconfont.cssconst canvas ref(null) const canvasFa ref(null) const fish ref(n…

综合文化信息管理系统|基于java和小程序的综合文化信息管理系统设计与实现(源码+数据库+文档)

综合文化信息管理系统 目录 基于java和小程序的打印室预约系统设计与实现 一、前言 二、系统设计 三、系统功能设计 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;✌️大厂码农|毕设布道师&…

渗透测试(socket,namp,scapy)

socket:可以用来实现不同虚拟机或者不同计算机之间的通信。 socket常用函数&#xff1a; sock.bind(host,port) //host可接受client范围&#xff0c;以及连接的端口 sock.listen()//sever开启监听连接 sock.accpet()//返回 sock&#xff0c;addr 用来接受和发送数据 addr…

leetcode刷题记录(二十六)——151. 反转字符串中的单词

&#xff08;一&#xff09;问题描述 . - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/reverse-words-in-a-string/desc…

python中常见的8种数据结构之一数组的应用

在Python中&#xff0c;数组是一种常见的数据结构&#xff0c;用于存储一系列相同类型的元素。在实际应用中&#xff0c;数组可以用于解决各种问题。 以下是数组在Python中的一些常见应用&#xff1a; 1. 存储和访问数据&#xff1a;数组可以用于存储和访问一组数据。可以通过…

JS禁用鼠标滚动条功能且滚动条不消失教程

这个JS功能我找了好久好久才找到的&#xff0c;希望能够帮助到大家&#xff0c;网上有很多教程虽然能够实现禁用滚动条的效果&#xff0c;但是滚动条却直接消失不见了&#xff0c;那么今天我就把禁用滚动条但滚动条不消失的JS代码分享给大家。 实例代码如下&#xff1a; JS禁用…