国内源下载
https://mirrors.cloud.tencent.com/apache/spark/
环境配置(三台机器都要配置)
修改/etc/profile
export JAVA_HOME=/export/server/jdk
export HADOOP_HOME=/export/server/hadoop
export SPARK_HOME=/export/server/spark
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
export HADOOP_CONF_DIR=$HADDOP_HOME/etc/hadoop
修改~/.bashrc
export JAVA_HOME=/export/server/jdk
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
修改spark-env.sh
JAVA_HOME=/export/server/jdk
## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
#配置worker的python环境,否则他会用系统自带的
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
## 指定spark老大Master的IP和提交任务的通信端口
# 告知Spark的master运行在哪个机器上
export SPARK_MASTER_HOST=node1
# 告知sparkmaster的通讯端口
export SPARK_MASTER_PORT=7077
# 告知spark master的 webui端口
SPARK_MASTER_WEBUI_PORT=8080
# worker cpu可用核数
SPARK_WORKER_CORES=1
# worker可用内存
SPARK_WORKER_MEMORY=1g
# worker的工作通讯地址
SPARK_WORKER_PORT=7078
# worker的 webui地址
SPARK_WORKER_WEBUI_PORT=8081
## 设置历史服务器
# 配置的意思是 将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
启动sparkonyarn
/export/server/spark/bin/pyspark --master yarn --deploy -mode client|culster
使用spark-submit提交py文件到yarn
#提交到yarn
/export/server/spark/bin/spark-submit --master yarn /sparkproject/00_example/helloword.py
#提交到本地运行
/export/server/spark/bin/spark-submit --master local[*] /sparkproject/00_example/helloword.py
RDD的五大特性
- 分区性,rdd是可以增加缩减分区的
- 通用性,每个rdd方法都会作用于每个分区
- 血缘性,rdd1,rdd2…每个rdd是链式依赖
- key,value数据的分区性
- driver就近构建,driver的构建会尽量贴近数据,从而提高性能.
RDD的创建
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
# 1.通过sparkcof创建conf对象
conf = SparkConf().setAppName('wordcount')
#2.生成sc对象
sc=SparkContext(conf=conf)
#读取一个文件
word_file = sc.textFile('hdfs://node1:9001/input/words.txt')
word_add = word_file.flatMap(lambda line:line.split(' '))
word_with_one_rdd = word_add.map(lambda x:(x,1))
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
print(result_rdd.collect())
wholeTextFiles 处理一个文件夹内包含多个小文件
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
# 1.通过sparkcof创建conf对象
conf = SparkConf().setAppName('wordcount')
#2.生成sc对象
sc=SparkContext(conf=conf)
#读取一个文件夹,直接collect()会返回文件位置:文件内容的元祖形式,可通过map获取.
rdd = sc.wholeTextFile('hdfs://node1:9001/input')
print(rdd.map(lambda x:x[1]).collect())
rdd的算子
转换算子:只要返回结果是rdd的就是转换算子,是懒加载,只有执行执行算子的时候才会处理
执行算子: 返回的不是rdd就是执行算子
map算子
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
# 1.通过sparkcof创建conf对象
conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
#2.生成sc对象
sc=SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6])
def math_10(data):
return data*10
print(rdd.map(math_10).collect())
print(rdd.map(lambda x:x*10).collect())
flatmap用法与map相同,限制性map算子,然后在接触数据嵌套
[(1,2,3),(4,5,6),(7,8,9)] ===>> [1,2,3,4,5,6,7,8,9]
reduceByKey
rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
print(rdd.reduceByKey(lambda a,b:a+b).collect())
#[('a', 5), ('b', 3)]
mapValues
rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
print(rdd.mapValues(lambda values:values*10).collect())
#[('a', 10), ('b', 10), ('b', 20), ('a', 20), ('a', 10), ('a', 10)]
groupBy
rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
print(rdd.groupBy(lambda t:t[0]).collect())
#[('a', <pyspark.resultiterable.ResultIterable object at 0x7faa2b811370>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7faa2b8113a0>)]
result = rdd.groupBy(lambda x:x[0])
print(result.map(lambda t:(t[0],list(t[1]))).collect())
# [('a', [('a', 1), ('a', 2), ('a', 1), ('a', 1)]), ('b', [('b', 1), ('b', 2)])]
filter ====>rdd.filter(func) 传入参数返回值时bool类型,为true的留下,为false的过滤
rdd = sc.parallelize([1,2,3,4,5,6,7,8])
# print(rdd.groupBy(lambda t:t[0]).collect())
print(rdd.filter(lambda x:x%2==1).collect())
# [1, 3, 5, 7]
groupByKey和reduceByKey的区别?
1.groupByKey只进行了分组后可以自定义聚合函数,reduceByKey内置聚合分组聚合.
2.是reduceByKey会在分组前在每个分区先进行聚合,被shuffle的数据可以极大地减少,然后在执行分组操作,然后在执行聚合.相较于groupByKey来讲:大量节省了磁盘的io操作,在数据量较大的情况下,优先使用reduceByKey.
mappartitions和foreachpartitions的区别?
1.相同点:他们两个都是对一整个分区的数据进行处理的
2.不同点,mappartitions是转换算子返回的是rdd,foreachpartitions是执行算子,由executor执行,返回值为none.
coalesce修改分区数量
两个参数第一个参数是要修改的数量值,第二个参数shuffle=true. 建议只减少分区,不增加分区,增加分区会产生shuffle.
rdd数据是过程数据:即每生成一个新的rdd,老的rdd就会被清理
如果数据再生成rdd3时还想使用rdd1,这时候就可以使用rdd的缓存机制,缓存机制是分散存储.
1.rdd.cache()
2.rdd.persist()
3.rdd.unpersist() 清除缓存
rdd缓存和CheckPoint的区别