Spark 基础

news2024/7/6 19:04:20
/*
Why Spark
    一、MapReduce编程模型的局限性
    	1、繁杂:只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码
    	2、处理效率低:
    		2.1、Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
    		2.2、任务调度与启动开销大
    	3、不适合迭代处理、交互式处理和流式处理
    二、Spark是类Hadoop MapReduce的通用【并行】框架
    	1、Job中间输出结果可以保存在内存,不再需要读写HDFS
    	2、比MapReduce平均快10倍以上
    三、版本
    	2014	1.0
    	2016	2.x
    	2020	3.x
    四、优势
        1、速度快
        	基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
        	基于硬盘数据处理,比MR快10个数量级以上
        2、易用性
        	支持Java、【Scala】、【Python:pyspark】、R语言
        	交互式shell方便开发测试
        3、通用性
        	一栈式解决方案:
        		批处理
        		交互式查询
        		实时流处理(微批处理)
        		图计算
        		机器学习
        4、多种运行模式
        	YARN ✔、Mesos、EC2、Kubernetes、Standalone、Local[*]
    五、技术栈
        1、Spark Core:核心组件,分布式计算引擎 RDD
        2、Spark SQL:高性能的基于Hadoop的SQL解决方案
        3、Spark Streaming:可以实现高吞吐量、具备容错机制的准实时流处理系统
        4、Spark GraphX:分布式图处理框架
        5、Spark MLlib:构建在Spark上的分布式机器学习库
	六、spark-shell:Spark自带的交互式工具
        local:spark-shell --master local[*]
        alone:spark-shell --master spark://MASTERHOST:7077
        yarn :spark-shell --master yarn
*/

cd /opt/software/spark-3.1.2/sbin
./start-all.sh
spark-shell --master local
-------------------------------------------------
sc.textFile("file:///root/spark/wordcount.log")
	.flatMap(line=>line.split("\\s+"))
	.map(word=>(word,1))
	.reduceByKey(_+_)
	.collect
-------------------------------------------------
res1: Array[(String, Int)] = Array((hello,2), (welcome,2), (world,1))
-------------------------------------------------

/*
	七、运行架构
        1、在驱动程序中,通过SparkContext主导应用的执行
        2、SparkContext可以连接不同类型的 CM(Standalone、YARN),连接后,获得节点上的 Executor
        3、一个节点默认一个Executor,可通过 SPARK_WORKER_INSTANCES 调整
        4、每个应用获取自己的Executor
        5、每个Task处理一个RDD分区
	Spark服务
		Master : Cluster Manager
		Worker : Worker Node
*/
/*
	八、Spark架构核心组件
	Application		建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码
	Driver program	驱动程序。Application中的main函数并创建SparkContext
	Cluster Manager	在集群(Standalone、Mesos、YARN)上获取资源的外部服务
	Worker Node		集群中任何可以运行Application代码的节点
	Executor		某个Application运行在worker节点上的一个进程
	Task			被送到某个Executor上的工作单元
	Job				多个Task组成的并行计算,由Action触发生成,一个Application中含多个Job
	Stage			每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage
*/

SparkContext

/*
    连接Driver与Spark Cluster(Workers)
    Spark的主入口
    每个JVM仅能有一个活跃的SparkContext 
*/

/*
【配置】
	master:
    	local[*] : CPU核数为当前环境的最大值
    	local[2] : CPU核数为2
    	local : CPU核数为1
    	yarn
*/
val conf:SparkConf = new SparkConf()
      .setAppName(name:String)
	  .set(key:String,value:String) // 多项设置
      .setMaster(master:String)
val sc: SparkContext = SparkContext.getOrCreate(conf)

