大数据分析与应用实验任务八

news2024/11/22 23:43:43

大数据分析与应用实验任务八

实验目的

  • 进一步熟悉pyspark程序运行方式;
  • 熟练掌握pysaprk RDD基本操作相关的方法、函数。

实验任务

进入pyspark实验环境,在图形界面的pyspark命令行窗口中完成下列任务:

在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

一、 参考书上例子,理解并完成RDD常用操作(4.1.2节内容);
1.转换操作
(1)filter(func)

filter(func)操作会筛选出满足函数 func 的元素,并返回一个新的数据集。例如:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
linesWithSpark = lines.filter(lambda line: "Spark" in line) 
linesWithSpark.foreach(print) 

image-20231116111150747

(2)map(func)

map(func)操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集。例如:

data = [1,2,3,4,5] 
rdd1 = sc.parallelize(data) 
rdd2 = rdd1.map(lambda x:x+10) 
rdd2.foreach(print) 

image-20231116111301428

下面是另外一个实例:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
words = lines.map(lambda line:line.split(" ")) 
words.foreach(print) 

image-20231116111354416

(3)flatMap(func)

flatMap(func)与 map()相似,但每个输入元素都可以映射到 0 或多个输出结果。例如:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
words = lines.flatMap(lambda line:line.split(" ")) 
words.foreach(print) 

image-20231116111511592

(4)groupByKey()

groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。下面给

出一个简单实例,代码如下:

words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
words1 = words.groupByKey() 
words1.foreach(print) 

image-20231116111553622

(5)reduceByKey(func)

reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每

个值是将每个 key 传递到函数 func 中进行聚合后得到的结果。这里给出一个简单实例,代码如下:

words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
words1 = words.reduceByKey(lambda a,b:a+b) 
words1.foreach(print)

image-20231116111642594

2.行动操作
rdd = sc.parallelize([1,2,3,4,5]) 
rdd.count() 
rdd.first() 
rdd.take(3) 
rdd.reduce(lambda a,b:a+b) 
rdd.collect() 
rdd.foreach(lambda elem:print(elem))

image-20231116111735249

3.惰性机制
lineslzy = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
lineLengths = lineslzy.map(lambda s:len(s)) 
totalLength = lineLengths.reduce(lambda a,b:a+b) 
print(totalLength)

image-20231116111910718

二、 参考书上例子,理解并完成键值对RDD常用操作(4.2.2节内容);

常用的键值对转换操作包括 reduceByKey(func)、groupByKey()、keys、values、sortByKey()、sortBy()、mapValues(func)、join()和 combineByKey 等。

1.reduceByKey(func)
pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("罗忠烨",1)]) 
pairRDD.reduceByKey(lambda a,b:a+b).foreach(print) 

image-20231116112042871

2.groupByKey()
list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)] 
pairRDD = sc.parallelize(list) 
pairRDD.groupByKey() 
pairRDD.groupByKey().foreach(print)

image-20231116112213649

对于一些操作,既可以通过 reduceByKey()得到结果,也可以通过组合使用 groupByKey()和 map()操作得到结果,二者是“殊途同归”,下面是一个实例:

words = ["one", "two", "two", "three", "three", "three"] 
wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1)) 
wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b) 
wordCountsWithReduce.foreach(print) 
wordCountsWithGroup = wordPairsRDD.groupByKey().map(lambda t:(t[0],sum(t[1]))) 
wordCountsWithGroup.foreach(print) 

image-20231116121259280

3.keys
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.keys().foreach(print)

image-20231116112613497

4.values
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("luozhongyeTop1",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.values().foreach(print)

image-20231116112822976

5.sortByKey()
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.foreach(print) 
pairRDD.sortByKey().foreach(print)

image-20231116113006599

6.sortBy()
d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9),("luozhongye",1)]) 
d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()

image-20231116113102891

