参考链接
spark入门实战系列--8MLlib spark 实战_mob6454cc68310b的技术博客_51CTO博客https://blog.51cto.com/u_16099212/7454034
Spark和Hadoop的安装-CSDN博客https://blog.csdn.net/weixin_64066303/article/details/138021948?spm=1001.2014.3001.5501
1. spark-shell交互式编程
启动spark-shell
cd /usr/local/spark/
./bin/spark-shell
1.1 该系总共有多少学生
注:我将下载的chapter5-data1.txt文件放在“/home/hadoop/下载”目录下。
val lines = sc.textFile("file:///home/hadoop/下载/chapter5-data1.txt") #读取文件
lines.map(row=>row.split(",")(0)).distinct().count #每一行作为一个字符串,用’,’分割,取第一个元素,distinct去重,count统计有多少数据项
1.2 该系共开设来多少门课程
lines.map(row=>row.split(",")(1)).distinct().count #去第二个元素,去重,统计元素数量
1.3 Tom同学的总成绩平均分是多少
lines.filter(row=>row.split(",")(0)=="Tom") #以','作为分隔符,用filter进行过滤,筛选出第一项是“Tom”的数据项
.map(row=>(row.split(",")(0),row.split(",")(2).toInt)) #把第一项和第三项(姓名+成绩)合在一起构成一个数据项
.mapValues(x=>(x,1)) #去除value,把x变成(x,1),第一项是原始数据,第二项是数字1
.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) #针对想对的Key(也就是姓名),来进行运行,运算规则是(x.1+y._1),表示求和,也就是对(x,1)分别进行求和
.mapValues(x=>(x._1/x._2)).collect() #求平均值运算,x._1是原始数据的求和,x._2是1的求和,表示数据项的个数
读取的是字符串,所以需要转Int .
1.4 求每名同学的选修的课程门数
lines.map(row=>(row.split(",")(0),1)).reduceByKey((x,y)=>x+y).collect
首先是将数据变成(姓名,1)的map,然后针对相同key(姓名)的数据进行求和,也就是统计数据项的个数。
1.5 该系DataBase课程共有多少人选修
lines.filter(row=>row.split(",")(1)=="DataBase").count #直接是筛选第二项(课程)是DataBase的数据,然后进行统计个数
1.6 各门课程的平均分是多少
lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()
求平均分的部分和前面是保持一致的,区别就是筛选构成map的时候前面是根据“Tom”来划分,现在是根据第二项的课程来进行划分。
1.7 使用累加器计算共有多少人选了DataBase这门课
val acc=sc.longAccumulator("My Accumulator") #定义一个累加器
# #筛选第二项是DataBase的数据项,构成一个(DataBase,1)的map,用foreach,对values值来进行累加
lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1)).values.foreach(x=>acc.add(x))
#输出累加值
acc.value
2. 编写独立应用程序实现数据去重
2.1创建相关项目
sudo mkdir -p /example/sparkapp4/src/main/scala
cd /example/sparkapp4/src/main/scala
sudo touch A.txt
sudo vim A.txt
sudo touch B.txt
sudo vim B.txt
sudo vim SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]): Unit = {
//配置
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
//读取文件A.txt
val A = sc.textFile("file:///example/sparkapp4/src/main/scala/A.txt")
//读取文件B.txt
val B = sc.textFile("file:///example/sparkapp4/src//main/scala/B.txt")
//对两个文件进行合并
val C = A ++ B
//1.用distinct进行去重
//2.以空格来进行分割
//3.根据key排序
val distinct_lines = C.distinct().map(row => (row.split(" ")(0), row.split(" ")(1))).sortByKey()
//将RDD类型的数据转换为数组
val result = distinct_lines.collect()
//将结果输出到C.txt中
val out = new FileWriter("/example/sparkapp4/src/main/scala/C.txt", true)
for (item <- result) {
out.write(item + "\n")
println(item)
}
out.close()
}
}
2.2创建.sbt文件
cd /example/sparkapp4
sudo touch build.sbt
sudo vim build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
2.3打包执行
出现Exception in thread "main" java.io.FileNotFoundException:/example/sparkapp4/src/main/scala/C.txt (权限不够)
切换到root用户:su root
他这个空格我还是粘贴的,如果代码只有一个空格分割他的结果第二个数据是空的。
sudo /usr/local/sbt/sbt package
su root
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar
3. 编写独立应用程序实现求平均值问题
3.1创建相关文件
sudo mkdir -p /example/sparkapp5/src/main/scala
cd /example/sparkapp5/src/main/scala
sudo vim Algorithm.txt
sudo vim Database.txt
sudo vim Python.txt
vim ./src/main/scala/SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]): Unit = {
//配置
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
//读取文件Algorithm.txt
val Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")
//读取文件Database.txt
val Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")
//读取文件Python.txt
val Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")
//对三个文件进行整合
val scoreSum = Algorithm ++ Database ++ Python
//以空格切割将名字作为key,(成绩,1)作为value
val student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))
//求平均分数
val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))
//将RDD类型的数据转化为数组
val result = student_ave.collect()
val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)
for (item <- result) {
out.write(item + "\n")
println(item)
}
out.close()
}
}
2.2创建.sbt文件
如上同
2.3打包执行
如上同
题目要求要保留两位小数,我找的那个没有保留小数,我目前写的这个小数后面不止两位。
写入文件采用的是追加的方式。
补:
还是解决了,先写简单的程序调试,然后直接替换。
刚开始想的不对,直接用的是Array,结果不出意外报错了。
object Test {
def main(args: Array[String]): Unit = {
var a = Array("feng", 12.355353)
println(a)
println(a(0))
println(a(1))
println(a(1).formatted("%.2f"))
printf("%s %.2f\n", a(0), a(1))
}
}
因为需要格式化输出的是一个Map,不是Array,所以需要修改代码。
[Ljava.lang.Object;@43a25848
feng
12.355353
12.36
feng 12.36
object Test {
def main(args: Array[String]): Unit = {
var map = Map[String, Double]("feng" -> 12.442424, "xi" -> 13.35262, "ze" -> 23.151425)
for (elem <- map) {
println(elem)
}
for ((key, value) <- map) {
val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
println(s"($key,$roundedValue)")
}
}
}
(feng,12.442424)
(xi,13.35262)
(ze,23.151425)
(feng,12.44)
(xi,13.35)
(ze,23.15)
之后就是直接替换原始的代码就行了。
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]): Unit = {
//配置
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
//读取文件Algorithm.txt
val Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")
//读取文件Database.txt
val Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")
//读取文件Python.txt
val Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")
//对三个文件进行整合
val scoreSum = Algorithm ++ Database ++ Python
//以空格切割将名字作为key,(成绩,1)作为value
val student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))
//求平均分数
val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))
//将RDD类型的数据转化为数组
val result = student_ave.collect()
val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)
/* for (item <- result) {
out.write(item + "\n")
println(item)
}*/
for ((key, value) <- result) {
val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
out.write(s"($key,$roundedValue)\n")
println(s"($key,$roundedValue)")
}
out.close()
}
}