大数据编程期末大作业
文章目录
- 大数据编程期末大作业
- 一、Hadoop基础操作
- 二、RDD编程
- 三、SparkSQL编程
- 四、SparkStreaming编程
一、Hadoop基础操作
-
在HDFS中创建目录 /user/root/你的名字 例如李四同学 /user/root/lisi
首先我们需要启动hdfs,我们直接在终端输入如下命令:
start-dfs.sh
我们在终端输入如下命令创建目录:
hadoop fs -mkdir /user hadoop fs -mkdir /user/root hadoop fs -mkdir /user/root/***(这里是你自己的名字)
上面是逐个创建文件夹,我们还可以使用参数-p一次性创建多级目录:
hadoop fs -mkdir -p /user/root/***
-
创建本地文件a.txt,文件内容:You love Hadoop ,并将改文件上传到HDFS中第1题所创建的目录中
我们直接在终端的root目录下面创建我们的本地文件并输入题目要求的内容:
vim a.txt
然后我们再在终端输入上传命令:
hadoop fs -put a.txt /user/root/***
-
查看上传到HDFS中的a.txt文件的内容
我们直接在终端输入查看命令:
hadoop fs -cat /user/root/***/a.txt
-
在Hadoop官方的示例程序包hadoop-mapreduce-examples-3.1.4.jar中,包括计算Pi值的测试模块,使用hadoop jar命令提交计算Pi的MapReduce任务
我们首先进入到hadoop下的mapreduce目录中:
cd /usr/local/servers/hadoop/share/hadoop/mapreduce/
然后执行如下命令即可计算Pi:
hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 5 5
可以看出精度不是很高,上面命令后面的两个数字含义是,第一个5是运行5次map任务,第二个5是每个map任务投掷次数,总投掷次数就是两者相乘,想要提高精度就可以让数字变大,但是很容易出现作业计算失败的异常,这是因为计算内存不够,所以不能调的太大。
二、RDD编程
现有一份2019年我国部分省份高考分数线数据文件exam2019.csv,共有四个数据字段,字段说明如表1所示:
表1 高考分数线数据字段说明
字段名称 | 说明 |
---|---|
地区 | 省、直辖市或自治区 |
考生类别 | 考生报考类别,如理科 |
批次 | 划定的学校级别,如本科批次 |
分数线 | 达到所属批次的最低分 |
为了解2019年全国各地的高考分数线情况,请使用Spark编程,完成以下需求:
-
读取exam2019.csv并创建RDD
我们首先将该文件上传到我们的终端,我直接放在root目录下的。
然后我们启动pyspark:
pyspark
然后我们读取我们的文件并创建RDD:
data = sc.textFile("file:///root/exam2019.csv")
-
查找出各地区本科批次的分数线
# 对RDD数据进行map操作,拆分每一行数据 data_map = data.map(lambda x: x.split(",")) # 对拆分后的RDD进行filter操作,过滤出本科的数据 data_filter = data_map.filter(lambda x: x[2] == '本科') # 对过滤后的RDD进行map操作,抽取出地区和分数线 data_result = data_filter.map(lambda x:(x[0],x[3])) # 对抽取后的RDD进行reduceByKey操作,按地区进行分组 data_reduce = data_result.reduceByKey(lambda x,y:x+','+y) # 打印结果 data_reduce.collect()
-
将结果以文本格式存储到HDFS上,命名为/user/root/你的名字exam2019
data_reduce.saveAsTextFile("hdfs://localhost:9000/uesr/root/*** exam2019")
三、SparkSQL编程
某餐饮企业预备使用大数据技术对过往餐饮点评大数据进行分析以提高服务与菜品质量,实现服务升级,具体情况如下:现有一份顾客对某城市餐饮店的点评数据restaurant.csv,记录了不同类别餐饮店在口味、环境、服务等方面的评分,数据共有12列,前10列数据字段的说明如表2所示,最后两列的数据为空则不描述。
表2 顾客对某城市餐饮店的点评数据字段说明
字段名称 | 字段名称 |
---|---|
类别 | 餐饮店类别 |
行政区 | 餐饮店所在位置区域 |
点评数 | 有多少人进行了点评 |
口味 | 口味评分 |
环境 | 环境评分 |
服务 | 服务评分 |
人均消费 | 人均消费(单位:元) |
城市 | 餐饮店所在城市 |
Lng | 经度 |
Lat | 纬度 |
为探究人们对该城市餐饮店的点评分布情况,分析客户在餐饮方面的消费喜好,请使用Spark SQL进行编程,完成如下需求:
-
读取restaurant.csv数据,删除最后为空值的两列,再删除含有空值的行
# 读取文件 df = spark.read.csv("file:///root/restaurant.csv", header=True) # 删除最后两列 df = df.drop(df._c10).drop(df._c11) # 删除含有空值的行 df = df.na.drop()
-
筛选出口味评分大于7分的数据
result1 = df.filter(df.口味 > 7)
-
统计各类别餐饮店点评数,并按降序排列
# 选出需要的列,转换成rdd dps = df.select('类别', '点评数').rdd # 计算每种类别餐饮点评数的总和 dps = dps.map(lambda x:(x[0], int(x[1]))).reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], False) # 将计算得出的表格标签进行修改 dps = dps.toDF().withColumnRenamed('_1', '类别').withColumnRenamed('_2', '点评数').show()
-
将步骤2和步骤3的结果保存到HDFS上,命名为/user/root/你的名字restaurant
result1.saveAsTextFile("hdfs://localhost:9000/uesr/root/*** kouwei7") dps.saveAsTextFile("hdfs://localhost:9000/uesr/root/*** dps")
四、SparkStreaming编程
现有一份某饭店的菜单数据文件menu.txt,部分数据如表3所示,每一行有3个字段,分别表示菜品ID、菜名和单价(单位:元)。
表3 某饭店的菜单数据
一位顾客依次点了红烧茄子、京酱肉丝和剁椒鱼头共3个菜,为实时计算顾客点餐的费用,请使用Spark streaming 编程完成以下操作:
-
在虚拟机上启动8888端口
直接在终端输入如下命令:
nc -lk 8888
-
使用Spark streaming连接虚拟机的8888端口,并实时统计顾客点餐的总费用
我们创建一个py程序名为prizeSum.py,并填入如下代码:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import sys
# 从sys.argv中获取主机名和端口号
if len(sys.argv) != 3:
print("Usage:prizeSum.py <hostname> <port>", file=sys.stderr)
exit(-1)
# 创建SparkContext
sc = SparkContext(appName="pythonSparkStreamingPrizeSum")
# 创建StreamingContext
ssc = StreamingContext(sc, 5)
# 创建函数,实现累加
def accumulate(values, sums):
return sum(values) + (sums or 0)
# 设置检查点目录
ssc.checkpoint("file:///root/test/")
initialStateRDD = sc.parallelize([])
# 从指定的主机和端口接收数据流
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
# 将数据流中的每一行转换为一个元组
costs= lines.map(lambda x: x.split(" "))
# 将每一行的价格累加
totalCost = costs.map(lambda x: ("总价", int(x[2]))).updateStateByKey(accumulate, initialRDD=initialStateRDD)
totalCost.map(lambda x: x.values())
# 打印结果
totalCost.pprint()
# 启动Streaming处理流
ssc.start()
# 等待程序终止
ssc.awaitTermination()
-
启动Spark streaming程序,在8888端口输入顾客所点的菜单数据,如“3 红烧茄子15”,查看顾客本次点餐的总费用
我们启动我们创建的py程序:
python3 prizeSum.py localhost 8888