PySpark是由Spark官方开发的Python语言第三方库,Python开发者可以通过使用python语言来编写Spark程序和SparkSQL完成开发。
之所以采用PySpark而不采用Java/Scala,是由于:
-
Dataworks可通过将代码在线写入DataWorks Python资源的方式,实现PySpark作业开发,并通过ODPS Spark节点提交运行该代码逻辑,开发过程较为简单。
-
而使用Java或Scala语言类型代码前,需先在本地开发好Spark on MaxCompute作业代码,再通过DataWorks上传为MaxCompute的资源。
步骤一、编写PySpark代码
-
在Dataworks业务流程中,右键点击MacCompute文件夹下的资源文件夹,选择新建资源,选择Python。
-
设置python脚本名称,以.py为后缀名,点击新建
-
编写PySpark代码(注意:代码中不能含有中文)
-
代码示例1:判断一个字符串是否可以转换为数字
注意,如果表/代码中含有中文,必须修改为utf8编码格式 默认编码是ascii,出现中文字符会报错 # -*- coding: utf-8 -*- import sys reload(sys) sys.setdefaultencoding('utf8')
# -*- coding: utf-8 -*- # Spark2.x import sys from pyspark.sql import SparkSession try: # for python 2 reload(sys) sys.setdefaultencoding('utf8') except: # python 3 not needed pass if __name__ == '__main__': spark = SparkSession.builder\ .appName("spark sql")\ .config("spark.sql.broadcastTimeout", 20 * 60)\ .config("spark.sql.crossJoin.enabled", True)\ .config("odps.exec.dynamic.partition.mode", "nonstrict")\ .config("spark.sql.catalogImplementation", "odps")\ .getOrCreate() def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False print(is_number('foo')) print(is_number('1')) print(is_number('1.3')) print(is_number('-1.37')) print(is_number('1e3'))
- 代码示例2:建表、插入数据、读取数据
# -*- coding: utf-8 -*- #Spark2.x from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName("spark sql").getOrCreate() spark.sql("DROP TABLE IF EXISTS spark_sql_test_table") spark.sql("CREATE TABLE spark_sql_test_table(name STRING, num BIGINT)") spark.sql("INSERT INTO spark_sql_test_table SELECT 'abc', 100000") spark.sql("SELECT * FROM spark_sql_test_table").show() spark.sql("SELECT COUNT(*) FROM spark_sql_test_table").show()
-
-
点击提交,保存并提交资源
步骤二、创建并配置ODPS Spark节点
- 右键点击文件夹,选择新建节点,选择新建ODPS Spark节点
- 输入ODPS Spark节点名称,点击确认
- 配置节点参数:选择Spark版本,选择Python语言,选择主python资源(步骤一的PySpark代码),添加配置项目;设置调度配置参数,点击提交,保存并提交节点
步骤三、查看输出结果
由于数据开发中的ODPS Spark节点没有运行入口,因此需要在开发环境的运维中心执行Spark任务。
- 进入运维中心,点击周期任务运维,点击补数据实例,点击新建补数据任务
- 搜索ODPS Spark节点任务,点击添加
- 设置补数据运行策略,点击提交
- 等待运行完成,点击实例名称pyspark_test,点击查看日志,点击logview url
- 点击master-0,点击StdOut,查看输出结果
参考文档:
- 开发ODPS Spark任务
-
PySpark开发示例
-
PySpark基础操作
-
SparkSQL基础语法
-
pyspark中文api_pyspark中文文档-CSDN博客