2024.1.8 Day04_SparkCore_homeWork

news2025/1/12 4:09:34

目录

1. 简述Spark持久化中缓存和checkpoint检查点的区别

2 . 如何使用缓存和检查点?

3 . 代码题

浏览器Nginx案例

先进行数据清洗,做后续需求用

1、需求一:点击最多的前10个网站域名

2、需求二:用户最喜欢点击的页面排序TOP10

3、需求三:统计每分钟用户搜索次数

学生系统案例

4. RDD依赖的分类

5. 简述DAG与Stage 形成过程 

DAG :  

Stage : 


1. 简述Spark持久化中缓存和checkpoint检查点的区别

1- 数据存储位置不同
    缓存: 存储在内存或者磁盘 或者 堆外内存中
    checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
    缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
    checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
    缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
    checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
    
4- 主要作用不同:
    缓存: 提高Spark程序的运行效率
    checkpoint检查点: 提高Spark程序的容错性

2 . 如何使用缓存和检查点?

       将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 ,  最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作;

         当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据

3 . 代码题

浏览器Nginx案例

        

先进行数据清洗,做后续需求用

import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

if __name__ == '__main__':
# 1- 创建SparkSession对象
    conf = SparkConf().setAppName('需求1').setMaster('local[*]')
    sc = SparkContext(conf=conf)
# 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample')

# 3- 数据处理
    filter_tmp_rdd = init_rdd.filter(lambda line:line.strip()!='')
    print('过滤空行的数据',filter_tmp_rdd.take(10))
    map_rdd = filter_tmp_rdd.map(lambda line:line.split())
    print('map出来的数据',map_rdd.take(10))
    len6_rdd = map_rdd.filter(lambda line:len(line)==6)
    print('字段数为6个的字段',len6_rdd.take(10))
    etl_rdd = len6_rdd.map(lambda list:(
        list[0],list[1],list[2][1:-1],list[3],list[4],list[5])  )
    print('转换成元组后的数据',etl_rdd.take(10))
    # 设置缓存
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

1、需求一:点击最多的前10个网站域名


    print('点击最多的前10个网站域名','-'*50)
    website_map_rdd = etl_rdd.map(lambda tup:(tup[5].split('/')[0],1))
    print('把网站域名切出来,变成(hello,1)的格式',website_map_rdd.take(10))
    website_reducekey_rdd = website_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',website_reducekey_rdd.take(10))
    sort_rdd =website_reducekey_rdd.sortBy(lambda tup:tup[1],ascending=False)
    print('进行降序排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

2、需求二:用户最喜欢点击的页面排序TOP10

    print('用户最喜欢点击的页面排序TOP10','-'*100)
    top_10_order = etl_rdd.map(lambda tup:(tup[4],1))
    print('点击量排行',top_10_order.take(10))
    top_10_reducebykey = top_10_order.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',top_10_reducebykey.take(10))
    sortby_top10 = top_10_reducebykey.sortBy(lambda line:line[1],ascending=False)
    print('进行排序',sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

3、需求三:统计每分钟用户搜索次数

    print('统计每分钟用户搜索次数','-'*50)
    search_map_rdd = etl_rdd.map(lambda tup:(tup[0][0:5],1))
    print('把网站域名切出来,变成(hello,1)的格式',search_map_rdd.take(10))
    search_reducekey_rdd = search_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',search_reducekey_rdd.take(10))
    sort_rdd =search_reducekey_rdd.sortBy(lambda tup:tup)
    print('按照时间进行排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

学生系统案例

 数据准备

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

if __name__ == '__main__':
# 1- 创建SparkSession对象
    conf = SparkConf().setAppName('学生案例').setMaster('local[*]')
    sc = SparkContext(conf=conf)
# 2- 数据输入
    init_rdd= sc.textFile('hdfs://node1:8020/input/day04_home_work.txt')

# 3- 数据处理
    stu_rdd = init_rdd.map(lambda line:line.split(',')).cache()
    print('切分后的数据为',stu_rdd.collect())
# 1、需求一:该系总共有多少学生
    stu_cnt = stu_rdd.map(lambda line:line[0]).distinct().count()
    print(f'该系总共有{stu_cnt}个学生')
# 2、需求二:该系共开设了多少门课程
    subject_cnt = stu_rdd.map(lambda line:line[1]).distinct().count()
    print(f'该系共开设了{subject_cnt}门课程')
# 3、需求三:Tom同学的总成绩平均分是多少
    tom_score_sum = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:int(line[2])).sum()
    tom_subject_num = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:line[1]).distinct().count()
    tom_score_avg = tom_score_sum/tom_subject_num
    print(f'Tom同学的总成绩平均分是{round(tom_score_avg,2)}')

# 4、需求四:求每名同学的选修的课程门数
#     every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
#         .reduceByKey(lambda agg, curr: agg + curr).collect()
    every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct()
    print('学生与选修课,把一个学生重修一门选修课的情况去掉',every_student_course_num.collect())
    every_student_course_num2 = every_student_course_num\
        .map(lambda tup:(tup[0],1))\
        .reduceByKey(lambda agg,curr:agg+curr).collect()
    print('每个同学的选修课数',every_student_course_num2)
# 5、需求五:该系DataBase课程共有多少人选修
    subject_database = stu_rdd.filter(lambda line:line[1]=='DataBase').map(lambda line:line[0]).distinct().count()
    print(f'数据库有{subject_database}人选修')
# 6、需求六:各门课程的平均分是多少
    total_score = stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
    print('各科总分为',total_score.collect())
    total_num = stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))
    print('各科的数量为',total_num.collect())
    #
    total_join =total_score.join(total_num)
    print('join后结果',total_join.collect())
