目录
0. 相关文章链接
1. 闭包检查
2. 序列化方法和属性
3. Kryo 序列化框架
4. 核心点总结
0. 相关文章链接
Spark文章汇总
1. 闭包检查
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变。
2. 序列化方法和属性
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* @ date: 2023/7/4
* @ author: yangshibiao
* @ desc: 项目描述
*/
object ModelTest {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("ModelTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))
//3.1创建一个Search对象
val search: Search = new Search("hello")
//3.2 函数传递,打印:ERROR Task not serializable
search.getMatch1(rdd).collect().foreach(println)
//3.3 属性传递,运行正常
// search.getMatch2(rdd).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
class Search(query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val q: String = query
rdd.filter((x: String) => x.contains(q))
}
}
运行第一个方法进行函数传递时,抛出 ERROR Task not serializable,如下图所示:
运行第二个方法进行属性传递时,运行正常,打印出正常结果,如下图所示:
3. Kryo 序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* @ date: 2023/7/4
* @ author: yangshibiao
* @ desc: 项目描述
*/
object ModelTest {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf()
.setAppName("ModelTest")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Search]))
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))
//3.1创建一个Search对象
val search: Search = new Search("hello")
//3.2 函数传递
search.getMatch1(rdd).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
}
运行成功,如下图所示:
4. 核心点总结
- 在Spark中,如果有类进行序列化,该类需要继承Serializable,如下图所示:
- 在Spark中,Serializable比较重,所以可以使用更优的Kryo框架,但是注意的是即使使用 Kryo 序列化,也要继承 Serializable 接口(如果Spark包中的类均已注册,但如果是自定义的类,还需要手动注册),如下图示所示:
注:其他Spark相关系列文章链接由此进 -> Spark文章汇总