大数据分析与应用实验任务九

news2025/1/11 0:47:28

大数据分析与应用实验任务九

实验目的

  • 进一步熟悉pyspark程序运行方式;

  • 熟练掌握pysaprkRDD基本操作相关的方法、函数,解决基本问题。

实验任务

进入pyspark实验环境,打开命令行窗口,输入pyspark,完成下列任务:

在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

一、参考书中相应代码,练习RDD持久性、分区及写入文件(p64、67、80页相应代码)。
1.持久化

迭代计算经常需要多次重复使用同一组数据。下面就是多次计算同一个RDD的例子。

listlzy=["Hadoop","Spark","Hive","Darcy"]
rddlzy=sc.parallelize(listlzy)
print(rddlzy.count())#行动操作,触发一次真正从头到尾的计算
print(','.join(rddlzy.collect()))#行动操作,触发一次真正从头到尾的计算

image-20231123112954850

一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句以后的执行过程如下:

listlzy=["Hadoop","Spark","Hive","Darcy"]
rdd=sc.parallelize(listlzy)
rdd.cache()#会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
print(rdd.count())#第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
print(','.join(rdd.collect()))#第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

image-20231123113247048

2.分区
  • 设置分区的个数

    在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:

    sc.textFile(path,partitionNum)
    

    其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。下面是一个分区的实例。

    listlzy=[5,2,0,1,3,1,4]
    rddlzy=sc.parallelize(listlzy,2)//设置两个分区
    

    image-20231123113420685

  • 使用repartition方法重新设置分区个数

    通过转换操作得到新RDD时,直接调用repartition方法即可。例如

datalzy=sc.parallelize([1,2,3,4,5],2)
len(datalzy.glom().collect())#显示datalzy这个RDD的分区数量
rdd = datalzy.repartition(1) #对 data 这个 RDD 进行重新分区
len(rdd.glom().collect()) #显示 rdd 这个 RDD 的分区数量

image-20231123113519540

  • 自定义分区方法

    下面是一个实例,要求根据 key 值的最后一位数字将 key 写入到不同的文件中,比如,10 写入到 part-00000,11 写入到 part-00001,12 写入到 part-00002。打开一个 Linux 终端,使用 vim 编辑器创建一个代码文件“/root/Desktop/luozhongye/TestPartitioner.py”,输入以下代码:

    from pyspark import SparkConf, SparkContext
    
    
    def MyPartitioner(key):
    	print("MyPartitioner is running")
    	print('The key is %d' % key)
    	return key % 10
    
    
    def main():
    	print("The main function is running")
    	conf = SparkConf().setMaster("local").setAppName("MyApp")
    	sc = SparkContext(conf=conf)
    	data = sc.parallelize(range(10), 5)
    	data.map(lambda x: (x, 1)).partitionBy(10, MyPartitioner).map(lambda x: x[0]).saveAsTextFile(
    		"file:///root/Desktop/luozhongye/partitioner")
    
    
    if __name__ == '__main__':
    	main()
    
    

使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
python3 TestPartitioner.py

或者,使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
spark-submit TestPartitioner.py

程序运行后会返回如下信息:

image-20231123114351343

3.文件数据写入
  • 把 RDD 写入到文本文件中
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

其中wordlzy.txt的内容

Hadoop is good
Spark is fast
Spark is better
luozhongye is handsome

image-20231123115053912

Spark 采用惰性机制。可以使用如下的“行动”类型的操作查看 textFile 中的内容:

textFile.first()

正因为 Spark 采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,pyspark 也不会马上报错,而是等到执行“行动”类型的语句启动真正的计算时,“转换”操作语句中的错误才会显示出来,比如:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordcount/word123.txt")

image-20231123115210529

  • 把 RDD 写入到文本文件中

可以使用 saveAsTextFile()方法把 RDD 中的数据保存到文本文件中。下面把 textFile 变量中的内容再次写回到另外一个目录 writeback 中,命令如下:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

进入到“/root/Desktop/luozhongye/writeback”目录查看

cd /root/Desktop/luozhongye/writeback
ls

image-20231123115411983

二、逐行理解并运行4.4.2实例“文件排序”。

新建多个txt文件file1.txt 、file2.txt 、file3.txt ,其内容分别如下:

33
37
12
40
4
16
39
5
1
45
25

要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

实现上述功能的代码文件“/root/Desktop/luozhongye/FileSort.py”的内容如下:

#!/usr/bin/env python3 
from pyspark import SparkConf, SparkContext

index = 0


def getindex():
	global index
	index += 1
	return index


