在 PySpark 中使用 Spark SQL 读写 MySQL 数据，程序保存为 test.py：
#-*- coding: UTF-8 -*-
# Make UTF-8 the implicit str<->unicode codec on Python 2 (the script embeds
# non-ASCII text, e.g. the Chinese comments inside the SQL string below).
# reload() as a builtin and sys.setdefaultencoding() exist only on Python 2,
# so the hack is version-guarded: on Python 3 it would raise NameError, and
# Python 3's default encoding is already UTF-8.
import sys
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 - Python 2 builtin; re-exposes setdefaultencoding
    sys.setdefaultencoding('utf-8')
# Spark initialisation.
# The master URL is deliberately NOT hard-coded here: properties set in code
# take precedence over `spark-submit --master ...`, so a hard-coded 'local'
# would silently pin every run (including cluster submissions) to local mode.
# Pass the master on the command line instead, e.g. `spark-submit --master local`.
# NOTE(review): when --master is omitted, spark-submit falls back to its own
# default (local[*] in current releases) - confirm for the deployed version.
from pyspark.sql import SQLContext, SparkSession, Row
spark = SparkSession.builder.appName('sparkSql').getOrCreate()
# MySQL connection settings. user/password are empty and the host is the
# literal placeholder 'ip:port' - fill these in before running.
# NOTE(review): credentials live in source; consider loading them from the
# environment or a secrets store instead.
prop = {
    'user': '',
    'password': '',
    'driver': 'com.mysql.cj.jdbc.Driver',
}

# The four warehouse layers (dim / dwd / dws / ads) sit on the same server with
# the same connection options; only the schema name differs.
# NOTE(review): allowMultiQueries=true permits several statements per query
# string; it is probably unnecessary for Spark JDBC reads/writes - confirm and
# consider dropping it.
_JDBC_URL_TEMPLATE = (
    'jdbc:mysql://ip:port/{0}'
    '?useUnicode=true&autoReconnect=true&characterEncoding=utf8'
    '&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
)
url_dim, url_dwd, url_dws, url_ads = (
    _JDBC_URL_TEMPLATE.format(schema)
    for schema in ('kscs_dim', 'kscs_dwd', 'kscs_dws', 'kscs_ads')
)
# Load the source tables from MySQL over JDBC.
print('====read====')


def _read_mysql(jdbc_url, table_name):
    """Return a DataFrame for MySQL table *table_name* at *jdbc_url*."""
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=prop)


