连接命令:
pyspark --master spark://node1:7077
一个application 大任务可以分解成 多个小任务 jobs, 一个job又可以分解成多个 stages 阶段,
一个stage又可以分解成 多个tasks(可以认为是两个线程)
standalone Zookeeper 高可用HA,集群中存在多个master,但最先注册的成为active,其它是standby,集群包含Worker、Driver、Application
Spark on Yarn :
master由resource manager 担任
work由node manager担任
Driver在yarn容器的内部或客户端线程中,executors也在yarn容器内,sparkContext入口环境对象是有driver构建的,
spark on yarn的两种部署模式:cluster模式【driver在yarn容器内,通讯耗损低,运行效率高】,client模式【driver在spark-submit提交的节点上,日志在客户端,调试方便】
需要:
spark客户端工具:如 spark-submit
启动命令: pyspark --master yarn
------------------------------------------------------------------------------
sparksql on hive
1.启动sparkthriftserver
./start-thriftserver.sh --hiveconf hive.server2.thrift.port=10000 --master yarn --driver-class-path /data/spark-2.2.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.43-bin.jar --executor-memory 5g --total-executor-cores 5
启动sparkthriftserver后,后台默认会执行spark-sql命令,实际上是用spark-submit向yarn提交一个任务。这样就会在yarn的8088页面任务栏中起一个常驻任务,用来执行spark sql。
2、连接spark
./beeline -u jdbc:hive2://172.168.108.6:10001 -n root
3、这里的sql可以在8088页面看到执行过程
----------------------------------------------------------------------------
wordcount pyspark 实现:
result = sc.textFile("hdfs://text.txt").flatMap(lambda line : line.split(" ")).map(lambda x :(x,1)).reduceByKey(lambda x,y : a+b).collect()
print(result) --包括前面的collect()是Driver运行的
Python on Spark 执行原理
RDD 弹性-分区可以增删和硬、内存
特性:
1.分区 --物理实体 分区组合成-》逻辑对象RDD
2.计算方法作用在每个分区上
3.一系列依赖关系-依赖链条-RDD血缘关系
4.K-v型的RDD可以有分区器(涉及shuffle)-默认分区-hash分区规则
5.分区规划尽量靠近数据所在的服务器
SparkContext 是操作RDD的上下文
SparkSession 是升级版,可以控制DF和DS
sc.parallelize() 创建RDD,本地转向分布式
glom的作用是将同一个分区里的元素合并到一个array里,显示元素所在的分区
map是一个输入元素对应一个输出元素
FlatMap是一个输入元素对应多个输出元素
wholetextfile 小文件读取 API 优化
算子 就是 作用于 分布式对象上的方法
转换算子:返回值是RDD, lazy 加载 执行计划
action算子:指令,返回值根据RDD中元素的类型决定
res1=rdd.count() print(res1)
collect()使用时需考虑Driver内存的大小
fold带有初始值的聚合,分区内和分区间都会基于初始值进行聚合 fold(10,lamdba x,y:x+y)
故分区内聚合的结果进行分区间聚合是以初始值为基础的
first take
top(N) 对RDD元素进行降序排序,取最大的前N个,对象内置的比较函数
takeSample(true【允许重复采样,不是内容的重复】,抽样个数,随机数种子) collect可能把Driver撑爆,使用其代替
takeOrdered(数据的个数,对排序的数据进行更改) 正反向排序
foreach()执行我提供的逻辑(map),但没有返回值
saveAsTextFile 写出数据,本地或HDFS ,有多少个分区,结果就有多少个文件,与Driver无关,看存放的数据的executors直接写出。
mapPartitions一次操作一个分区,意味着读取的次数是分区数,明显比map按元素数读取次数要少,网络IO小,分区间
foreachPartition
partitionBy 自定义分区操作(重新分区后有几个分区,自定义分区规则,函数)
repartition(N)=coalesce(N,shuffle=true) 重新分区,只修改分区数
无状态可并行
有状态串行
广播变量:序列化+网络传输
一个executor(进程)可以托管多个分区(线程)
当一个变量在一个executor接受过后,后面其它分区要就共享
使用:
b=sc.broadcast(变量)
v = b.value #取出来