def main():
	conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
	sc = SparkContext(conf=conf)
	lines = sc.textFile("file:///root/Desktop/luozhongye/file*.txt")
	index = 0
	result1 = lines.filter(lambda line: (len(line.strip()) > 0))
	result2 = result1.map(lambda x: (int(x.strip()), ""))
	result3 = result2.repartition(1)
	result4 = result3.sortByKey(True)
	result5 = result4.map(lambda x: x[0])
	result6 = result5.map(lambda x: (getindex(), x))
	result6.foreach(print)
	result6.saveAsTextFile("file:///root/Desktop/luozhongye/sortresult")
    
    
if __name__ == '__main__':
		main()

image-20231123232005765

三、完成p96实验内容3,即“编写独立应用程序实现求平均值问题”,注意每位同学自己修改题目中的数据。

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个字段是学生名字,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。

数学成绩.txt:

小罗 110
小红 107
小新 100
小丽 99

英语成绩.txt:

小罗 95 
小红 81 
小新 82
小丽 76

政治成绩.txt:

小罗 65 
小红 71 
小新 61 
小丽 66

408成绩.txt:

小罗 100
小红 103
小新 94
小丽 110

实现代码如下:

from pyspark import SparkConf, SparkContext

# 初始化Spark配置和上下文
conf = SparkConf().setAppName("AverageScore")
sc = SparkContext(conf=conf)