# 各科总分为 [('DataBase', 170), ('Algorithm', 110), ('DataStructure', 140)]
# 各科的数量为 [('DataBase', 2), ('Algorithm', 2), ('DataStructure', 2)]
# 合表后为 [('DataBase', (170, 2)), ('DataStructure', (140, 2)), ('Algorithm', (110, 2))]
    total_avg =total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()
    print('各科目的平均分为',total_avg)
# 4- 数据输出

# 5- 释放资源
    sc.stop()

4. RDD依赖的分类

窄依赖:  父RDD分区与子RDD分区是一对一关系

宽依赖:  父RDD分区与子RDD分区是一对多关系

5. 简述DAG与Stage 形成过程 

DAG :  

1-Spark应用程序,遇到了Action算子以后,就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中,形成一个完整Stage

2-会根据算子间的依赖关系,从Action算子开始,从后往前进行回溯,如果算子间是窄依赖,就放到同一个Stage中;如果是宽依赖,就形成新的Stage。一直回溯完成。

Stage : 

1-Driver进程启动成功以后,底层基于PY4J创建SparkContext对象,在创建SparkContext对象的过程中,还会同时创建DAGScheduler(DAG调度器)和TaskScheduler(Task调度器)
    DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段
    TaskScheduler: 调度Task线程给到Executor进程进行执行

2-Spark应用程序遇到了一个Action算子以后,就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器,对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程,将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。

3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后,将Task线程分配给到具体的Executor进行执行,底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的,不能并行执行。

4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态,直到Job任务执行完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1376619.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年最好用的简历编辑工具,助你腾飞职业生涯!

随着科技的不断发展,求职竞争也愈发激烈。在2024年,如何在众多求职者中脱颖而出成为关键问题。为了帮助大家在职业生涯中取得更好的机会,特别推荐一款在2024年最为出色的简历编辑工具——芊芊简历。 1. 创新的编辑功能 芊芊简历拥有直观易用…

Matlab 使用 DH table 建立的 robot 和实际不符

机器人仿真 想借助 matlab robotics toolbox 来仿真机器人,但是直接输入自己的 DH table 显示出来的 robot 和实际不情况不符。 DH table 建立 robot Build Manipulator Robot Using Kinematic DH Parameters 主要使用 setFixedTransform,DH table 中…

YOLOV7剪枝流程

