目录
1. 简述Spark持久化中缓存和checkpoint检查点的区别
2 . 如何使用缓存和检查点?
3 . 代码题
浏览器Nginx案例
先进行数据清洗,做后续需求用
1、需求一:点击最多的前10个网站域名
2、需求二:用户最喜欢点击的页面排序TOP10
3、需求三:统计每分钟用户搜索次数
学生系统案例
4. RDD依赖的分类
5. 简述DAG与Stage 形成过程
DAG :
Stage :
1. 简述Spark持久化中缓存和checkpoint检查点的区别
1- 数据存储位置不同
缓存: 存储在内存或者磁盘 或者 堆外内存中
checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上
2- 数据生命周期:
缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除
3- 血缘关系:
缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
4- 主要作用不同:
缓存: 提高Spark程序的运行效率
checkpoint检查点: 提高Spark程序的容错性
2 . 如何使用缓存和检查点?
将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 , 最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作;
当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据
3 . 代码题
浏览器Nginx案例
先进行数据清洗,做后续需求用
import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
if __name__ == '__main__':
# 1- 创建SparkSession对象
conf = SparkConf().setAppName('需求1').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 数据输入
init_rdd = sc.textFile('file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample')
# 3- 数据处理
filter_tmp_rdd = init_rdd.filter(lambda line:line.strip()!='')
print('过滤空行的数据',filter_tmp_rdd.take(10))
map_rdd = filter_tmp_rdd.map(lambda line:line.split())
print('map出来的数据',map_rdd.take(10))
len6_rdd = map_rdd.filter(lambda line:len(line)==6)
print('字段数为6个的字段',len6_rdd.take(10))
etl_rdd = len6_rdd.map(lambda list:(
list[0],list[1],list[2][1:-1],list[3],list[4],list[5]) )
print('转换成元组后的数据',etl_rdd.take(10))
# 设置缓存
etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()
1、需求一:点击最多的前10个网站域名
print('点击最多的前10个网站域名','-'*50)
website_map_rdd = etl_rdd.map(lambda tup:(tup[5].split('/')[0],1))
print('把网站域名切出来,变成(hello,1)的格式',website_map_rdd.take(10))
website_reducekey_rdd = website_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
print('进行聚合',website_reducekey_rdd.take(10))
sort_rdd =website_reducekey_rdd.sortBy(lambda tup:tup[1],ascending=False)
print('进行降序排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
sc.stop()
2、需求二:用户最喜欢点击的页面排序TOP10
print('用户最喜欢点击的页面排序TOP10','-'*100)
top_10_order = etl_rdd.map(lambda tup:(tup[4],1))
print('点击量排行',top_10_order.take(10))
top_10_reducebykey = top_10_order.reduceByKey(lambda agg,curr:agg+curr)
print('进行聚合',top_10_reducebykey.take(10))
sortby_top10 = top_10_reducebykey.sortBy(lambda line:line[1],ascending=False)
print('进行排序',sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源
sc.stop()
3、需求三:统计每分钟用户搜索次数
print('统计每分钟用户搜索次数','-'*50)
search_map_rdd = etl_rdd.map(lambda tup:(tup[0][0:5],1))
print('把网站域名切出来,变成(hello,1)的格式',search_map_rdd.take(10))
search_reducekey_rdd = search_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
print('进行聚合',search_reducekey_rdd.take(10))
sort_rdd =search_reducekey_rdd.sortBy(lambda tup:tup)
print('按照时间进行排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
sc.stop()
学生系统案例
数据准备
import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
if __name__ == '__main__':
# 1- 创建SparkSession对象
conf = SparkConf().setAppName('学生案例').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 数据输入
init_rdd= sc.textFile('hdfs://node1:8020/input/day04_home_work.txt')
# 3- 数据处理
stu_rdd = init_rdd.map(lambda line:line.split(',')).cache()
print('切分后的数据为',stu_rdd.collect())
# 1、需求一:该系总共有多少学生
stu_cnt = stu_rdd.map(lambda line:line[0]).distinct().count()
print(f'该系总共有{stu_cnt}个学生')
# 2、需求二:该系共开设了多少门课程
subject_cnt = stu_rdd.map(lambda line:line[1]).distinct().count()
print(f'该系共开设了{subject_cnt}门课程')
# 3、需求三:Tom同学的总成绩平均分是多少
tom_score_sum = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:int(line[2])).sum()
tom_subject_num = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:line[1]).distinct().count()
tom_score_avg = tom_score_sum/tom_subject_num
print(f'Tom同学的总成绩平均分是{round(tom_score_avg,2)}')
# 4、需求四:求每名同学的选修的课程门数
# every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
# .reduceByKey(lambda agg, curr: agg + curr).collect()
every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct()
print('学生与选修课,把一个学生重修一门选修课的情况去掉',every_student_course_num.collect())
every_student_course_num2 = every_student_course_num\
.map(lambda tup:(tup[0],1))\
.reduceByKey(lambda agg,curr:agg+curr).collect()
print('每个同学的选修课数',every_student_course_num2)
# 5、需求五:该系DataBase课程共有多少人选修
subject_database = stu_rdd.filter(lambda line:line[1]=='DataBase').map(lambda line:line[0]).distinct().count()
print(f'数据库有{subject_database}人选修')
# 6、需求六:各门课程的平均分是多少
total_score = stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
print('各科总分为',total_score.collect())
total_num = stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))
print('各科的数量为',total_num.collect())
#
total_join =total_score.join(total_num)
print('join后结果',total_join.collect())
# 各科总分为 [('DataBase', 170), ('Algorithm', 110), ('DataStructure', 140)]
# 各科的数量为 [('DataBase', 2), ('Algorithm', 2), ('DataStructure', 2)]
# 合表后为 [('DataBase', (170, 2)), ('DataStructure', (140, 2)), ('Algorithm', (110, 2))]
total_avg =total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()
print('各科目的平均分为',total_avg)
# 4- 数据输出
# 5- 释放资源
sc.stop()
4. RDD依赖的分类
窄依赖: 父RDD分区与子RDD分区是一对一关系
宽依赖: 父RDD分区与子RDD分区是一对多关系
5. 简述DAG与Stage 形成过程
DAG :
1-Spark应用程序,遇到了Action算子以后,就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中,形成一个完整Stage
2-会根据算子间的依赖关系,从Action算子开始,从后往前进行回溯,如果算子间是窄依赖,就放到同一个Stage中;如果是宽依赖,就形成新的Stage。一直回溯完成。
Stage :
1-Driver进程启动成功以后,底层基于PY4J创建SparkContext对象,在创建SparkContext对象的过程中,还会同时创建DAGScheduler(DAG调度器)和TaskScheduler(Task调度器)
DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段
TaskScheduler: 调度Task线程给到Executor进程进行执行
2-Spark应用程序遇到了一个Action算子以后,就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器,对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程,将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。
3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后,将Task线程分配给到具体的Executor进行执行,底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的,不能并行执行。
4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态,直到Job任务执行完成。