目录
1. 前言
2. 分发驱动中scala集合中的数据
2.1 parallelize
2.2 makeRDD
2.3 range
3. 分发外部存储系统中的数据
3.1 textFile
3.2 wholeTextFiles
1. 前言
众所周知,spark是一种计算引擎(用来计算数据),但是数据从何而来呢?
spark获取数据主要有两种方式:
方式1:
分发驱动程序中scala集合中的数据
方式2:
分发外部存储系统中的数据(HDFS、HBase、其他共享文件系统)
spark将读来的数据,分发到了哪里去?
spark是一个分布式计算引擎,spark会将读取来的数据
按照指定的并行度,分发到不同的计算节点上去
2. 分发驱动中scala集合中的数据
spark提供了两个方法,用来将本地集合的数据(客户端JVM)切分成若干份
然后再分发到不同的计算节点中去
主要有两个参数:
seq: Seq[T] 本地集合
numSlices: Int 切片数(可选参数,不指定时使用默认切片数)
2.1 parallelize
test("parallelize") {
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
val list = List("java", "scala", "c++", "c#")
// 指定切片数
val rdd1: RDD[String] = sc.parallelize(list, 2)
// 使用默认切片
val rdd2: RDD[String] = sc.parallelize(list)
sc.stop()
}
2.2 makeRDD
test("makeRDD") {
/*
* TODO : 源码中 makeRDD 调用的还是 parallelize方法
*
* */
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
val list = List("java", "scala", "c++", "c#")
// 指定切片数
val rdd1: RDD[String] = sc.makeRDD(list, 2)
// 使用默认切片
val rdd2: RDD[String] = sc.makeRDD(list)
sc.stop()
}
2.3 range
def range(start: Long, end: Long,step: Long = 1,numSlices: Int = defaultParallelism): RDD[Long]
功能:
创建一个Long类型的RDD,元素内容为 start到end,公差为step 的等差数列
参数:
start: Long, 起始位置
end: Long, 结束位置,不包括该位置
step: Long = 1, 数列公差,默认为1
numSlices: Int = defaultParallelism 切片数,不指定时使用默认切片数
使用场景:
常用来造数据使用
test("range") {
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
val rdd: RDD[Long] = sc.range(0, 10)
sc.stop()
}
3. 分发外部存储系统中的数据
spark提供了两个方法,用于将外部文件数据切片后,再分发到不同的计算节点上去
主要有两个参数:
path: String 指定文件系统URL
minPartitions: Int 指定切片数(不指定时,使用默认切片数)
使用限制:
对文件系统的要求:
读取的文件系统必须是 HDFS、本地文件系统、任何hadoop支持的文件系统
对读取文件的要求:
文件格式必须是 text格式且UTF-8
对url的要求:
支持单个文件 /my/directory/1.txt
支持多个文件 /my/directory/*.txt
支持目录 /my/directory (目录下的必须都是文件,不能有目录存在)
java.io.IOException: Path: /dir/dir2 is a directory, which is not supported by the record
只能读取目录下的文件,不会对子目录进行遍历读取
支持gz格式的压缩文件 /my/directory/*.txt
3.1 textFile
返回 RDD[String] 格式的rdd,每个元素内容为 读取到text文件的每行,rdd的长度为所有文件的行数
test("textFile") {
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
// 读取目录下的所有文件
val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir")
// 读取gz格式的压缩文件
//val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/1.txt.gz")
rdd.foreach(println(_))
sc.stop()
}
3.2 wholeTextFiles
返回 RDD[(String, String)] 格式的rdd,每个元素内容为 (文件名称,文件内容),rdd的长度为读取到的文件个数
test("wholeTextFiles") {
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
/*
* TODO data/dir目录下 虽然存在目录不会报错,但是读取时会过滤掉目录,并不会递归读取子目录
* */
val rdd: RDD[(String, String)] = sc.wholeTextFiles("src/main/resources/data/dir")
rdd.foreach(e => println(s"fileName:${e._1} data:${e._2}"))
sc.stop()
}