实验二 RDD基础编程

news2025/2/24 23:11:49

实验二 RDD基础编程

前提是配置好大数据环节。

hadoop,spark,scala等必须的软件

以及下载pyshark

 1.实验目的

1. 掌握 RDD 基本操作;

2. 熟悉使用 RDD 编程解决实际具体问题的方法;

2.实验内容

本人仅提供测试代码!可以给你提供思路的代码,仅供参考!

1.pyspark 交互式编程

下载 chapter4-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所 示: Tom,DataBase,80 Tom,

Algorithm,50 Tom,

DataStructure,60 Jim,

DataBase,90 Jim,

Algorithm,60 Jim,

DataStructure,80 ……

请根据给定的实验数据,在 pyspark 中通过编程来计算以下内容:

(1)该系总共有多少学生;

(2)该系共开设了多少门课程;

(3)Tom 同学的总成绩平均分是多少;

(4)求每名同学的选修的课程门数;

(5)该系 DataBase 课程共有多少人选修;

(6)各门课程的平均分是多少;

(7)使用累加器计算共有多少人选了 DataBase 这门课。

# 导入必要的模块
from pyspark import SparkConf, SparkContext

# 创建 SparkConf 对象
conf = SparkConf().setAppName("SparkRDD").setMaster("local[*]")

# 创建 SparkContext 对象
sc = SparkContext(conf=conf)
# 设置日志级别为 ALL
sc.setLogLevel("WARN")

# 读取数据文件 自己改成绝对路径比较好
data_rdd = sc.textFile("./chapter4-data01.txt")

# (1) 该系总共有多少学生
students = data_rdd.map(lambda stu: stu.split(',')[0]).distinct().count()
print("该系总共有 %d 名学生" % students)

# (2) 该系共开设了多少门课程
courses = data_rdd.map(lambda c: c.split(',')[1]).distinct().count()
print("该系共开设了 %d 门课程" % courses)

# (3) Tom 同学的总成绩平均分是多少
tom_scores = data_rdd.filter(lambda s: s.startswith("Tom")).map(lambda g: int(g.split(',')[2]))
tom_avg_score = tom_scores.mean()
print("Tom 同学的总成绩平均分是 %.2f 分" % tom_avg_score)

# (4) 求每名同学的选修的课程门数
student_courses = data_rdd.map(lambda cs: (cs.split(',')[0], 1)).reduceByKey(lambda a, b: a + b).collect()
for student, course_count in student_courses:
    print("%s 同学选修了 %d 门课程" % (student, course_count))

# (5) 该系 DataBase 课程共有多少人选修
db_selected_count = data_rdd.filter(lambda c: c.split(',')[1] == "DataBase").count()
print("该系 DataBase 课程共有 %d 人选修" % db_selected_count)

# (6) 各门课程的平均分是多少
course_scores = data_rdd.map(lambda line: (line.split(',')[1], int(line.split(',')[2]))).groupByKey().mapValues(lambda scores: sum(scores) / len(scores)).collect()
for course, avg_score in course_scores:
    print("%s 课程的平均分是 %.2f 分" % (course, avg_score))

# (7) 使用累加器计算共有多少人选了 DataBase 这门课
db_selected_acc = sc.accumulator(0) #创建累加器对象,初始化值0

def count_db_selected(score): #将选择DataBase+1
    if score.split(',')[1] == "DataBase":
        db_selected_acc.add(1)

data_rdd.foreach(count_db_selected) #遍历累加
db_selected_count_acc = db_selected_acc.value #获取累加值
print("使用累加器计算,共有 %d 人选了 DataBase 这门课" % db_selected_count_acc)

# 关闭 SparkContext 对象
sc.stop()

 2.编写独立应用程序实现数据去重

对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其 中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,供参考。

输入文件 A 的样例如下:

20170101 x

20170102 y

20170103 x

20170104 y

20170105 z

20170106 z

输入文件 B 的样例如下:

20170101 y

20170102 y

20170103 x

20170104 z

20170105 y

根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

20170101 x

20170101 y

20170102 y

20170103 x

20170104 y

20170104 z

20170105 y

20170105 z

20170106 z

参考代码:

from pyspark import SparkContext, SparkConf

# 创建Spark配置对象并设置应用名称
conf = SparkConf().setAppName("Deduplication")
# 创建SparkContext对象
sc = SparkContext(conf=conf)

