一、累加器
1、累加器的引入
案例:没读取一条文件中的数据,count+1,并打印在Drive端(控制台)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo20Accumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("累加器的引入")
val context = new SparkContext(conf)
val studentRDD: RDD[String] = context.textFile("spark/data/student.csv")
var count = 0
studentRDD.foreach((line:String)=>{
count+=1
println("-------------------------")
println(count) //打印1~1000
println("-------------------------")
})
println(s"count的值为:${count}") //count的值为:0
}
}
累加器的引入代码图解:
2、累加器的应用
object Demo20Accumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("累加器的应用")
val context = new SparkContext(conf)
val studentRDD: RDD[String] = context.textFile("spark/data/student.csv")
/**
* 累加器:由SparkContext来创建
* 注意:
* 1、累加器能保证在Spark任务出现问题被重启的时候不会出现重复计算.
* 2、累加器只有在Action算子执行的时候才会被触发.
*/
val accumulator: LongAccumulator = context.longAccumulator
studentRDD.foreach((line: String) => {
accumulator.add(1)
})
println(s"accumulator的值为:${accumulator.value}") //accumulator的值为:1000
}
}
累加器的应用代码图解:
二、广播变量
1、广播变量的引入
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
object Demo21Broadcast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("广播变量引入演示")
val context = new SparkContext(conf)
//使用Scala的方式读取学生数据文件,将其转换以学号作为键的map集合,属于在Driver端的一个变量
val studentsMap: Map[String, String] = Source.fromFile("spark/data/student.csv")
.getLines()
.toList
.map((line: String) => {
val infos: Array[String] = line.split(",")
val stuInfo: String = infos.mkString(",")
infos(0) -> stuInfo
}).toMap
val scoresRDD: RDD[String] = context.textFile("spark/data/score.txt")
/**
* 将Spark读取的分数RDD与外部变量学生Map集合进行关联
* 循环遍历scoresRDD,将学号一样的学生信息关联起来
*/
val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {
val id: String = score.split(",")(0)
//使用学号到学生map集合中获取学生信息
val studentInfo: String = studentsMap.getOrElse(id, "无学生信息")
score -> studentInfo
})
resMapRDD.foreach(println)
}
}
广播变量的引入代码图解:
2、广播变量的应用
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
object Demo21Broadcast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("广播变量应用演示")
val context = new SparkContext(conf)
//使用Scala的方式读取学生数据文件,将其转换以学号作为键的map集合,属于在Driver端的一个变量
val studentsMap: Map[String, String] = Source.fromFile("spark/data/student.csv")
.getLines()
.toList
.map((line: String) => {
val infos: Array[String] = line.split(",")
val stuInfo: String = infos.mkString(",")
infos(0) -> stuInfo
}).toMap
/**
* 将studentsMap变成一个广播变量,让每一个将来需要执行关联的Executor中都有一份studentsMap数据
* 避免了每次Task任务拉取都要附带一个副本,拉取的速度变快了,执行速度也就变快了
*
*/
val studentsMapBroadcast: Broadcast[Map[String, String]] = context.broadcast(studentsMap)
val scoresRDD: RDD[String] = context.textFile("spark/data/score.txt")
/**
* 使用广播变量进行关联
*/
val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {
val id: String = score.split(",")(0)
val stuMap: Map[String, String] = studentsMapBroadcast.value //获取广播变量中的值
//使用学号到学生map集合中获取学生信息
val studentInfo: String = stuMap.getOrElse(id, "无学生信息")
(score, studentInfo)
})
resMapRDD.foreach(println)
}
}
广播变量的应用代码图解:
三、blockmanager
在广播变量的应用代码图解中提到了blockmanager拉取Driver端的数据,在此详细说明一下blockmanager,blockmanager是Executor的组成部分之一,它负责管理内存和磁盘上的数据块。