1. Environment setup
Start Hadoop:
start-all.sh
Start Spark:
./sbin/start-all.sh
Upload the dataset.
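The exercises below assume (inferred from the code, not stated explicitly in the original) that every line of data.txt is a comma-separated record of the form name,course,score. A quick sanity check in the pyspark shell:

lines=sc.textFile("file:///home/data.txt")
print(lines.first())   # inspect one record; it should match the assumed "name,course,score" layout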
1. Find the total number of students in the department
lines=sc.textFile("file:///home/data.txt")
res= lines.map(lambda x:x.split(",")).map(lambda x:x[0])
sum=res.distinct()
sum.count()
2. Find how many courses the department offers
lines=sc.textFile("file:///home/data.txt")
res= lines.map(lambda x:x.split(",")).map(lambda x:x[1])
class_res=res.distinct()
class_res.count()
The result is 8.
3. Find Tom's average score
The first two steps are roughly the same as the code above.
lines=.......
res= lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom")
res.foreach(print)
First print out Tom's scores.
Then add up Tom's total score and total number of courses and divide them:
score=res.map(lambda x:int(x[2]))    # Tom's scores as integers
all=res.count()                      # number of courses Tom took
sum=score.reduce(lambda x,y:x+y)     # Tom's total score
ave=sum/all
print(ave)
The result is 30.8.
4. Find each student's number of selected courses — the classic map-reduce summation; the logic is the same as above.
res= lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
pick_res=res.reduceByKey(lambda x,y:x+y)
pick_res.foreach(print)
Since the output is long, only part of it is shown.
5. Find the number of students who took DataBase
res= lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=='DataBase')
res.count()
The answer obtained is 126.
6. Find the average score of each course
res= lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))   # (course, (score, 1))
temp=res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))                     # (course, (total score, number of students))
ave=temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))                     # (course, average rounded to 2 decimals)
ave.foreach(print)
7. Use an accumulator to count how many students took the DataBase course.
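The original does not show the code for this step; a minimal sketch using a Spark accumulator (same file path and column layout as above) could look like this:

num_db=sc.accumulator(0)                     # shared counter, updated on the executors
lines=sc.textFile("file:///home/data.txt")
def count_database(line):
    if line.split(",")[1]=='DataBase':       # the course name is the second field
        num_db.add(1)
lines.foreach(count_database)                # foreach is an action, so the accumulator actually gets updated
print(num_db.value)                          # read the final value back on the driver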
The result obtained is 126.
2. Generate two txt files of random numbers and merge them
1. First create the two txt files.
2. Create a new py file:
import random

dic_name_score = {}
s = ['x', 'y', 'z']
for i in range(300):
    student_num = random.randint(20170100, 20170199)   # random student id
    student_score = random.choice(s)                    # random value drawn from s
    dic_name_score[student_num] = student_score         # later duplicates of the same id overwrite earlier ones
dic_name_score_sort = sorted(dic_name_score.items(), key=lambda x: x[1])
for i in dic_name_score_sort:
    print(i[0], " ", i[1])
f = open("/home.txt", "w")
for line in dic_name_score_sort:
    # line is a (student_num, value) tuple, so convert it to a string before writing
    f.write(str(line[0]) + " " + str(line[1]) + '\n')
f.close()
PS: if the file cannot be generated, run the script on Windows, copy and paste the output, and then upload it to the Linux side.
The third-party pyspark library needs to be installed (for example with pip install pyspark).
Run the command to merge the two files (a sketch is given below).
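The merge command itself is not reproduced in the original; a minimal PySpark sketch, assuming the two generated files were uploaded to /home/file1.txt and /home/file2.txt (hypothetical names), might look like this:

from pyspark import SparkContext

sc=SparkContext('local','merge')
# hypothetical paths for the two generated files
lines=sc.textFile("file:///home/file1.txt").union(sc.textFile("file:///home/file2.txt"))
# sort by student number (the first field of each line) and write a single merged file
merged=lines.map(lambda x:x.split()).sortBy(lambda x:x[0]).map(lambda x:" ".join(x))
merged.repartition(1).saveAsTextFile("file:///home/merged")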
The result is as follows.
Randomly generate names and course scores and find the averages
1. The code that randomly generates names and scores is as follows; five courses are set up.
import random

dic_name_score = {}
first_name = ["赵", "钱", "孙", "李", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许", "何", "吕", "施", "张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎", "鲁", "韦", "昌", "马", "苗", "凤", "花", "方"]
second_name = ["静", "霞", "雪", "思", "平", "东", "志宏", "峰", "磊", "雷", "文","明浩", "光", "超", "军", "达", "伟", "华", "建国", "洋", "刚", "万里", "爱民", "牧", "陆", "路", "昕", "鑫", "兵", "硕"]
for i in range(100):
    student_num = random.randint(10, 100)       # score of the first course (variable name kept from the original)
    student_score = random.choice(first_name) + random.choice(second_name)   # randomly composed name
    s2 = random.randint(10, 100)
    s3 = random.randint(10, 100)
    s4 = random.randint(10, 100)
    s5 = random.randint(5, 100)
    dic_name_score[student_num, s2, s3, s4, s5] = [student_score]   # key: the five scores, value: the name
dic_name_score_sort = sorted(dic_name_score.items(), key=lambda x: x[1])
for i in dic_name_score_sort:
    print(i[1], " ", i[0])
f = open("D:\\A.txt", "w")
for line in dic_name_score_sort:
    # line is ((s1, s2, s3, s4, s5), [name]); write it as "name s1 s2 s3 s4 s5"
    f.write(line[1][0] + " " + " ".join(str(s) for s in line[0]) + '\n')
f.close()
Running the code gives each person a score in each of the five subjects; the next step is to split them so that each subject's scores end up in a separate file (a sketch is given below).
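The splitting code is not shown in the original; a plain-Python sketch (the file names are assumptions) that writes one "name score" file per subject could be:

# read the generated file, where each line is: name s1 s2 s3 s4 s5
subject_files=[open("course%d.txt" % i,"w") for i in range(1,6)]   # hypothetical output names
with open("A.txt") as src:                                         # hypothetical input name
    for line in src:
        parts=line.split()
        if len(parts)!=6:        # expect a name plus five scores
            continue
        name,scores=parts[0],parts[1:]
        for out,score in zip(subject_files,scores):
            out.write(name+" "+score+"\n")
for out in subject_files:
    out.close()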
The results were put in c.txt; since there are too many, we view them with tail.
Write the code to compute the averages:
from pyspark import SparkContext
sc=SparkContext('local','sparksql')
lines1=sc.textFile('file:///home/A.txt')
lines2=sc.textFile('file:///home/B.txt')
lines=lines1.union(lines2)
data=lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))   # (name, (score, 1))
res=data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
result=res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
result.repartition(1).saveAsTextFile("file:///home/result")
Run the code; the result is finally saved in result. Since there is a lot of output, we use the head command to view it.