/**
	封装:工具类
*/
class SparkCom(appName:String,master:String,logLevel:String="INFO") {
  private val conf:SparkConf = new SparkConf().setAppName(appName).setMaster(master)
  private var _sc:SparkContext = _
  private var _spark:SparkSession = _
  def sc() = {
    if (Objects.isNull(_sc)) {
      _sc = new SparkContext(conf)
      _sc.setLogLevel(logLevel)
    }
    _sc
  }
  def spark() = {
    if (Objects.isNull(_spark)) {
      _spark = SparkSession.builder().config(conf).getOrCreate()
    }
    _spark
  }
  def stop() = {
    if (Objects.nonNull(_sc)) {
      _sc.stop()
    }
    if (Objects.nonNull(_spark)) {
      _spark.stop()
    }
  }
}
object SparkCom{
  def apply(appName:String): SparkCom = new SparkCom(appName,"local[*]")
  def apply(appName:String, master:String): SparkCom = new SparkCom(appName,master)
  def apply(appName:String, master:String, logLevel:String): SparkCom = new SparkCom(appName,master,logLevel)
}

/*
RDD[?]
【数据集创建】
	RDD:Spark核心,主要数据抽象
		将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘
		RDD是用于数据转换的接口
		RDD指向了
			或存储在(HIVE)HDFS、Cassandra、HBase等
			或缓存(内存、内存+磁盘、仅磁盘等)
			或在故障或缓存收回时重新计算其他RDD分区中的数据
	RDD:是弹性分布式数据集(Resilient Distributed Datasets)
        分布式数据集
        	RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上
        	RDD并不存储真正的数据,只是【对数据和操作】的描述
        弹性
        	RDD默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘
        容错性
        	根据数据血统,可以自动从节点失败中恢复分区
    RDD与DAG:Stage
    	两者是Spark提供的核心抽象
		DAG【有向无环图:如下图】反映了RDD之间的依赖关系

 

/*
	RDD的特性
		一系列的分区(分片)信息,每个任务处理一个分区
        每个分区上都有compute函数,计算该分区中的数据
        RDD之间有一系列的依赖
        分区器决定数据(key-value)分配至哪个分区
        优先位置列表,将计算任务分派到其所在处理数据块的存储位置
    RDD分区:Partition -> Partitioner -> Hash | Range ...	
    	分区是RDD被拆分并发送到节点的不同块之一
        我们拥有的分区越多,得到的并行性就越强
        每个分区都是被分发到不同Worker Node的候选者
        每个分区对应一个Task
    RDD操作类型:分为lazy与non-lazy两种
        Transformation(lazy):也称转换操作、转换算子
        Actions(non-lazy):立即执行,也称动作操作、动作算子
*/
// 集合创建:小数据集,可通过 numSlices 指定并行度(分区数)
val rdd: RDD[T] = sc.parallelize(seq:Seq[T], numSlices:Int) // ✔
val rdd: RDD[T] = sc.makeRDD(seq:Seq[T], numSlices:Int) // 调用了 parallelize

// 外部数据源创建: 可通过 minPartitions 指定分区数,CPU核占用数
// 文件系统:local(file:///...)或hadoop(hdfs://)
val rdd: RDD[String] = sc.textFile(path:String, minPartitions:Int)
val rdd: RDD[String] = sc.wholeTextFiles(dir:String, minPartitions:Int)

// 其他 RDD 创建
val rdd2: RDD[Map[String, Int]] = rdd
      .mapPartitions(_
        .map(
          _
            .split("[^a-zA-Z]+")
            .map((_, 1))
            .groupBy(_._1)
            .map(t2 => (t2._1, t2._2.length))
        )
      )

转换算子:RDD transform

/*
	简单类型 RDD[T]
*/