# 读取数学成绩文件
math_rdd = sc.textFile("数学成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取英语成绩文件
english_rdd = sc.textFile("英语成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取政治成绩文件
politics_rdd = sc.textFile("政治成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取408成绩文件
computer_rdd = sc.textFile("408成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 合并所有成绩数据
all_scores_rdd = math_rdd.union(english_rdd).union(politics_rdd).union(computer_rdd)

# 计算每个学生的成绩总和和成绩数量
sum_count_rdd = all_scores_rdd.combineByKey(lambda value: (value, 1),
                                            lambda acc, value: (acc[0] + value, acc[1] + 1),
                                            lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

# 计算平均成绩
average_scores_rdd = sum_count_rdd.mapValues(lambda x: x[0] / x[1])

# 输出到新文件
average_scores_rdd.saveAsTextFile("平均成绩")

# 关闭Spark上下文
sc.stop()

image-20231123233508387

实验心得

在这次实验中,我进一步熟悉了使用PySpark进行大数据处理和分析的方法,并深入了解了PySpark RDD的基本操作。学会了分区、持久化、数据写入文件,并解决实际问题。这次实验让我对PySpark有了更深入的理解,并增强了我处理和分析大数据的能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1245042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视频剪辑达人分享:高效减片头时长并调整播放速度的技巧,提升视频品质

在视频剪辑的过程中,许多初学者经常会遇到一些问题,如片头过长、播放速度不适当等,这些问题不仅会影响观众的观看体验,还会对视频品质产生负面影响。在调整播放速度时,要根据视频内容来进行调整。一般来说,…

git的用法

目录 一、为什么需要git 二、git基本操作 2.1、初始化git仓库 2.2、配置本地仓库的name和email 2.3、认识工作区、暂存区、版本库 三、git的实际操作 3.1 提交文件 3.2 查看git状态以及具体的修改 3.3 git版本回退 git reset 3.1 撤销修改 四、git分支管理 4.…

洛谷 P1883 函数

P1883 函数 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) Error Curves - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 这两题是一模一样的,过一题水两题。 分析 主要难点在于证明F(x)是一个单峰函数可以被三分,但是我随便画了几个f(x)之后发现好像…

Shell循环:for(一)

语法结构: for 变量名 [ in 取值列表] do 循环体 done 示例1: 1、需求:自动循环创建10个用户 2、演示: [rootlocalhost ~]# vim for.sh #脚本编写 #!/bin/bash for i in {1..10} do useradd "user$…

Redis-Redis高可用集群之水平扩展

Redis3.0以后的版本虽然有了集群功能,提供了比之前版本的哨兵模式更高的性能与可用性,但是集群的水平扩展却比较麻烦,今天就来带大家看看redis高可用集群如何做水平扩展,原始集群(见下图)由6个节点组成,6个节点分布在三…

Android设计模式--装饰模式

千淘万漉虽辛苦,吹尽黄沙始到金 一,定义 动态地给一个对象添加一些额外的职责。就增加功能来说,装饰模式相比生成子类更为灵活。 装饰模式也叫包装模式,结构型设计模式之一,其使用一种对客户端透明的方式来动态地扩展…

性能相关的闪存特性

一、多Plane操作 上章提到若干个Plane组成Die或者叫LUN,即一个Die上有多个Plane 每次进行写操作时,控制器先将数据写入页缓存中,等同一个Die上另一个Plane也写数据的时候,再同时写入,原来单独操作一个Plane的时间变成了可以同时做…

Vue学习之路------指令

Vue指令 vue会根据不同的指令&#xff0c;针对标签实现不同的功能 指令:带有v-前缀的特殊标签属性 1&#xff1a;v-html&#xff1a;指令 <div v-html"msg"></div> 2&#xff1a;v-show 作用&#xff1a;控制元素显示隐藏 语法&#xff1a;v-show&quo…

AMESim|学习记录

此文记录AMESim学习过程中的各种情况。 目录 01 王佳. AUV 浮力调节系统设计及控制策略研究[D]. 天津大学, 2017.01 王佳. AUV 浮力调节系统设计及控制策略研究[D]. 天津大学, 2017. 01 王佳. AUV 浮力调节系统设计及控制策略研究[D]. 天津大学, 2017. 开始步入正文 01 王佳.…

mysql 行转列 GROUP_CONCAT 试验

1.概要 很多时候需要用到行专列的方式做数据分析。比如对通讯数据的采集 数据采集结果如下&#xff1a; 变量值采集周期131251132272 我想要看的结果 变量1变量2采集周期351372 就是我想看到相关数据的周期变化情况。 2.试验 2.1创建数据如下&#xff08;表名 tb5&…

人工智能对网络安全的影响越来越大

如果问当前IT行业最热门的话题是什么&#xff0c;很少有人会回答除了人工智能&#xff08;AI&#xff09;之外的任何话题。 在不到 12 个月的时间里&#xff0c;人工智能已经从一项只有 IT 专业人员才能理解的技术发展成为从小学生到作家、程序员和艺术家的每个人都使用的工具…

案例025:基于微信小程序的移动学习平台的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

Qt实现自定义IP地址输入控件(百分百还原Windows 10网络地址输入框)

在开发网络相关的程序时,我们经常需要输入IP地址,例如源地址和目标地址。Qt提供了一些基础的控件,如QLineEdit,但是它们并不能满足我们对IP地址输入的要求,例如限制输入的格式、自动跳转到下一个输入框、处理回车和退格键等。因此,我们需要自己编写一个自定义的IP地址输入…

深度学习技术前沿:探索与挑战

深度学习技术前沿&#xff1a;探索与挑战 一、引言 近年来&#xff0c;深度学习作为人工智能领域的重要分支&#xff0c;取得了令人瞩目的成就。它凭借强大的学习能力和出色的性能&#xff0c;在图像识别、语音识别、自然语言处理等众多任务中展现出巨大潜力。本文将深入探讨深…

预处理机制

跟着肯哥&#xff08;不是我&#xff09;学预处理机制 预处理类别 宏定义&#xff1a;#define 将文本替换为表达式或语句 条件编译&#xff1a;#ifdef、#ifndef和#if、#elif、#endif 根据标识符是否被定义选择编译代码 头文件包含&#xff1a;#include 将其他文件&#x…

YM5411 WIFI 5模块 完美替代AP6256

YM5411是沃特沃德推出的一款低成本&#xff0c;低功耗的模块&#xff0c;该模块具有Wi-Fi&#xff08;2.4GHz和5GHz IEEE 802.11 a/b/g/n/ac&#xff09;蓝牙&#xff08;BT5.0&#xff09;功能&#xff0c;并通过了SRRC认证&#xff0c;带mesh&#xff0c;完美替换AP6256。高度…

和数集团出席中科院上海高研院​第三十三期“高研交叉论坛”信息能源融合专场

2023年11月21日&#xff0c;中国科学院上海高等研究院第三十三期“高研交叉论坛”信息能源融合专场在上海高研院成功举办。本次论坛由中国科学院上海高等研究院智能信息通信技术研究与发展中心、中国科学院低碳转化科学与工程重点实验室、中科院和数智能区块链与能源系统应用联…

AndroidNDK开发之交叉编译

在Android studio2.2以及以上&#xff0c;构建原生库的默认工具是cmake。 CMake是一个跨平台的构建工具&#xff0c;可以使用简单的语句来描述所有平台的安装(编译过程)。 能够输出各种各样的makefile或者project文件。cmake并不直接构建出最终的软件&#xff0c;而是产生其他工…

MySQL错误之ONLY_FULL_GROUP_BY

报错信息&#xff1a; 翻译&#xff1a; 对该报错的解释 所以&#xff0c;实际上该报错是由于在SQL查询语句中有group by&#xff0c;而这个包含group by的SQL查询写的并不规范导致的&#xff0c;这个ONLY_FULL_GROUP_BY模式开启之后检查就会很严格&#xff0c;如果select列表…

node与 pnpm、node-sass 等工具的版本兼容关系

1. node & pnpm 2. node & node-sass 3. node-sass & sass-loader sass-loader依赖于node-sass&#xff0c;以下是部分版本号对应