sortByKey(False)括号中的参数 False 表示按照降序排序,如果没有提供参数 False,则默认采用升序排序(即参数取值为 True)。从排序后的效果可以看出,所有键值对都按照 key 的降序进行了排序,因此输出[(‘g’, 21), (‘f’, 29), (‘e’, 17), (‘d’, 9), (‘c’, 27), (‘b’, 38), (‘a’, 42)]。但是,如果要根据 21、29、17 等数值进行排序,就无法直接使用 sortByKey()来实现,这时可以使用 sortBy(),代码如下:

d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)]) 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect() 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect() 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect() 

image-20231116113232289

7.mapValues(func)
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD1 = pairRDD.mapValues(lambda x:x+1) 
pairRDD1.foreach(print)

image-20231116113332961

8.join()
pairRDD1 = sc.parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)]) 
pairRDD2 = sc.parallelize([("spark","fast")]) 
pairRDD3 = pairRDD1.join(pairRDD2) 
pairRDD3.foreach(print)

image-20231116113434643

9.combineByKey

(1)createCombiner:在第一次遇到 key 时创建组合器函数,将 RDD 数据集中的 V 类型值转换成 C 类型值(V => C);

(2)mergeValue:合并值函数,再次遇到相同的 key 时,将 createCombiner 的 C 类型值与这次传入的 V 类型值合并成一个 C 类型值(C,V)=>C;

(3)mergeCombiners:合并组合器函数,将 C 类型值两两合并成一个 C 类型值;

(4)partitioner:使用已有的或自定义的分区函数,默认是 HashPartitioner;

(5)mapSideCombine:是否在 map 端进行 Combine 操作,默认为 true。

下面通过一个实例来解释如何使用 combineByKey 操作。假设有一些销售数据,数据采用键值对的形式,即<公司,当月收入>,要求使用 combineByKey 操作求出每个公司的总收入和每月平均收入,并保存在本地文件中。

为了实现该功能,可以创建一个代码文件“/root/Desktop/luozhongye/Combinelzy.py”,并输入如下代码:

#!/usr/bin/env python3 
from pyspark import SparkConf, SparkContext 

conf = SparkConf().setMaster("local").setAppName("Combine ") 

sc = SparkContext(conf = conf) 

data = sc.parallelize([("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)],3) 

res = data.combineByKey(lambda income:(income,1),lambda acc,income:(acc[0]+income, acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])).map(lambda x:(x[0],x[1][0],x[1][0]/float(x[1][1])))

res.repartition(1).saveAsTextFile("file:///root/Desktop/luozhongye/combineresultlzy")

执行如下命令运行该程序:

cd /root/Desktop/luozhongye 
/usr/local/spark/bin/spark-submit Combinelzy.py

image-20231116113910011

三、 逐行理解并运行4.4.1实例“求TOP值”。

假设在某个目录下有若干个文本文件,每个文本文件里面包含了很多行数据,每行数据由 4 个字段的值构成,不同字段值之间用逗号隔开,4 个字段分别为 orderid、userid、payment 和 productid,要求求出 Top N 个 payment 值。如下为一个样例文件 file0lzy.txt:

1,1768,50,155 
2,1218,600,211 
3,2239,788,242 
4,3101,28,599 
5,4899,290,129 
6,3110,54,1201 
7,4436,259,877 
8,2369,7890,27

实现上述功能的代码文件“/root/Desktop/luozhongye/TopN.py”的内容如下:

# !/usr/bin/env python3 
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///root/Desktop/luozhongye/file0.txt")
result1 = lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split(",")) ==
                                                                 4))
result2 = result1.map(lambda x: x.split(",")[2])
result3 = result2.map(lambda x: (int(x), ""))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x: x[0])
result7 = result6.take(5)
for a in result7:
	print(a)

image-20231116121013386
= result1.map(lambda x: x.split(“,”)[2])
result3 = result2.map(lambda x: (int(x), “”))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x: x[0])
result7 = result6.take(5)
for a in result7:
print(a)