# 读取文件A和B 设置自己的路径
fileA = sc.textFile("A.txt")
fileB = sc.textFile("B.txt")

# 合并两个文件
mergedRDD = fileA.union(fileB)

# 去重操作
distinctRDD = mergedRDD.distinct()

# 写入输出文件C
distinctRDD.coalesce(1)
distinctRDD.saveAsTextFile("C")

# 关闭SparkContext对象
sc.stop()

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生 名字,第二个是学生的成绩;

编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到 一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm 成绩: 小明 92 小红 87 小新 82 小丽 90

Database 成绩: 小明 95 小红 81 小新 89 小丽 85

Python 成绩: 小明 82 小红 83 小新 94 小丽 91

平均成绩如下: (小红,83.67) (小新,88.33) (小明,89.67) (小丽,88.67)

创建的项目不要起中文的名称

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, avg

# 创建SparkSession
spark = SparkSession.builder \
    .appName("CalculateAverageScore") \
    .getOrCreate()

# 读取输入文件并创建DataFrame
input_files = [
    ".\\input\\input_file1.txt",
    ".\\input\\input_file2.txt",
    ".\\input\\input_file3.txt"
]

# 合并所有输入文件的数据
scores_df = spark.read.text(input_files)
scores_df = scores_df.withColumn("data", split(scores_df["value"], " "))
scores_df = scores_df.select(scores_df["data"][0].alias("name"), scores_df["data"][1].cast("float").alias("score"))

# 使用DataFrame进行数据处理,计算每个学生的平均成绩
average_scores_df = scores_df.groupBy("name").agg(avg("score").alias("average_score"))
average_scores_df = average_scores_df.orderBy("name")

# 将结果输出到新文件
output_file = "./output.txt"

# 将结果转换为字符串格式
result_str = average_scores_df.rdd \
    .map(lambda row: f"({row['name']},{row['average_score']:.2f})") \
    .collect()

# 将结果写入文件
with open(output_file, "w") as file:
    file.write("\n".join(result_str))

# 停止SparkSession
spark.stop()

生成的效果图

 3.资源地址

https://download.csdn.net/download/weixin_41957626/87780485

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/522183.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

策划专业技能提升攻略,让你在职场中脱颖而出

作为一个10多年的老策划,刚入行的时候也走过很多弯路,后来加入到一家在国内比较知名的策划公司(老板也是当年的十大知名策划人)才真正让我实现水平的跃升。 当时公司经常有内训,新人的第一课就是策划人应该如何快速入…

FreeRTOS-事件组详解

✅作者简介:嵌入式入坑者,与大家一起加油,希望文章能够帮助各位!!!! 📃个人主页:rivencode的个人主页 🔥系列专栏:玩转FreeRTOS 💬保持…

深入理解JVM读书笔记与实战_01_Java内存区域与内存溢出异常

文章目录 运行时数据区域问题引入 运行时数据区域 Java虚拟机在执行Java程序的过程中会把所管理的内存划分为若干个不同的数据区。运行时数据区包括了程序计数器、虚拟机栈、本地方法栈、方法区和堆。 程序计数器:程序计数器是线程私有的内存,用来记住…

vue:组件使用v-model实现2个组件间的数据双向绑定