// 【逐条处理】
val rdd2: RDD[U] = rdd.map(f:T=>U)
// 【扁平化处理】:TraversableOnce : Trait用于遍历和处理集合类型元素,类似于java:Iterable
val rdd2: RDD[U] = rdd.flatMap(f:T=>TraversableOnce[U])
/* 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据 ✔
	map VS mapPartitions
	1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
	2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
*/
val rdd2: RDD[U] = rdd.mapPartitions(f:Iterator[T]=>Iterator[U][,preservePar:Boolean])
// 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
// 【转内存数组】:同分区的数据转为同类型的内存数组,分区不变
val rdd2: RDD[Array[T]] = rdd.glom();
// 【数据分组】:同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜skew】
val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
// 【数据过滤】:过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,【数据可能倾斜skew】
val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
/* 【数据抽样】
	withReplacement:Boolean		是否有放回抽样
	fraction:Double				抽样率
	seed:Long					随机种子,默认为当前时间戳(一般缺省)
      若数据总理为100条
        false, 0.4 => 抽样40%的数据,40条左右
        true,  0.4 => 每条数据被抽取的概率为40%
*/
val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean,fraction:Double,seed:Long)
// 【数据去重】:numPartitions:Int 设定去重后的分区数
val rdd2: RDD[T] = rdd.distinct([numPartitions:Int])(implicit order:Ording[T] = null)
/* 【数据排序】
	处理数据f:T=>K,升降序asc:Boolean,分区数numPartitions:Int
	默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
*/
val rdd2: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)

/*
	多个类型 RDD[T]:纵向
		交并差操作:数据类型一致,根据元素 equals 认定是否相同
		拉链操作:要求分区数和分区内的数据量一致
*/
// 【求交集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T])
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], numPartitions:Int)
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], par:Partitioner[T])
// 【求并集】
val rdd2: RDD[T] = rdd.union(rdd3:RDD[T])
// 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T])
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], numPartitions:Int)
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], par:Partitioner[T])
// 【拉链操作】
val rdd2: RDD[(T,U)] = rdd.zip(rdd3:RDD[U])
val rdd2: RDD[(T,Long)] = rdd.zipWithIndex()
val rdd2: RDD[(T,Long)] = rdd.zipWithUniqueId()
// 有三个重载:1+1,1+2,1+3
val rdd2: RDD[V]rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
val rdd2: RDD[V]rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])

键值算子:PairRDD(K,V)

/*
	【再分区操作】
	abstract class Partitioner(){
	  // 分区总数
      def numPartitions : scala.Int
      // 针对键的值进行相关的计算等到分区号
      def getPartition(key : scala.Any) : scala.Int
    }
    // 自定义分区器
    class KVPartitioner(np:Int) extends Partitioner{
      override def numPartitions: Int = np
      override def getPartition(key: Any): Int = key.toString.length%numPartitions
    }
    // 若在分区器和现有分区器相同,则不执行分区操作
    org.apache.spark.Partitioner
    	HashPartitioner
    	
*/
val pairRdd2: RDD[(K,V)] = pairRdd.partitionBy(p:Partitioner)
// 【按键排序】:K 必须实现 Ordered 特质
val pairRdd2: RDD[(K,V)] = pairRdd.sortByKey(ascending:Boolean=true, numPartitions:Int)

// reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)
// 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
// 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)
// 【按键分组】
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)

// 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
-------------------------------------------------------------------------------
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])
-------------------------------------------------------------------------------
// 重载 1+1 1+2 1+3,追加再分区操作
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)
/*
	【关联操作】:1V1			Shuffle ?
		横向,根据键做关联
		重载:numPartitions:Int 或 partitioner:Partitioner
*/
val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin(pairRdd3:RDD[(K,V1)])

行动算子:action

/* 【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
	reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
	aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致
*/
val rst:T = rdd.reduce(f:(T,T)=>T)
val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
// 返回包含数据集中所有元素的数组
val array:Array[T] = rdd.collect()
// 返回数据集中元素数量
val rst:Long = rdd.count()
val rst:Map[K,Long] = pairRdd.countByKey()
// 返回数据集中最大值
val rst:T = rdd.max()
// 返回数据集中最小值
val rst:T = rdd.min()
// 返回数据集中的第一个元素
val rst:T = rdd.first()
// 返回数据集中的前 num 个元素
val array:Array[T] = rdd.take(num:Int)
// 返回排序后数据集中的前 num 个元素
val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T])
/* 持久化至文本文件,重载追加压缩功能
	import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
	import io.airlift.compress.lzo.LzopCodec
	rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
*/
rdd.saveAsTextFile(path:String)
rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
rdd.saveAsObjectFile(path:String)
// 遍历迭代
rdd.foreach(f:T=>Unit)

