背景:
小批量数据可以使用pandas 进行分析,方便灵活。但大批量(千万级别)数据,使用pandas分析处理,速度很慢,且需一次性读取全部数据,内存可能溢出。
此时使用spark分布式分析处理速度很快,且数据分区,再配上jupyter 在线分析工具界面,可以很方便进行交互式大数据集分析。
注意:
python第三方库pyspark,和spark自带组件pyspark,都提供了与Spark 集群交互的 Python 接口,让Python 开发人员能够利用 Apache Spark 的强大功能来处理大规模数据。
区别:spark集群自带组件pyspark,与spark一体,无需进行配置,可直接使用;python pip安装的pyspark模块,需要配置上spark集群相关信息,才能利用spark集群处理数据。
环境:
Jupyter-lab(python3.7) + Spark集群(sparkV2.4.0 - cdh6.3.4)
文章目录
- 1、Jupyter Pyspark 在线交互式环境配置
- 1.1 第一种方式
- 1.2 第二种方式[未验证]
- 2、在线交互式大数据分析测试
- 3、spark-submit
- 4、client 和 cluster 运行模式注意点
1、Jupyter Pyspark 在线交互式环境配置
1.1 第一种方式
# 安装pyspark类库
> pip install pyspark==2.4.0 # 与spark集群版本保持一致
# 启动jupyter-lab
> jupyter-lab
# jupyter环境
import os
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,FloatType,IntegerType
import pyspark.sql.functions as F
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
import logging
import sys
import warnings
# 日志配置
logging.getLogger("py4j").setLevel(logging.WARN) # 屏蔽spark运行debug日志
logging.getLogger("pyspark").setLevel(logging.WARN)
# 配置集群spark、hadoop家目录
os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/spark'
os.environ['HADOOP_CONF_DIR'] = '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop' # spark on hive
os.environ['PYSPARK_PYTHON'] = './py3/python3.7/bin/python' # 使用 spark.yarn.dist.archives 分发到各节点的python环境;也可以是所在节点绝对路径(要求每台机器所安装python环境一致)
os.environ['PYSPARK_DRIVER_PYTHON'] = '/data/d2/anaconda3/anaconda3/envs/python3.7/bin/python' # spark client 模式运行时,driver 本地运行,无法使用spark.yarn.dist.archives 分发到各节点的python环境,需单独指定
# 创建sparksql会话
parameters = [
('spark.app.name','测试pyspark连接'),# 设置spark应用名称
# spark.yarn.dist.archives 依赖归档文件(zip/tar.gz等),用于指定哪些文件应该分发到每个执行节点上。
# 这些压缩文件将在应用程序执行前在executor节点上工作目录内解压可用。
# 这里将整个python3.7解释器,压缩为py3,随spark执行程序一起分发到各个executor工作目录内。
('spark.yarn.dist.archives','hdfs://namenode2:8020/python3/python3.7.tar.gz#py3'),
('spark.master','yarn'), #集群运行模式,yarn(连接到正在运行的 YARN 集群,Spark应用程序会作为 YARN 上的一个应用来提交和执行)
('spark.submit.deploymode','client'), # driver运行模式 -> 客户端模式运行
#('spark.submit.pyFiles','./test.txt') # job开始前上传所依赖py文件,client模式下可能提示找不到(文件会随submit自动上传)
('spark.debug.maxToStringFields',100)
]
conf = SparkConf.setALL(parameters)
# sc = SparkContext.getOrCreate(conf=conf)
# sc.addPyFile('./funcs.py') #(local_file_path/hdfs/url) 添加单个python文件到executor上,甚至在开始job后也可以添加py文件
# 创建回话
spark = SparkSession\
.builder\
.config(conf=conf)\
.enableHiveSupport()\ #.enableHiveSupport():这个方法会自动包括一些必要的 Hive 类库和配置,以便与 Hive 服务通信
.getOrCreate()
spark
# 关闭回话
# spark.stop()
1.2 第二种方式[未验证]
spark官网说明
## 通过配置driver端python环境为jupyter,然后再启动{SPARK_HOME}/bin/pyspark实现
> export PYSPARK_DRIVER_PYTHON=/xx/anaconda3/bin/jupyter #jupyter启动服务命令所在目录
> export PYSPARK_DRIVER_PYTHON_OPTS=notebook # jupyter启动参数,jupyter notebook方式
> export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root' # jupyter-lab 方式 二选其一
# 上面两个配置可以直接加到{SPARK_HOME}/bin/pyspark 启动文件里
# 再次启动pyspark
> ./bin/pyspark
2、在线交互式大数据分析测试
#1、读取hive表数据
data = spark.sql('select * from test.id4')
data.show(4)
#2、使用udf进行数据处理
@F.udf(IntegerType())
def add(x):
return x+1
data.withColumn('flag+1',add(data.flag))
data.show(4)
3、spark-submit
spark-submit 是Spark提交各类任务(python、R、Java、Scala)的工具,可以使用shell脚本运行指定的py脚本。
其实,{SPARK_HOME}/bin/pyspark交互式环境,运行时底层也是使用的spark-submit提交资源管理器进行计算。
spark-submit(V3.1.3)具体参数:*
【从spark-submit --help 翻译而来(不同版本间可能有差异)】
提交脚本格式 | |
---|---|
Usage: spark-submit [options] "app jar | python file | R file" [app arguments] | |
Options: | 注释 |
--master MASTER_URL | spark://host:port, mesos://host:port, yarn,k8s://https://host:port,local (Default: local[*]). |
--deploy-mode DEPLOY_MODE | 运行模式 'client/cluster '(Default: client) |
--class CLASS_NAME | 您的应用程序的主类(用于 Java / Scala 应用程序,python应该不需要)。 |
--name NAME | 应用程序的名称。 |
--packages | 要包含在驱动程序和执行程序类路径中的 jar 的 maven 坐标的逗号分隔列表。 |
--py-files PY_FILES | 逗号分隔的 .zip、.egg 或 .py 文件列表,提交的python文件和入口文件在同一目录下,这里面包括Python应用主程序,这些文件将被交付给每一个执行器来使用。 |
--files FILES | 逗号分隔的文件列表,放置在每个执行器的工作目录中。这些文件在执行器中的文件路径可以通过 SparkFiles.get(fileName) 访问。 |
--archives ARCHIVES | 要提取到每个执行程序的工作目录中的以逗号分隔的档案列表。 |
--conf, -c PROP=VALUE | 任意 Spark 配置属性 |
--properties-file FILE | 加载额外属性的文件路径。如果未指定,默认查找 conf/spark-defaults.conf |
--driver-memory MEM | driver程序内存(例如 1000M,2G)(默认值:1024M)。 |
--executor-memory MEM | 每个executor的内存(例如 1000M,2G)(默认值:1G) |
仅集群部署模式: | |
--driver-cores NUM | drive使用的核心数(默认值:1)。 |
--executor-cores | 每个executor可以使用的cpu核心(Yarn和K8S默认1,standalone默认worker可利用的全部核心) |
Spark 仅适用于 YARN 和 Kubernetes: | |
--num-executors NUM | 要启动的执行程序数量(默认值:2)。 如果启用了动态分配,则执行器的初始数量将至少为 NUM。 |
--principal PRINCIPAL | 用于登录 KDC 的主体。 |
--keytab KEYTAB | 包含以上指定主体的密钥表的文件的完整路径。 |
Spark 仅适用于 YARN: | |
--queue QUEUE_NAME | 要提交到的 YARN 队列(默认值:“default”)。 |
4、client 和 cluster 运行模式注意点
起初,在配置spark运行环境时,指定spark python环境配置如下:
vim spark-defaults.conf
spark.yarn.dist.archives=hdfs://***/***/***/env/python_env.zip#python_env
spark.pyspark.driver.python=./python_env/bin/python # pyspark程序内部自定义函数或类执行环境
spark.pyspark.python=./python_env/bin/python
Spark-submit在进行client模式提交时,提示 "Cannot run program “./python_env/bin/python “: error=2, No such file or dictor "错误,而进行cluster提交时正常运行。
原因:
--archives # code运行依赖文档。
--py-files # code依赖python。
如上面依赖,也可能出现 “file not found” 问题,原理应该一样。
driver服务启动后,会自动上传依赖文件到executor中,解压到当前文件夹下(通过查看yarn运行日志,可以看到上传解压过程)。
当使用client模式时,driver运行在本地spark-submit进程中,未进行archives的上传解压,所以报错找不到python文件。
当使用cluster模式提交时,会优先在yarn的机器中,开启一个特殊的executor运行driver,在开启executor过程中,伴随着进行archives的上传解压。