![在这里插入图片描述](https://img-blog.csdnimg.cn/a3c32d0053e2481c83318d6c27bb68bb.png)


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1218173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新零售系统平台解决方案 线上线下小程序怎么做

新零售线上线下解决方案是将传统零售业务与互联网科技相结合&#xff0c;通过数字化、智能化手段提升零售业务效率和用户体验的解决方案&#xff0c;它既有提供消费者线下体验&#xff0c;强调“稳”&#xff0c;又有互联网线上的“快”。 线上线下小程序可以通过一体化的进销存…

文本格式清理工具 TextSoap mac中文版软件特色

TextSoap mac是一款文本格式清理工具。TextSoap可以帮助用户清除掉text文档内的文字格式&#xff0c;还可以将文档内的url转换成超链接&#xff0c;简单方便&#xff0c;是你日常办公不可缺少的工具。 TextSoap for mac软件特色 1、清洁界面 2、集成文本编辑器 3、100多个内…

全栈工程师必须要掌握的前端Html技能

作为一名全栈工程师&#xff0c;在日常的工作中&#xff0c;可能更侧重于后端开发&#xff0c;如&#xff1a;C#&#xff0c;Java&#xff0c;SQL &#xff0c;Python等&#xff0c;对前端的知识则不太精通。在一些比较完善的公司或者项目中&#xff0c;一般会搭配前端工程师&a…

OpenCV图像纹理

LBP描述 LBP&#xff08;Local Binary Pattern&#xff0c;局部二值模式&#xff09;是一种用来描述图像局部纹理特征的算子&#xff1b;它具有旋转不变性和灰度不变性等显著的优点。它是首先由T. Ojala, M.Pietikinen, 和D. Harwood 在1994年提出&#xff0c;用于纹理特征提取…

Python实现双进程:防止单点故障的深度解析

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;我是涛哥&#xff0c;今天为大家分享 Python实现双进程&#xff1a;防止单点故障的深度解析&#xff0c;文章2800字&#xff0c;阅读大约10分钟&#xff0c;大家enjoy~~ 在分布式系统中&#xff0c;确保系统的高…

IntelliJIDEA快捷键中文版

IntelliJIDEA快捷键中文版&#xff0c;对于Android Studio也适用。官方快捷键链接在此&#xff0c;官方上是英文的&#xff0c;我于2023-11-16下载并翻译成中文&#xff0c;可能翻译不太准&#xff0c;所以英文我都保留下来了&#xff0c;大家可以对比着看&#xff0c;有些英文…

文件传输客户端 SecureFX mac中文版支持多种协议

SecureFX mac是一款功能强大的文件传输客户端&#xff0c;可在 Mac 操作系统上使用。它由 VanDyke Software 公司开发&#xff0c;旨在为用户提供安全、可靠、高效的文件传输服务。 SecureFX 支持多种协议&#xff0c;包括 SFTP、SCP、FTP、FTP over SSL/TLS 和 HTTP/S。它使用…

Python---列表 集合 字典 推导式(本文以 列表 为主)

推导式&#xff1a; 推导式comprehensions&#xff08;又称解析式&#xff09;&#xff0c;是Python的一种独有特性。推导式是可以从一个数据序列构建另一个新的数据序列&#xff08;一个有规律的列表或控制一个有规律列表&#xff09;的结构体。 共有三种推导&#xff1a;列表…

Trigger替换Demo

maven工程 pom依赖 <dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.1</version> </dependency> import org.quartz.Job; import org.quartz.JobExecutionContext; imp…

怎么使用Cpolar+Lychee搭建私人图床网站并实现公网访问?

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站&#xff0c;可以看做是云存储的一部分&#xff0c;既可…

springcloud程序启动后,nacos服务中心的服务名称与程序spring.application.name所配置的应用名不一致

目录 一、场景二、关键依赖三、bootstrap.yml四、Nacos服务中心五、原因六、解决七、备注 一、场景 1、将SpringBoot项目升级为SpringCloud 2、SpringBoot版本从2.1.3.RELEASE升级为2.7.8 3、程序启动后&#xff0c;在Nacos服务中心展示的服务名称不是spring.application.na…

MAC地址注册的网络安全影响和措施分析

MAC地址注册对网络安全具有重要影响&#xff0c;同时也需要采取相应的措施来应对潜在的安全风险。以下是有关MAC地址注册的网络安全影响和应对措施的分析&#xff1a; 影响&#xff1a; 1. 身份验证&#xff1a;MAC地址注册可用于设备的身份验证&#xff0c;但MAC地址本身并不…

系列二、什么是OOM?什么是StackOverflowError?有哪些方法分析?

一、什么是OOM&#xff1f; OOM是堆内存溢出&#xff0c;产生的原因是堆的空间大小是有限的&#xff0c;当堆空间的大小不足以满足创建对象所需要的内存空间时&#xff0c;就会抛出OOM的异常。 二、什么是StackOverflowError&#xff1f; StackOverflowError是栈内存溢出的意思…

接口中的大事务,该如何进行优化?

作为后端开发的程序员&#xff0c;我们常常会的一些相对比较复杂的逻辑&#xff0c;比如我们需要给前端写一个调用的接口&#xff0c;这个接口需要进行相对比较复杂的业务逻辑操作&#xff0c;比如会进行&#xff0c;查询、远程接口或本地接口调用、更新、插入、计算等一些逻辑…

LabVIEW关于USRPRIO的示例代码

LabVIEW关于USRPRIO的示例代码 USRPRIO 通常以两种方式使用&#xff1a; 1 基于 FPGA 的编程 对于希望修改USRP上的底层FPGA代码以添加自定义DSP模块的应用&#xff0c;请使用USRP示例项目。它可作为构建 USRP RIO 流式处理应用程序的起点&#xff0c;可从“创建项目”对话框…

利用OpenCV做个熊猫表情包 二

之前写了一篇 利用OpenCV做个熊猫表情包吧_Leen的博客-CSDN博客 回想起来觉得有点太弱了&#xff0c;意犹未尽&#xff0c;每次使用需要自己去手动截取人脸&#xff0c;清除黑边什么的才能使用demo去合成表情&#xff0c;于是有空的时候就改进了一下&#xff0c;让它利用open…

腾讯云服务器租用价格,腾讯云服务器租用价格多少钱一年?

腾讯云服务器租用价格&#xff0c;腾讯云服务器租用价格多少钱一年&#xff1f;腾讯云服务器有优惠活动&#xff0c;现在租用只需要88元/年&#xff01;腾讯云服务器优惠购买入口&#xff1a;https://1111.mian100.cn 随着互联网的发展&#xff0c;越来越多的人开始选择将自己…

【部署篇】宝塔liunx中使用docker部署nestjs项目【全过程】

一、 &#x1f44b; 前序工作 连接服务器 获取宝塔面板信息 在命令行输入sudo /etc/init.d/bt default 进入宝塔面板输入账号密码 通过上面网址进入宝塔 安装自己需要的东西 **PS&#xff1a;**这里还需要自己登录宝塔账号&#xff0c;没有账号的同学需要注册一下 安装pm2…

Minio - 多节点多驱动器安装部署

先决条件 网络互通 MinIO集群中的节点的网络需要互相双向互通。 MinIO API默认端口9000 MinIO console默认端口9001 MinIO强烈建议使用负载均衡器来管理与集群的连接。负载均衡器策略使用“最小连接数”逻辑&#xff0c;因为在部署中任何 MinIO 节点都可以接收、路由或处理…

work环境配置

1.计算机右键找到属性 2.配置环境变量 3.新加环境变量 4.修改环境变量path .bat文件内容 php ApplicationsChatstart_register.php ApplicationsChatstart_gateway.php ApplicationsChatstart_businessworker.php pause