练习

/*
	现有客户信息文件 customers.csv,请找出:
        客户中的前5个最大家族
        客户中的前10个最流行的名字
*/

/*
	现有客户信息文件 scores.txt,请找出:
	班级 ID 姓名 年龄 性别 科目 成绩
	需求如下:
    1. 一共有多少人参加考试?
        1.1 一共有多少个小于 20 岁的人参加考试?
        1.2 一共有多少个等于 20 岁的人参加考试?
        1.3 一共有多少个大于 20 岁的人参加考试?
    2. 一共有多个男生参加考试?
        2.1 一共有多少个女生参加考试?
    3. 12 班有多少人参加考试?
        3.1 13 班有多少人参加考试?
    4. 语文科目的平均成绩是多少?
        4.1 数学科目的平均成绩是多少?
        4.2 英语科目的平均成绩是多少?
    5. 单个人平均成绩是多少?
    6. 12 班平均成绩是多少?
        6.1 12 班男生平均总成绩是多少?
        6.2 12 班女生平均总成绩是多少?
        6.3 同理求 13 班相关成绩
    7. 全校语文成绩最高分是多少?
        7.1 12 班语文成绩最低分是多少?
        7.2 13 班数学最高成绩是多少?
    8. 总成绩大于 150 分的 12 班的女生有几个?
*/

// 样例类参与 RDD 运算不能写在 main 中,否则报错:序列化异常
case class Score(classId:Int, name:String, age:Int, gender:String, subject:String, score:Int ) extends Serializable {
    def claSub = s"$classId,$subject"
}

val regex:Regex = "(\\d+)\\s+(.*?)\\s+(.*?)\\s+(.*?)\\s+(.*?)\\s+(.*?)".r
implicit def strToScore(line:String)={
    line match {
        case regex(classId,name,age,gender,subject,score)
        =>Score(classId.toInt,name,age.toInt,gender,subject,score.toInt)
    }
}

val scores: RDD[Score] = sc.textFile("hdfs://single:9000/spark/cha01/scores.txt", 4)
.mapPartitionsWithIndex(
    (ix,it) => {
        if(ix==0){
            it.drop(1)
        }
        it.map(line=>{
            val score:Score = line
            score
        })
    }
).cache()

val num20s: RDD[(String, Int)] = scores.mapPartitions(
    _.map(score => (
    	if (score.age < 20) "SCORE_LT_20" 
        else if (score.age == 20) "SCORE_EQ_20" 
        else "SCORE_GT_20", 1)))
	.reduceByKey(_ + _)

val numClass: RDD[(Int, Int)] = scores
	.mapPartitions(_.map(score => (score.classId, 1)))
	.reduceByKey(_ + _)

val numGender: RDD[(String, Int)] = scores
	.mapPartitions(_.map(score => (score.gender, 1)))
	.reduceByKey(_ + _)

val avgScoreBySubject: RDD[(String, Float)] = scores
	.mapPartitions(_.map(score => (score.subject, score.score)))
	.groupByKey()
	.mapPartitions(_.map(t => (t._1, t._2.sum * 1.0f / t._2.size)))

val avgScoreByName: RDD[(String, Float)] = scores
	.mapPartitions(_.map(score => (score.name, score.score)))
	.groupByKey()
	.mapPartitions(_.map(t => (t._1, t._2.sum * 1.0f / t._2.size)))

val avgScoreByClassGender: RDD[((Int, String), Float)] = scores
	.mapPartitions(_.map(score => ((score.classId, score.gender), score.score)))
	.groupByKey()
	.mapPartitions(_.map(t => (t._1, t._2.sum * 1.0f / t._2.size)))

val maxChinese: Int = scores
	.filter(_.subject.equals("chinese"))
	.map(_.score)
	.max()
val min12Chinese: Int= scores
	.filter(_.claSub.equals("12,chinese"))
	.map(_.score)
	.min()