YOLOV7剪枝流程 1、训练 1)划分数据集进行训练前的准备,按正常的划分流程即可 2)修改train.py文件 第一次处在参数列表里添加剪枝的参数,正常训练时设置为False,剪枝后微调时设置为True parser.add_argument(--pr…

POI:对Excel的基本写操作 整理1

首先导入相关依赖 <!-- https://mvnrepository.com/artifact/org.apache.poi/poi --><!--xls(03)--><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>5.2.2</version></depend…

鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解

经历的越多&#xff0c;越喜欢简单的生活&#xff0c;干净的东西&#xff0c;清楚的感觉&#xff0c;有结果的事&#xff0c;和说到做到的人。把圈子变小&#xff0c;把语放缓&#xff0c;把心放宽&#xff0c;用心做好手边的事儿&#xff0c;该有的总会有的! 目录 一&#xff…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)在TcpConnection 中接收并解析Http请求消息

一、在TcpConnection 中多添加和http协议相关的request和response struct TcpConnection {struct EventLoop* evLoop;struct Channel* channel;struct Buffer* readBuf;struct Buffer* writeBuf;char name[32];// http协议struct HttpRequest* request;struct HttpResponse* r…

leecode1143 | 最长公共子序列

给定两个字符串 text1 和 text2&#xff0c;返回这两个字符串的最长 公共子序列 的长度。如果不存在 公共子序列 &#xff0c;返回 0 。 一个字符串的 子序列 是指这样一个新的字符串&#xff1a;它是由原字符串在不改变字符的相对顺序的情况下删除某些字符&#xff08;也可以不…

第7章 PKI 和密码应用

7.1 非对称密码 第6章的“现代密码学”一节介绍了私钥&#xff08;对称&#xff09;和公钥&#xff08;非对称&#xff09;密码的基本原则。 你曾学过&#xff0c;对称密钥密码系统要求通信双方使用同一个共享秘密密钥&#xff0c;因而形成了安全分发密钥的问题。 你还曾学过…

如何使用科大讯飞星火大模型AI批量生成文章

如何使用科大讯飞的星火大模型AI工具批量生成文章呢&#xff1f; 我们可以使用科大讯飞AI的星火大模型API接口&#xff0c;它支持批量处理和生成文章的AI功能。 但是星火大模型API接口无法直接使用&#xff0c;一般需要技术人员开发对应程序对接才行。为了让不懂技术的普通用…

【微信小程序开发】深入学习小程序开发之功能扩展和优化

前言 随着移动互联网的快速发展&#xff0c;微信小程序作为一种轻量级应用&#xff0c;已经逐渐成为许多企业和个人进行业务推广和服务提供的重要平台本文将详细介绍 微信小程序开发的功能扩展和优化&#xff0c;帮助开发者更好地提升小程序的用户体验和性能。 一、功能扩展 …

数据库系统概念 第七版 中文答案 第3章 SQL介绍

3.1 将以下查询使用SQL语言编写&#xff0c;使用大学数据库模式。 &#xff08;我们建议您实际在数据库上运行这些查询&#xff0c;使用我们在书籍网站db-book.com上提供的示例数据。有关设置数据库和加载示例数据的说明&#xff0c;请参阅上述网站。&#xff09; a. 查找计算机…

蓝桥杯练习题(六)

&#x1f4d1;前言 本文主要是【算法】——蓝桥杯练习题&#xff08;六&#xff09;的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 …

软件分发点(DP)的合理规划

软件分发点&#xff08;Distribution Point, DP&#xff09;是用于托管文件以分发到计算机和移动设的服务器&#xff0c;Jamf Pro可以通过分发点分发以下类型的文件&#xff1a; 软件包 脚本 内部应用程序 内部书籍 Jamf Pro支持两种类型的分发点&#xff0c;您可以使用这些类型…

我用 Laf 开发了一个非常好用的密码管理工具

【KeePass 密码管理】是一款简单、安全简洁的账号密码管理工具&#xff0c;服务端使用 Laf 云开发&#xff0c;支持指纹验证、FaceID&#xff0c;N 重安全保障&#xff0c;可以随时随地记录我的账号和密码。 写这个小程序之前&#xff0c;在国内市场找了很多密码存储类的 App …

汽配企业MES管理系统的特点与实践

随着汽车工业的飞速发展&#xff0c;汽车零部件制造企业面临着日益复杂的生产环境和多样化的市场需求。为了应对这些挑战&#xff0c;许多汽配企业开始引入MES管理系统解决方案&#xff0c;以提高生产效率、优化资源配置、提升产品质量。本文将重点探讨汽配企业MES管理系统的特…

阿尔泰推出19“8槽4U上架式CPCI机箱 支持客户定制化机箱需求

阿尔泰科技发展有限公司是北京阿尔泰科技的子公司&#xff0c;公司于2010年正式成立&#xff0c;集全国技术支持与服务&#xff0c;销售&#xff0c;结构设计&#xff0c;项目支持等一批专业从事工控行业的工程师屹立在天府之国。公司涵盖数据采集&#xff0c;无线传输&#xf…

搭建sprinboot服务环境

搭建sprinboot服务环境 安装jdk安装nginx安装Redis安装MySQL一 下载MySQL二 安装MySQL三 启动mysql服务获取初始化密码四 登陆MySQL五 修改密码六 设置远程访问七 相关问题错误&#xff1a;1819错误&#xff1a;1251 或 2059错误&#xff1a;10060忽略表名大小写 记录搭建sprin…

[计算机提升] 创建FTP共享

4.7 创建FTP共享 4.7.1 FTP介绍 在Windows系统中&#xff0c;FTP共享是一种用于在网络上进行文件传输的标准协议。它可以让用户通过FTP客户端程序访问并下载或上传文件&#xff0c;实现文件共享。 FTP共享的用途非常广泛&#xff0c;例如可以让多个用户共享文件、进行文件备份…

solr 远程命令执行漏洞复现 (CVE-2019-17558)

solr 远程命令执行漏洞复现 (CVE-2019-17558) ‍ 名称: solr 远程命令执行 (CVE-2019-17558) 描述: Apache Velocity是一个基于Java的模板引擎&#xff0c;它提供了一个模板语言去引用由Java代码定义的对象。Velocity是Apache基金会旗下的一个开源软件项目&#xff0c;旨在确…

【抓包教程】BurpSuite联动雷电模拟器——安卓高版本抓包移动应用教程

前言 近期找到了最适合自己的高版本安卓版本移动应用抓HTTP协议数据包教程&#xff0c;解决了安卓低版本的问题&#xff0c;同时用最简单的办法抓到https的数据包&#xff0c;特此进行文字记录和视频记录。 前期准备 抓包工具&#xff1a;BurpSuite安卓模拟器&#xff1a;雷…