Author email: yugongshiye@sina.cn  Location: Huizhou, Guangdong
▲ Goals of this chapter
⚪ Master a common Spark example: WordCount;
⚪ Master a common Spark example: computing an average;
⚪ Master a common Spark example: finding the maximum and minimum;
⚪ Master a common Spark example: TopK;
⚪ Master a common Spark example: secondary sort;
I. Example: WordCount
1. Implementation steps
1. Create a Spark project as a Scala project and import the Spark-related jar packages.
2. Write the Spark code.
Code example:
package cn.tedu

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCountDriver {
  def main(args: Array[String]): Unit = {
    // Connect to the standalone cluster master and name the application
    val conf = new SparkConf().setMaster("spark://hadoop01:7077").setAppName("wordcount")
    val sc = new SparkContext(conf)
    // Read the input file from HDFS with 2 partitions
    val data = sc.textFile("hdfs://hadoop01:9000/words.txt", 2)
    // Split each line into words, map each word to (word, 1), then sum the counts per word
    val result = data.flatMap { x => x.split(" ") }.map { x => (x, 1) }.reduceByKey(_ + _)
    // Write the (word, count) pairs back to HDFS
    result.saveAsTextFile("hdfs://hadoop01:9000/wcresult")
  }
}
3. Package the project into a jar, upload it to the server, and go to Spark's bin directory.
Run: spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar
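Before packaging, the same logic can be smoke-tested on a single machine. The following is a minimal sketch, not part of the original steps; the local input path e://words.txt is a placeholder, and the counts are printed instead of being written to HDFS.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCountLocal {
  def main(args: Array[String]): Unit = {
    // local[*] runs the job inside the current JVM, so no cluster is needed
    val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount-local")
    val sc = new SparkContext(conf)
    // Placeholder path; point this at any small local test file
    val result = sc.textFile("e://words.txt", 2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    // Bring the small result set to the driver and print it
    result.collect().foreach(println)
    sc.stop()
  }
}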
II. Example: Computing the Average
Sample file:
1 16
2 74
3 51
4 35
5 44
6 95
7 5
8 29
10 60
11 13
12 99
13 7
14 26
Expected answer:
42
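For reference, the thirteen values above sum to 554, and 554 / 13 = 42 under the integer division used in the code below; the exact mean is about 42.6.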
Code example 1:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AverageDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AverageDriver")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://average.txt")
    // Take the second column of each line as an Int
    val ageData = data.map { line => line.split(" ")(1).toInt }
    // Sum all values, count the records, then divide (integer division)
    val ageSum = ageData.reduce(_ + _)
    val peopleCount = data.count()
    val average = ageSum / peopleCount
    println(average)
  }
}
Code example 2:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AverageDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AverageDriver")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://average.txt", 3)
    val ageData = data.map { line => line.split(" ")(1).toInt }
    // Sum within each partition first, so reduce only has to combine one value per partition
    val ageSum = ageData.mapPartitions { it =>
      var partitionSum = 0
      while (it.hasNext) {
        partitionSum += it.next()
      }
      Iterator(partitionSum)
    }.reduce(_ + _)
    val peopleCount = data.count()
    val average = ageSum / peopleCount
    println(average)
  }
}
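As a further alternative, not part of the original examples, the sum and the count can be computed together in a single aggregate pass, which also makes it easy to return an exact Double mean. A minimal sketch, assuming the same local file layout:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AverageAggregateDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AverageAggregate")
    val sc = new SparkContext(conf)
    val ageData = sc.textFile("d://average.txt").map(_.split(" ")(1).toInt)
    // aggregate carries a (sum, count) accumulator through a single job:
    //   the first function folds each value into the per-partition accumulator,
    //   the second merges the per-partition accumulators.
    val (sum, cnt) = ageData.aggregate((0L, 0L))(
      (acc, age) => (acc._1 + age, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
    println(sum.toDouble / cnt)
    sc.stop()
  }
}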
III. Example: Finding the Maximum and Minimum
Sample file:
1 M 174
2 F 165
3 M 172
4 M 180
5 F 160
6 F 162
7 M 172
8 M 191
9 F 175
10 F 167
Code example 1:
package cn.tedu

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MaxMinDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MaxMin")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://MaxMin.txt")
    // Split the lines by gender, then keep only the height column as an Int
    val manData = data.filter { x => x.contains("M") }.map { x => x.split(" ")(2).toInt }
    val girlData = data.filter { x => x.contains("F") }.map { x => x.split(" ")(2).toInt }
    println("Man max is: " + manData.max() + ", man min is: " + manData.min())
    println("Girl max is: " + girlData.max() + ", girl min is: " + girlData.min())
  }
}
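Note that max() and min() here are RDD actions over the numeric column only: each call triggers its own job, and the result is just the number. The two variants below sort the raw lines instead, so they return the full record of the tallest and shortest person.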
Code example 2:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MaxMinDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MaxMin")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://MaxMin.txt")
    // Sort the male records by the height column (as Int, so the order is numeric rather than
    // lexicographic); descending order gives the max line, ascending order gives the min line
    val manMax = data.filter { line => line.contains("M") }
      .sortBy(line => line.split(" ")(2).toInt, false).first()
    val manMin = data.filter { line => line.contains("M") }
      .sortBy(line => line.split(" ")(2).toInt, true).first()
    println(manMax + "\n" + manMin)
  }
}
Code example 3:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MaxMinDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://hadoop01:7077").setAppName("MaxMin")
    val sc = new SparkContext(conf)
    val data = sc.textFile("hdfs://hadoop01:9000/MaxMin.txt", 3)
    val manMax = data.filter { line => line.contains("M") }
      .sortBy(line => line.split(" ")(2).toInt, false).first()
    val manMin = data.filter { line => line.contains("M") }
      .sortBy(line => line.split(" ")(2).toInt, true).first()
    val result = sc.makeRDD(Array(manMax, manMin))
    // When Spark writes output, it produces one result file per task by default,
    // so to control the number of output files, control the number of partitions (tasks).
    result.coalesce(1, true).saveAsTextFile("hdfs://hadoop01:9000/MaxMinResult")
  }
}
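As an alternative not shown in the original, the full max and min lines can also be obtained without a complete sort, using max/min with a custom Ordering (the same Ordering.by pattern used in the TopK example below). A minimal local-mode sketch, assuming the same file layout:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MaxMinByKeyDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MaxMinByKey")
    val sc = new SparkContext(conf)
    // Pair each male record with its numeric height so max/min can compare by it
    val men = sc.textFile("d://MaxMin.txt")
      .filter(_.contains("M"))
      .map(line => (line.split(" ")(2).toInt, line))
    val byHeight = Ordering.by[(Int, String), Int](_._1)
    // max/min avoid sorting everything: they only keep the running best record
    val maxRec = men.max()(byHeight)
    val minRec = men.min()(byHeight)
    println(maxRec._2 + "\n" + minRec._2)
    sc.stop()
  }
}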
IV. Example: TopK
1. Description
The Top K algorithm has two steps: first count the word frequencies, then find the K words with the highest frequencies.
2. Code
File data:
hello world bye world
hello hadoop bye hadoop
hello world java web
hadoop scala java hive
hadoop hive redis hbase
hello hbase java redis
Code example:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopkDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topk")
    val sc = new SparkContext(conf)
    val data = sc.textFile("e://topk.txt", 2)
    // Step 1: count the frequency of every word
    val count = data.flatMap { x => x.split(" ") }
      .map { x => (x, 1) }.reduceByKey(_ + _)
    // Step 2: take the 3 pairs with the largest counts, comparing by the count field
    val orderingDesc = Ordering.by[(String, Int), Int](_._2)
    val topk = count.top(3)(orderingDesc)
    //val topk = count.top(3)(Ordering.by { case (word, count) => count })
    topk.foreach(println)
  }
}
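For comparison, an alternative not in the original: the two lines below could replace the top(3) call inside the main method above, sorting the counts in descending order and taking the first three pairs at the cost of a full shuffle-and-sort, whereas top only keeps a bounded number of candidates per partition.
    val topkBySort = count.sortBy(_._2, false).take(3)
    topkBySort.foreach(println)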
3. Application scenarios
The Top K model applies to scenarios such as finding the customers who made the most purchases over a period of time, the most frequently visited IP addresses, and the most recently and most frequently updated microblog posts.
V. Example: Secondary Sort
File data:
aa 12
bb 32
aa 3
cc 43
dd 23
cc 5
cc 8
bb 33
bb 12
Requirement: sort by the first column in ascending order, then by the second column in descending order.
Custom sort key class:
// Serializable because the keys are shipped across the shuffle during sortByKey
class SecondarySortKey(val first: String, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // Compare by the first field in ascending order; when the first fields are equal,
  // compare by the second field in descending order (note the reversed operands)
  def compare(other: SecondarySortKey): Int = {
    val comp = this.first.compareTo(other.first)
    if (comp == 0) {
      other.second.compareTo(this.second)
    } else {
      comp
    }
  }
}
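A quick illustration of the resulting ordering, not in the original; it assumes the class above is available, for example pasted into a Scala REPL:
// Same first field: the larger second value sorts first (descending)
new SecondarySortKey("aa", 12) < new SecondarySortKey("aa", 3)   // true
// Different first fields: ascending lexicographic order decides, whatever the second value
new SecondarySortKey("aa", 5) < new SecondarySortKey("bb", 99)   // true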
Driver code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SsortDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ssort")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://ssort.txt", 3)
    // Wrap each line in a SecondarySortKey so sortByKey applies the custom ordering
    val ssortData = data.map { line =>
      (new SecondarySortKey(line.split(" ")(0), line.split(" ")(1).toInt), line)
    }
    val result = ssortData.sortByKey(true)
    // Drop the key wrapper and print the lines in sorted order on the driver
    result.map(_._2).collect().foreach(println)
  }
}
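With the sample file above, the sorted output should read:
aa 12
aa 3
bb 33
bb 32
bb 12
cc 43
cc 8
cc 5
dd 23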