val max13Math: Int = scores
	.filter(_.claSub.equals("13,math"))
	.map(_.score)
	.max()

val numSumScore12Gt150: Long = scores
    .filter(score => score.classId == 12 && score.gender.equals("女"))
    .mapPartitions(_.map(score => (score.name, score.score)))
    .reduceByKey(_+_)
    .filter(_._2 > 150)
    .count()

优化:optimize

org.apache.spark.util.Utils
	
/*
	shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM
	groupByKey只分组(存在Shuffle) + reduce只聚合
		<=结果同,性能不同=>
	reduceByKey先分组、预聚合、再聚合(存在Shuffle) ✔
*/

/*
【设置日志管理】
	日志级别:INFO|DEGUG|WARN|ERROR|FATAL
*/
sc.setLogLevel(logLevel:String)

/*
【设置检查点:容错,恢复】
*/
sc.setCheckpointDir(path:String)

/*
【RDD重用:检查点、缓存与持久化】
	cache      临时存储于【内存】重用,job结束后自动删除 ✔
		<=> persist(StorageLevel.MEMORY_ONLY)
	persisit   临时存储于【磁盘】重用,job结束后自动删除,涉及IO性能较差
		StorageLevel.MEMORY_ONLY
        StorageLevel.DISK_ONLY
        StorageLevel.OFF_HEAP
        StorageLevel.MEMORY_AND_DISK
        StorageLevel.MEMORY_AND_DISK_SER
        StorageLevel.MEMORY_AND_DISK_SER_2
	checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用
*/
val rddCache: RDD[T] = rdd.cache()
val rddCache: RDD[T] = rdd.persist(level:StorageLevel)
rdd.checkpoint()

/*
	广播变量:broadcast:【如下图】
		将数据集或配置广播到每个Executor以readonly方式存在,不会在Task之间传输
		若不使用广播变量,则将会为每个Task发送一份数据
*/
val bc:BroadCast[T] = sc.broadcast(value:T)
rdd.mapPartitions(itPar=>{
    val v:T = bc.value
    ...
})

/*
	累加器:accumulate:只能 add 操作,常用于计数
		1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
		2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
	非累加器:在所有Task中的都会是独立Copy,不会有合并
	自定义累加器:写一个类继承 AccumulatorV2[IN, OUT]
		abstract class AccumulatorV2[IN, OUT] extends Serializable {
          // Returns if this accumulator is zero value or not
          def isZero: Boolean

          //  Creates a new copy of this accumulator, which is zero value
          def copyAndReset(): AccumulatorV2[IN, OUT] = {...}

          // Creates a new copy of this accumulator.
          def copy(): AccumulatorV2[IN, OUT]

          // Resets this accumulator, which is zero value.
          def reset(): Unit

          // 添加:Takes the inputs and accumulates.
          def add(v: IN): Unit

          // 合并:Merges another same-type accumulator and update its state.
          def merge(other: AccumulatorV2[IN, OUT]): Unit

          // 值列表:Defines the current value of this accumulator
          def value: OUT
		}
*/
val accLong: LongAccumulator = sc.longAccumulator("longAcc")
val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc")
rdd.mapPartitions(itPar=>{
    ...
    accLong.add(v:Long)
    accDouble.add(v:Double)
    ...
})
accXxx.reset()
val isZero:Boolean = accXxx.isZero
val num:Long|Double = accXxx.value|sum|count|avg

/*
【分区控制】
	【缩减分区节省资源】 或 【扩大分区提高并行度】
	 coalesce(numPartitions:Int, shuffle:Boolean):
		缩小分区
    		存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
    		默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
    		若需数据均衡,则将 shuffle 参数设置为 true 即可
    	扩大分区
    		若需要扩大分区,shuffle 参数必须设置为 true
    		若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区,{1},{2},{空}
    		repartition(numPartitions:Int) <=> coalesce(numPartitions,true) 
*/
val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean)
val rdd: RDD[String] = rdd.repartition(numPartitions:Int) // ✔

阶段划分 DAG