# Shop/account dimension (dim layer).
data_shop = _read_mysql(url_dim, 'dim_ec_douyin_shop_account_info')
# Live-stream details from the seller and the talent (influencer) perspective.
data_seller = _read_mysql(url_dwd, 'dwd_ec_douyin_live_detail_data_seller')
data_talent = _read_mysql(url_dwd, 'dwd_ec_douyin_live_detail_data_talent')
# Existing ADS table, used by the single-table write demo.
data = _read_mysql(url_ads, 'ads_ec_douyin_live_data')
# Single-table write: copy `data` into ads_ec_douyin_live_data_test.
# Append mode adds rows on every run, so re-running the script duplicates them.
# Debug tip: call data.show() to inspect the rows before writing.
print('====write====')
data.write.mode('append').jdbc(
    url=url_ads,
    table='ads_ec_douyin_live_data_test',
    properties=prop,
)
# Multi-table processing: expose the DataFrames to SQL as temporary views.
# SQLContext's first parameter is a SparkContext (the SparkSession itself was
# being passed before). Passing the session explicitly binds this context to
# the same session, so the views below are visible via sqlContext.sql(...).
# SQLContext is deprecated as of Spark 3.0; it is kept only for callers that
# still use sqlContext.
sqlContext = SQLContext(spark.sparkContext, spark)
# createOrReplaceTempView is the SparkSession-era equivalent of the deprecated
# SQLContext.registerDataFrameAsTable.
data_shop.createOrReplaceTempView('df_shop')
data_seller.createOrReplaceTempView('df_seller')
data_talent.createOrReplaceTempView('df_talent')
# Multi-table join. Every computed column MUST be aliased to the exact column
# name of the target table; otherwise the written data cannot be mapped to the
# table's columns and the JDBC write fails.
#
# Shape of the query:
#   a = shop/account dimension rows (df_shop)
#   b = seller-view live details (df_seller) stacked on talent-view details
#       (df_talent); the talent table has no funnel-breakdown columns, so
#       those positions are NULL in its half of the union.
#   a LEFT JOIN b on (data_date, shop_num, account_id).
#
# Derived head-counts are computed as count * rate and ROUNDED before the
# cast: CAST(x AS INT) truncates, so a product such as 28.9999 (a rate stored
# rounded, or a floating-point error) would be written as one less.
# interact_uv is an exact sum of two counts, so it keeps the plain cast.
#
# NOTE(review): `union` de-duplicates the combined rows (a DISTINCT over all
# columns); if exact duplicates cannot occur or should be kept, `union all`
# avoids that extra shuffle.
#
# All three sources share one start date, kept in a single place so the filters
# cannot drift apart. The query runs on the SparkSession directly; the temp
# views df_shop/df_seller/df_talent registered above belong to that session.
START_DATE = '2022-12-06'
df = spark.sql("""select
now() add_time
,now() update_time
,a.data_date
,a.shop_num
,a.shop_name
,a.brand
,a.trade
,a.shop_manager
,a.trade_manager
,a.dept_name
,a.`power`
,a.status
,a.start_date
,a.end_date
,ifnull(a.account_opr_type,b.account_opr_type) account_opr_type
,a.account_type
,a.account_id
,a.account_name
,b.live_id
,b.live_show_pv
,b.live_show_uv
,b.watch_uv
,b.watch_uv_zr
,b.watch_uv_ff
,b.pay_ucnt
,b.a_c_u
,b.p_c_u
,b.watch_cvr_uv
,b.watch_interact_rate_uv
,b.avg_watch_duration
,b.new_fans_cnt
,b.watch_follow_rate_uv
,b.like_cnt
,b.comment_cnt
,cast(b.pay_ucnt_interact+b.unpaid_ucnt_interact as int) interact_uv
,cast(round(b.watch_uv*b.watch_uv_rate_fans) as int) watch_uv_fans
,b.watch_uv_rate_fans
,b.unpaid_ucnt
,b.watch_unpaid_rate
,b.pay_ucnt_newc
,b.pay_ucnt_rate_newc
,cast(round(b.pay_ucnt_newc*b.fans_ucnt_rate_paynewc) as int) fans_ucnt_paynewc
,cast(round(b.pay_ucnt_newc*b.interact_ucnt_rate_paynewc) as int) interact_ucnt_paynewc
,b.fans_ucnt_rate_paynewc
,b.interact_ucnt_rate_paynewc
,b.pay_ucnt_oldc
,b.pay_ucnt_rate_oldc
,cast(round(b.pay_ucnt_oldc*b.fans_ucnt_rate_payoldc) as int) fans_ucnt_payoldc
,cast(round(b.pay_ucnt_oldc*b.interact_ucnt_rate_payoldc) as int) interact_ucnt_payoldc
,b.fans_ucnt_rate_payoldc
,b.interact_ucnt_rate_payoldc
,b.pay_ucnt_interact
,b.pay_ucnt_rate_interact
,cast(round(b.pay_ucnt_interact*b.fans_ucnt_rate_payinteract) as int) fans_ucnt_payinteract
,cast(round(b.pay_ucnt_interact*b.newc_ucnt_rate_payinteract) as int) newc_ucnt_payinteract
,b.fans_ucnt_rate_payinteract
,b.newc_ucnt_rate_payinteract
,b.pay_ucnt_nointeract
,b.pay_ucnt_rate_nointeract
,cast(round(b.pay_ucnt_nointeract*b.fans_ucnt_rate_paynointeract) as int) fans_ucnt_paynointeract
,cast(round(b.pay_ucnt_nointeract*b.newc_ucnt_rate_paynointeract) as int) newc_ucnt_paynointeract
,b.fans_ucnt_rate_paynointeract
,b.newc_ucnt_rate_paynointeract
,b.pay_ucnt_fans
,b.pay_ucnt_rate_fans
,cast(round(b.pay_ucnt_fans*b.newfans_ucnt_rate_payfans) as int) newfans_ucnt_payfans
,b.interact_ucnt_rate_payfans
,b.newfans_ucnt_rate_payfans
,b.pay_ucnt_passer
,b.pay_ucnt_rate_passer
,cast(round(b.pay_ucnt_passer*b.interact_ucnt_rate_paypasser) as int) interact_ucnt_paypasser
,cast(round(b.pay_ucnt_passer*b.newc_ucnt_rate_paypasser) as int) newc_ucnt_paypasser
,b.interact_ucnt_rate_paypasser
,b.newc_ucnt_rate_paypasser
,b.watch_uv_unpaidval
,b.watch_uv_rate_unpaidval
,cast(round(b.watch_uv_unpaidval*b.fans_ucnt_rate_unpaidval) as int) fans_ucnt_unpaidval
,cast(round(b.watch_uv_unpaidval*b.interact_ucnt_rate_unpaidval) as int) interact_ucnt_unpaidval
,b.fans_ucnt_rate_unpaidval
,b.interact_ucnt_rate_unpaidval
,b.watch_uv_unpaidinv
,b.watch_uv_rate_unpaidinv
,cast(round(b.watch_uv_unpaidinv*b.fans_ucnt_rate_unpaidinv) as int) fans_ucnt_unpaidinv
,cast(round(b.watch_uv_unpaidinv*b.interact_ucnt_rate_unpaidinv) as int) interact_ucnt_unpaidinv
,b.fans_ucnt_rate_unpaidinv
,b.interact_ucnt_rate_unpaidinv
,b.unpaid_ucnt_interact
,b.unpaid_ucnt_rate_interact
,cast(round(b.unpaid_ucnt_interact*b.fans_ucnt_rate_unpaidinteract) as int) fans_ucnt_unpaidinteract
,cast(round(b.unpaid_ucnt_interact*b.watch_ucnt_rate_inv_unpaidinteract) as int) watch_ucnt_inv_unpaidinteract
,b.fans_ucnt_rate_unpaidinteract
,b.watch_ucnt_rate_inv_unpaidinteract
,b.unpaid_ucnt_nointeract
,b.unpaid_ucnt_rate_nointeract
,cast(round(b.unpaid_ucnt_nointeract*b.fans_ucnt_rate_unpaidnointeract) as int) fans_ucnt_unpaidnointeract
,cast(round(b.unpaid_ucnt_nointeract*b.watch_ucnt_rate_inv_unpaidnointeract) as int) watch_ucnt_inv_unpaidnointeract
,b.fans_ucnt_rate_unpaidnointeract
,b.watch_ucnt_rate_inv_unpaidnointeract
,b.unpaid_ucnt_fans
,b.unpaid_ucnt_rate_fans
,cast(round(b.unpaid_ucnt_fans*b.watch_ucnt_rate_inv_unpaidfans) as int) watch_ucnt_inv_unpaidfans
,b.interact_ucnt_rate_unpaidfans
,b.watch_ucnt_rate_inv_unpaidfans
,b.unpaid_ucnt_passer
,b.unpaid_ucnt_rate_passer
,cast(round(b.unpaid_ucnt_passer*b.interact_ucnt_rate_unpaidpasser) as int) interact_ucnt_unpaidpasser
,cast(round(b.unpaid_ucnt_passer*b.watch_ucnt_rate_inv_unpaidpasser) as int) watch_ucnt_inv_unpaidpasser
,b.interact_ucnt_rate_unpaidpasser
,b.watch_ucnt_rate_inv_unpaidpasser
,b.punish_cnt
,b.punish_detail
from
-- 店铺-账号信息
(select * from df_shop where data_date>='{start_date}') a
-- 直播详情数据
left join
(-- 商家视角
SELECT
data_date
,shop_num
,shop_name
,account_opr_type
,account_id
,live_id
,live_show_pv
,live_show_uv
,watch_uv
,watch_uv_zr
,watch_uv_ff
,pay_ucnt
,a_c_u
,p_c_u
,watch_cvr_uv
,watch_interact_rate_uv
,avg_watch_duration
,new_fans_cnt
,watch_follow_rate_uv
,like_cnt
,comment_cnt
,watch_uv_rate_fans
,unpaid_ucnt
,watch_unpaid_rate
,pay_ucnt_newc
,pay_ucnt_rate_newc
,fans_ucnt_rate_paynewc
,interact_ucnt_rate_paynewc
,pay_ucnt_oldc
,pay_ucnt_rate_oldc
,fans_ucnt_rate_payoldc
,interact_ucnt_rate_payoldc
,pay_ucnt_interact
,pay_ucnt_rate_interact
,fans_ucnt_rate_payinteract
,newc_ucnt_rate_payinteract
,pay_ucnt_nointeract
,pay_ucnt_rate_nointeract
,fans_ucnt_rate_paynointeract
,newc_ucnt_rate_paynointeract
,pay_ucnt_fans
,pay_ucnt_rate_fans
,interact_ucnt_rate_payfans
,newfans_ucnt_rate_payfans
,pay_ucnt_passer
,pay_ucnt_rate_passer
,interact_ucnt_rate_paypasser
,newc_ucnt_rate_paypasser
,watch_uv_unpaidval
,watch_uv_rate_unpaidval
,fans_ucnt_rate_unpaidval
,interact_ucnt_rate_unpaidval
,watch_uv_unpaidinv
,watch_uv_rate_unpaidinv
,fans_ucnt_rate_unpaidinv
,interact_ucnt_rate_unpaidinv
,unpaid_ucnt_interact
,unpaid_ucnt_rate_interact
,fans_ucnt_rate_unpaidinteract
,watch_ucnt_rate_inv_unpaidinteract
,unpaid_ucnt_nointeract
,unpaid_ucnt_rate_nointeract
,fans_ucnt_rate_unpaidnointeract
,watch_ucnt_rate_inv_unpaidnointeract
,unpaid_ucnt_fans
,unpaid_ucnt_rate_fans
,interact_ucnt_rate_unpaidfans
,watch_ucnt_rate_inv_unpaidfans
,unpaid_ucnt_passer
,unpaid_ucnt_rate_passer
,interact_ucnt_rate_unpaidpasser
,watch_ucnt_rate_inv_unpaidpasser
,punish_cnt
,punish_detail
FROM df_seller where data_date>='{start_date}'
-- 达人视角
union
SELECT
data_date
,shop_num
,shop_name
,account_opr_type
,account_id
,live_id
,live_show_pv
,live_show_uv
,watch_uv
,watch_uv_zr
,watch_uv_ff
,pay_ucnt
,a_c_u
,p_c_u
,watch_cvr_uv
,watch_interact_rate_uv
,avg_watch_duration
,new_fans_cnt
,watch_follow_rate_uv
,like_cnt
,comment_cnt
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,null
,punish_cnt
,punish_detail
FROM df_talent where data_date>='{start_date}') b
on a.data_date=b.data_date and a.shop_num=b.shop_num and a.account_id=b.account_id""".format(start_date=START_DATE))
# Persist the joined result into the DWS test table, then shut Spark down.
# Append mode adds rows on every run, so re-running the script duplicates them.
# Debug tip: call df.show() to inspect the joined rows before writing.
print('====write====')
df.write.mode('append').jdbc(
    url=url_dws,
    table='dws_ec_douyin_live_detail_data_test',
    properties=prop,
)
spark.stop()
程序提交
程序可以放在调度平台（如 DolphinScheduler）上启动（调度平台所在的服务器上需要部署 Spark 运行环境）；
也可以直接在 Spark 服务器上通过 spark-submit 将程序提交到集群。
将 test.py 放到服务器的 dolphinscheduler/data/kscs/resources/test.py 路径下，提交命令如下（命令中的 ./bin/spark-submit、MySQL 驱动 jar 和 test.py 均为相对于当前工作目录的路径，请按实际部署位置调整）：
# Submit the program to Spark in local mode.
# NOTE: --num-executors and --executor-cores/--executor-memory only take effect
# with a cluster manager (e.g. --master yarn), and --driver-cores only in
# cluster deploy mode. With --master local the executor runs inside the driver
# JVM, so --driver-memory is the setting that sizes it.
./bin/spark-submit \
  --master local \
  --driver-cores 1 \
  --driver-memory 512M \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 2G \
  --jars mysql-connector-java-8.0.30.jar \
  data/kscs/resources/test.py