一、需要实现的需求: 子组件输入框的数据发生改变,父组件的数据跟着实时改变; 父组件的数据发生改变,子组件的数据跟着实时改变。 二、实现思路: 1、(1)在父组件引入子组件。(2&…

CAN总线要点总结(CAN2.0A/B)

个人博客原文链接:CAN总线要点总结(CAN2.0A/B) 前言 工作也有几年了,在项目中也接触过几次CAN总线,但总是止步于会用即可,对于很多细节上的东西有时还是稀里糊涂的状态,这几天正好有点时间&am…

【亲测有效】pycharm不显示软件包

http://pypi.hustunique.com/ https://pypi.mirrors.ustc.edu.cn/ http://pypi.tuna.tsinghua.edu.cn/simple/ http://mirrors.aliyun.com/pypi/simple/ http://pypi.douban.com/simple/2023.5.13 亲测有效

单点登录系统:登录,登出,拦截器

什么是单点登录? 单点登录(Single Sign On),简称为 SSO,是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。 假设一个企业…

贪心算法(无规则)

目录 1.easy1.455. 分发饼干2.1005. K 次取反后最大化的数组和3.860. 柠檬水找零 2.medium1.序列问题1.376. 摆动序列2.738. 单调递增的数字 2.贪心解决股票问题1.122. 买卖股票的最佳时机 II 3.两个维度权衡问题1.135. 分发糖果*2.406. 根据身高重建队列(linklist,…

【WOA-LSTM】基于WOA优化 LSTM神经网络预测研究(Python代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Neuron 2.4.0 发布:体验下一代工业物联网连接和管理

近日,EMQ 旗下的工业协议网关软件 Neuron 发布了最新的 2.4.0 版本。 该版本新增了包括 ABB COMLI 在内的四个南向驱动和一个北向应用,同时对现有的插件功能和 UI 进行了优化。 快速体验 Neuron 新版本 新增驱动插件满足不同场景需求 IEC61850 MMS 和 …

springboot项目如何优雅停机

文章目录 前言kill -9 pid的危害如何优雅的停机理论步骤优雅方式1、kill -15 pid 命令停机2、ApplicationContext close停机3、actuator shutdown 停机 前言 相信很多同学都会用Kill -9 PID来杀死进程,如果用在我们微服务项目里面,突然杀死进程会有什么…

计算机组成原理基础练习题第二章

1.下面四种语言中,()更适应网络环境 A、FORTRAN    B、JavaC、C        D、PASCAL 2.下列叙述中正确的是() A.终端是计算机硬件的一部分,好比电视中的小屏幕 B. ALU是代数逻辑单元的缩写 C.导航用计算机属于一般用途计算…

网络数据链路层介绍

文章目录 一、以太网二、以太网的帧格式三、局域网通信的原理四、ARP协议1.ARP协议的介绍2.ARP协议的工作流程3.ARP数据报格式 一、以太网 以太网并不是一种具体的网络,而是一种技术标准,它既包含了数据链路层的内容,也包含了一些物理层的内…

【数据分享】2023年全国A级景区数据(14847个景区)

我国的旅游景区,依据景区质量划分为五级,从高到低依次为5A、4A、3A、2A、A级旅游景区。我国旅游管理部门对于A级景区实行“有进有出”的动态管理,也就是A级景区的名单每年都在变!我们立方数据学社也一直在跟踪整理每年的A级景区数…

Android面试指南:说说你对组件化/模块化的理解

到现在组件化真的不是什么新鲜东西了,大公司都用的滚瓜烂熟,龙飞凤舞了,也就是现在部分中型项目和小项目在组件化的路上努力。所以同志们,组件化没玩过的,不熟悉的赶紧搞起来,说一点,你不会组件…

【牛客小白月赛72】BCD题

B、数数 比赛AC代码&#xff1a; #include <iostream> using namespace std;int t, n; int ans1;int check(int x) {int ans 0;for(int i 1; i < x/i; i)if(x%i 0){ans ;if(i ! x/i) ans;}ans1 ans;return ans1; } int main() {cin>> t;while(t--){cin&…

每日一练 | 华为认证真题练习Day44

1、如Display信息所示&#xff0c;当此交换机需要转发目的IAC地址为5489-98ec-f011的帧时&#xff0c;下面描述正确的是&#xff08;&#xff09;。 A. 交换机将会在除了收到该帧的端口之外的所有端口泛洪该帧 B. 交换机将会发送目标不可达的消息给源设备 C. 交换机在MAC地址…

easyExcel 与 POI 基础知识

文章目录 POI 与 easyExcel一、 了解1.1 Apache POI1.2 easyExcel 二、 准备工作2.1 Maven坐标2.2 Excel讲解 三、 Excel基本写操作&#xff08;导出Excel&#xff09;3.1 03 版本Excel导出操作3.2 07版本Excel导出操作3.3 大数据量的导出&#xff08;数据批量导入到磁盘&#…

含电动汽车的区域综合能源系统优化调度研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

django路由(多应用配置动态路由正则路由)

一、配置全局路由 在应用下&#xff0c;定义视图函数views.py from django.http import HttpResponse from django.shortcuts import render# Create your views here.def get_order(request):return HttpResponse("orders应用下的路由") 在项目的urls路由配置中&…