/*
	【为什么要划分阶段】
		1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
		2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续执行
		3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,
			但会因为 numPartitions:Int, partitioner:Partitioner 等参数重新分配,
			过程数据会【写盘供子RDD拉取(类MapReduce)】
*/


/*
    Driver程序提交后
    1、Spark调度器将所有的RDD看成是一个Stage
    2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
    3、遇到窄依赖,则归并到同一个Stage(TaskSet)
    4、等到所有的步骤回溯完成,便生成一个DAG图
    
    RDD依赖关系
    	Lineage:血统、遗传
            RDD最重要的特性之一,保存了RDD的依赖关系
            RDD实现了基于Lineage的容错机制
    	依赖关系 org.apache.spark.Dependency
            窄依赖 NarrowDependency
            	1V1 OneToOneDependency
            	1VN RangeDependency
            宽依赖 ShuffleDependency
        当RDD分区丢失时
            Spark会对数据进行重新计算,对于窄依赖只需重新计算一次子RDD的父RDD分区
            若配合持久化更佳:cache,persist,checkpoint

 

/*
  【计算任务】
    DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 0
        MapPartitionsRDD[3] at 【flatMap】 at SparkTest.scala:33
        Adding task set 0.0 with 4 tasks
            Starting|Running|Finished task 0.0 1.0 2.0 3.0
    DAGScheduler: Submitting 4 missing tasks from ResultStage 1 
        MapPartitionsRDD[7] at 【sortBy】 at SparkTest.scala:36
        Adding task set 1.0 with 4 tasks
            Starting|Running|Finished task 0.0 1.0 2.0 3.0
    DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 3 
        MapPartitionsRDD[5] at 【sortBy】 at SparkTest.scala:36
        Adding task set 3.0 with 4 tasks
            Starting|Running|Finished task 0.0 1.0 2.0 3.0
    DAGScheduler: Submitting 4 missing tasks from ResultStage 4
        MapPartitionsRDD[10] at 【saveAsTextFile】 at SparkTest.scala:37
        Adding task set 4.0 with 4 tasks
*/

val path = "data/wordcount.txt"
sc.textFile(path, 4)
.mapPartitions(
    _
    .map(
    	_
        	.split("[^a-zA-Z]+")
            .map((_, 1))
            .groupBy(_._1)
            .map(t2 => (t2._1, t2._2.length))
     )
)
.flatMap(a => a.map(t => t))
.reduceByKey(_+_)
.sortBy(_._2,false)
.saveAsTextFile("data/test_out8")

算子宽窄依赖划分

// 窄依赖 
rdd.dependencies
		map
		flatMap
		mapPartitions
		mapPartitionsWithIndex
		glom
		filter
		distinct
		intersection
		sample
		union
		subtract
		zip...
		cogroup
// 宽依赖
	ShuffledRDD extends RDD
		sortBy
		sortByKey
		partitionBy
		repartition

// 不一定
/*
	reduceByKey(【partitioner: Partitioner】, func: (V, V) => V)
		若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
		则为窄依赖RDD,否则为宽依赖ShuffledRDD
*/
coalesce(nump: Int, shuffle: Boolean = false, pc:partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null)
join[W](other: RDD[(K, W)], partitioner: Partitioner)
groupBy[K](f: T => K, p: Partitioner)
reduceByKey(partitioner: Partitioner, func: (V, V) => V)
foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V)
aggregateByKey[U](z: U, p: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U)
combineByKey[C](c: V => C,merge: (C, V) => C,mergeCombine: (C, C) => C,partitioner: Partitioner,mapsizeCombine: Boolean = true,serializer: Serializer = null)
	=> combineByKeyWithClassTag(
        createCombiner: V => C,
      	mergeValue: (C, V) => C,
      	mergeCombiners: (C, C) => C,
      	partitioner: Partitioner,
      	mapSideCombine: Boolean = true,
      	serializer: Serializer = null
    ) => 
    if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {...}, preservesPartitioning = true)
    } else {
        new ShuffledRDD[K, V, C](self, partitioner)...
    }

任务提交

# 默认路径为 HDFS
spark-submit \
--class cha05.SparkTest \
--master local[*] \
/root/spark/scala-1.0.jar \
file:root/spark/story.txt \
file:root/spark/wc_story01

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1627786.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity对应的c#版本

本文主要是记录一下unity已经开始兼容c#的版本和.net版本&#xff0c;以便更好的利用c#的特性。 c#和.net对应情况 微软已经将.net开发到.net 9了&#xff0c;但是unity的迭代速度远没有c#迭代速度快&#xff0c;已知unity最新的LTS版本unity2023已经兼容了c#9 可以在unity手册…

汽车底盘域的学习笔记

前言&#xff1a;底盘域分为传统车型底盘域和新能源车型底盘域&#xff08;新能源系统又可以分为纯电和混动车型&#xff0c;有时间可以再研究一下&#xff09; 1&#xff1a;传统车型底盘域 细分的话可以分为四个子系统 传动系统 行驶系统 转向系统 制动系统 1.1传动系…

中电金信:向“新”而行——探索融合架构的项目管理在保险行业的应用

近年来&#xff0c;险企在政策推动、市场牵引、自身发展、新技术应用日趋成熟等内外部因素的驱动下&#xff0c;积极投身到数字化转型的浪潮中。在拜访各类保险客户和合作项目的过程中&#xff0c;我们发现不少险企在数字化转型中或多或少都面临着战略如何落地、技术如何承接和…

Java 基础常见面试题整理

目录 1、java的基本数据类型有哪些&#xff1f;2、java为什么要有包装类型&#xff1f;3、String a "123" 和 String a new String("123") 区别&#xff1f;4、String、StringBuilder和StringBuffer的区别&#xff1f;5、如何理解面向对象和面向过程&…

第72天:漏洞发现-Web框架中间件联动GobyAfrogXrayAwvsVulmap

案例一&#xff1a;某 APP-Web 扫描-常规&联动-Burp&Awvs&Xray Acunetix 一款商业的 Web 漏洞扫描程序&#xff0c;它可以检查 Web 应用程序中的漏洞&#xff0c;如 SQL 注入、跨站脚本攻击、身份验证页上的弱口令长度等。它拥有一个操作方便的图形用户界 面&#…

C++系列-输入输出

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” C输入和输出 我们都知道C语言的输出是用printf函数来实现的&#xff0c;那么C呢&#xff0c;它的实现逻辑是什么呢&#xff0c;让我们一起来看一下&#xff0c; #include<i…

miniTry:Python实现web搜索(全自动+程序操控)

声明&#xff1a;本问给出了全部代码--可以复现--亲测有效 :) [ 代码为图片--> 强制自己去敲一次 又不多] 1.打开网站&#xff1a; 2.利用id去定位到我们要进行输入的内容&#xff08;bing可以直接进行搜索&#xff0c;而csdn需要登录&#xff0c;所以我们用csdn做演示&…

【论文速读】|理解基于大语言模型的模糊测试驱动程序生成

本次分享论文&#xff1a;Understanding Large Language Model Based Fuzz Driver Generation 基本信息 原文作者&#xff1a;Cen Zhang, Mingqiang Bai, Yaowen Zheng, Yeting Li, Xiaofei Xie, Yuekang Li, Wei Ma, Limin Sun, Yang Liu 作者单位&#xff1a;南洋理工大学…

elasticsearch 常用语法汇总

文章目录 前言elasticsearch 常用语法汇总1. 创建索引2. 检索索引信息3. 删除索引4. 文档操作4.1. 对blog_new索引指定文档ID新增4.2. 对blog_new索引不指定文档ID新增&#xff0c;随机文档ID:4.3. 获取文档4.4. 更新文档4.5. 删除文档 5. 查询5.1. 匹配查询5.2. 范围查询5.3. …

掌握TypeScript,成为前端高手(AI写作一键生成免费)

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

免费获取!遗传算法+多目标规划算法+自适应神经模糊系统程序代码!

前言 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;最早是由美国的 John holland于20世纪70年代提出&#xff0c;该算法是模拟达尔文生物进化论的自然选择和遗传学机理的生物进化过程的计算模型&#xff0c;通过数学的方式&#xff0c;将问题的求解过程转…

pyqt QSplitter控件

pyqt QSplitter控件 QSplitter控件效果代码 QSplitter控件 PyQt中的QSplitter控件是一个强大的布局管理器&#xff0c;它允许用户通过拖动边界来动态调整子控件的大小。这个控件对于创建灵活的、用户可定制的用户界面非常有用。 QSplitter控件可以水平或垂直地分割其包含的子…

靠生成式人工智能赚钱? 扎克伯格:再等几年吧

不用多说&#xff0c;AI人工智能就是2024年最热的技术&#xff0c;企业也希望通过AI技术大赚特赚。不过Meta CEO扎克伯格在公司2024年第一财季业绩会议上表示&#xff0c;从生成式人工智能中获利还需要几年时间。 R-C (1).jpg© 由 ITheat热点科技 提供 AI人工智能技术很多…

【数据结构与算法】:手搓顺序表(Python篇)

文章目录 一、顺序表的概念二、顺序表的实现1. 顺序表的创建1.1 扩容1.2 整体建立顺序表 2. 顺序表的基本运算算法2.1 顺序表的添加&#xff08;尾插&#xff09;2.2 指定位置插入2.3 指定位置删除2.4 顺序表的查找2.5 顺序表元素的索引访问2.6 顺序表元素的修改2.7 顺序表长度…

Java毕业设计 基于SpringBoot vue城镇保障性住房管理系统

Java毕业设计 基于SpringBoot vue城镇保障性住房管理系统 SpringBoot 城镇保障性住房管理系统 功能介绍 首页 图片轮播 房源信息 房源详情 申请房源 公示信息 公示详情 登录注册 个人中心 留言反馈 后台管理 登录 个人中心 修改密码 个人信息 用户管理 房屋类型 房源信息管理…

微信小程序:5.数据绑定

在Data中定义数据早wxml中进行数据使用 在data中定义数据 在页面对应的js对象中找到data&#xff0c;然后把数据进行定义即可 Page({data: {motto: Hello World,userInfo: {avatarUrl: defaultAvatarUrl,nickName: ,},hasUserInfo: false,canIUseGetUserProfile: wx.canIUse…

药房管理 T1072

#include<bits/stdc.h> using namespace std; int main(){int m,n;cin>>m>>n;int f[n];for(int i0;i<n;i)cin>>f[i];int count0;for(int i0;i<n;i){if(m>f[i]){mm-f[i];}else {count;}}cout<<count;return 0;}

【Redis 开发】缓存雪崩和缓存击穿

缓存问题 缓存雪崩解决方案 缓存击穿互斥锁逻辑时间基于互斥锁解决缓存击穿问题基于逻辑过期方式解决缓存击穿问题 缓存雪崩 缓存雪崩是指在同一时间段&#xff0c;大量的缓存key同时失效或者Redis服务器宕机&#xff0c;导致大量请求到达数据库&#xff0c;带来巨大压力 解决…

node.js egg.js

Egg 是 Node.js 社区广泛使用的框架&#xff0c;简洁且扩展性强&#xff0c;按照固定约定进行开发&#xff0c;低协作成本。 在Egg.js框架中&#xff0c;ctx 是一个非常核心且常用的对象&#xff0c;全称为 Context&#xff0c;它代表了当前 HTTP 请求的上下文。ctx 对象封装了…

【JavaEE网络】 TCP的可靠传输机制总结

目录 可靠传输实现机制确认应答超时重传连接管理滑动窗口流量控制拥塞控制延迟应答捎带应答 可靠传输实现机制 确认应答 这是保证可靠性的最核心机制 TCP将每个字节的数据都进行了编号。即为序列号。 这是为了防止连续发多条数据的时候&#xff0c;可能出现“后发先至”的情…