pyspark之sparksql数据交互

news2024/11/15 13:40:36

在pyspark中,使用sparksql进行mysql数据的读写处理,将程序保存为test.py

#-*- coding: UTF-8 -*- 
# 设置python的默认编码
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
# Spark 初始化
from pyspark.sql import SQLContext, SparkSession, Row
spark = SparkSession.Builder().master('local').appName('sparkSql').getOrCreate()
# 设置数据库信息
prop = {'user':'','password':'','driver':'com.mysql.cj.jdbc.Driver'}
url_dim = 'jdbc:mysql://ip:port/kscs_dim?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_dwd = 'jdbc:mysql://ip:port/kscs_dwd?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_dws = 'jdbc:mysql://ip:port/kscs_dws?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_ads = 'jdbc:mysql://ip:port/kscs_ads?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'

# 读取数据
print('====read====')
data_shop = spark.read.jdbc(url=url_dim, table='dim_ec_douyin_shop_account_info', properties=prop)
data_seller = spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_seller', properties=prop)
data_talent = spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_talent', properties=prop)
data = spark.read.jdbc(url=url_ads, table='ads_ec_douyin_live_data', properties=prop)
# print(type(data))
# data.show()

# 单表写入
print('====write====')
data.write.jdbc(url=url_ads, table='ads_ec_douyin_live_data_test', mode='append', properties=prop)

# 多表处理
# 新建临时表
sqlContext = SQLContext(spark)
sqlContext.registerDataFrameAsTable(data_shop,'df_shop')
sqlContext.registerDataFrameAsTable(data_seller,'df_seller')
sqlContext.registerDataFrameAsTable(data_talent,'df_talent')
# 多表联查:计算字段必须要设置别名,与库表字段一致,否则无法识别数据对应库表中的哪一个字段,导致写数失败
df = sqlContext.sql("""select 
    now() add_time
    ,now() update_time
    ,a.data_date
	,a.shop_num
	,a.shop_name
    ,a.brand
	,a.trade
	,a.shop_manager
	,a.trade_manager
	,a.dept_name
	,a.`power`
	,a.status
    ,a.start_date
	,a.end_date
    ,ifnull(a.account_opr_type,b.account_opr_type) account_opr_type
    ,a.account_type
    ,a.account_id
    ,a.account_name
    ,b.live_id
	,b.live_show_pv
	,b.live_show_uv
	,b.watch_uv
	,b.watch_uv_zr
	,b.watch_uv_ff
	,b.pay_ucnt
	,b.a_c_u
	,b.p_c_u
	,b.watch_cvr_uv
	,b.watch_interact_rate_uv
	,b.avg_watch_duration
	,b.new_fans_cnt
	,b.watch_follow_rate_uv
	,b.like_cnt
	,b.comment_cnt
    ,cast(b.pay_ucnt_interact+b.unpaid_ucnt_interact as int) interact_uv
    ,cast(b.watch_uv*b.watch_uv_rate_fans as int) watch_uv_fans
	,b.watch_uv_rate_fans
	,b.unpaid_ucnt
	,b.watch_unpaid_rate
	,b.pay_ucnt_newc
	,b.pay_ucnt_rate_newc
    ,cast(b.pay_ucnt_newc*b.fans_ucnt_rate_paynewc as int) fans_ucnt_paynewc
    ,cast(b.pay_ucnt_newc*b.interact_ucnt_rate_paynewc as int) interact_ucnt_paynewc
	,b.fans_ucnt_rate_paynewc
	,b.interact_ucnt_rate_paynewc
	,b.pay_ucnt_oldc
	,b.pay_ucnt_rate_oldc
    ,cast(b.pay_ucnt_oldc*b.fans_ucnt_rate_payoldc as int) fans_ucnt_payoldc
    ,cast(b.pay_ucnt_oldc*b.interact_ucnt_rate_payoldc as int) interact_ucnt_payoldc
	,b.fans_ucnt_rate_payoldc
	,b.interact_ucnt_rate_payoldc
	,b.pay_ucnt_interact
	,b.pay_ucnt_rate_interact
    ,cast(b.pay_ucnt_interact*b.fans_ucnt_rate_payinteract as int) fans_ucnt_payinteract
    ,cast(b.pay_ucnt_interact*b.newc_ucnt_rate_payinteract as int) newc_ucnt_payinteract
	,b.fans_ucnt_rate_payinteract
	,b.newc_ucnt_rate_payinteract
	,b.pay_ucnt_nointeract
	,b.pay_ucnt_rate_nointeract
    ,cast(b.pay_ucnt_nointeract*b.fans_ucnt_rate_paynointeract as int) fans_ucnt_paynointeract
    ,cast(b.pay_ucnt_nointeract*b.newc_ucnt_rate_paynointeract as int) newc_ucnt_paynointeract
	,b.fans_ucnt_rate_paynointeract
	,b.newc_ucnt_rate_paynointeract
	,b.pay_ucnt_fans
	,b.pay_ucnt_rate_fans
    ,cast(b.pay_ucnt_fans*b.newfans_ucnt_rate_payfans as int) newfans_ucnt_payfans
	,b.interact_ucnt_rate_payfans
	,b.newfans_ucnt_rate_payfans
	,b.pay_ucnt_passer
	,b.pay_ucnt_rate_passer
    ,cast(b.pay_ucnt_passer*b.interact_ucnt_rate_paypasser as int) interact_ucnt_paypasser
    ,cast(b.pay_ucnt_passer*b.newc_ucnt_rate_paypasser as int) newc_ucnt_paypasser
	,b.interact_ucnt_rate_paypasser
	,b.newc_ucnt_rate_paypasser
	,b.watch_uv_unpaidval
	,b.watch_uv_rate_unpaidval
    ,cast(b.watch_uv_unpaidval*b.fans_ucnt_rate_unpaidval as int) fans_ucnt_unpaidval
    ,cast(b.watch_uv_unpaidval*b.interact_ucnt_rate_unpaidval as int) interact_ucnt_unpaidval
	,b.fans_ucnt_rate_unpaidval
	,b.interact_ucnt_rate_unpaidval
	,b.watch_uv_unpaidinv
	,b.watch_uv_rate_unpaidinv
    ,cast(b.watch_uv_unpaidinv*b.fans_ucnt_rate_unpaidinv as int) fans_ucnt_unpaidinv
    ,cast(b.watch_uv_unpaidinv*b.interact_ucnt_rate_unpaidinv as int) interact_ucnt_unpaidinv
	,b.fans_ucnt_rate_unpaidinv
	,b.interact_ucnt_rate_unpaidinv
	,b.unpaid_ucnt_interact
	,b.unpaid_ucnt_rate_interact
    ,cast(b.unpaid_ucnt_interact*b.fans_ucnt_rate_unpaidinteract as int) fans_ucnt_unpaidinteract
    ,cast(b.unpaid_ucnt_interact*b.watch_ucnt_rate_inv_unpaidinteract as int) watch_ucnt_inv_unpaidinteract
	,b.fans_ucnt_rate_unpaidinteract
	,b.watch_ucnt_rate_inv_unpaidinteract
	,b.unpaid_ucnt_nointeract
	,b.unpaid_ucnt_rate_nointeract
    ,cast(b.unpaid_ucnt_nointeract*b.fans_ucnt_rate_unpaidnointeract as int) fans_ucnt_unpaidnointeract
    ,cast(b.unpaid_ucnt_nointeract*b.watch_ucnt_rate_inv_unpaidnointeract as int) watch_ucnt_inv_unpaidnointeract
	,b.fans_ucnt_rate_unpaidnointeract
	,b.watch_ucnt_rate_inv_unpaidnointeract
	,b.unpaid_ucnt_fans
	,b.unpaid_ucnt_rate_fans
    ,cast(b.unpaid_ucnt_fans*b.watch_ucnt_rate_inv_unpaidfans as int) watch_ucnt_inv_unpaidfans
	,b.interact_ucnt_rate_unpaidfans
	,b.watch_ucnt_rate_inv_unpaidfans
	,b.unpaid_ucnt_passer
	,b.unpaid_ucnt_rate_passer
    ,cast(b.unpaid_ucnt_passer*b.interact_ucnt_rate_unpaidpasser as int) interact_ucnt_unpaidpasser
    ,cast(b.unpaid_ucnt_passer*b.watch_ucnt_rate_inv_unpaidpasser as int) watch_ucnt_inv_unpaidpasser
	,b.interact_ucnt_rate_unpaidpasser
	,b.watch_ucnt_rate_inv_unpaidpasser
	,b.punish_cnt
	,b.punish_detail 
from 
-- 店铺-账号信息
(select * from df_shop where data_date>='2022-12-06') a

-- 直播详情数据
left join
(-- 商家视角
SELECT
	data_date
	,shop_num
	,shop_name
	,account_opr_type
	,account_id
	,live_id
	,live_show_pv
	,live_show_uv
	,watch_uv
	,watch_uv_zr
	,watch_uv_ff
	,pay_ucnt
	,a_c_u
	,p_c_u
	,watch_cvr_uv
	,watch_interact_rate_uv
	,avg_watch_duration
	,new_fans_cnt
	,watch_follow_rate_uv
	,like_cnt
	,comment_cnt
	,watch_uv_rate_fans
	,unpaid_ucnt
	,watch_unpaid_rate
	,pay_ucnt_newc
	,pay_ucnt_rate_newc
	,fans_ucnt_rate_paynewc
	,interact_ucnt_rate_paynewc
	,pay_ucnt_oldc
	,pay_ucnt_rate_oldc
	,fans_ucnt_rate_payoldc
	,interact_ucnt_rate_payoldc
	,pay_ucnt_interact
	,pay_ucnt_rate_interact
	,fans_ucnt_rate_payinteract
	,newc_ucnt_rate_payinteract
	,pay_ucnt_nointeract
	,pay_ucnt_rate_nointeract
	,fans_ucnt_rate_paynointeract
	,newc_ucnt_rate_paynointeract
	,pay_ucnt_fans
	,pay_ucnt_rate_fans
	,interact_ucnt_rate_payfans
	,newfans_ucnt_rate_payfans
	,pay_ucnt_passer
	,pay_ucnt_rate_passer
	,interact_ucnt_rate_paypasser
	,newc_ucnt_rate_paypasser
	,watch_uv_unpaidval
	,watch_uv_rate_unpaidval
	,fans_ucnt_rate_unpaidval
	,interact_ucnt_rate_unpaidval
	,watch_uv_unpaidinv
	,watch_uv_rate_unpaidinv
	,fans_ucnt_rate_unpaidinv
	,interact_ucnt_rate_unpaidinv
	,unpaid_ucnt_interact
	,unpaid_ucnt_rate_interact
	,fans_ucnt_rate_unpaidinteract
	,watch_ucnt_rate_inv_unpaidinteract
	,unpaid_ucnt_nointeract
	,unpaid_ucnt_rate_nointeract
	,fans_ucnt_rate_unpaidnointeract
	,watch_ucnt_rate_inv_unpaidnointeract
	,unpaid_ucnt_fans
	,unpaid_ucnt_rate_fans
	,interact_ucnt_rate_unpaidfans
	,watch_ucnt_rate_inv_unpaidfans
	,unpaid_ucnt_passer
	,unpaid_ucnt_rate_passer
	,interact_ucnt_rate_unpaidpasser
	,watch_ucnt_rate_inv_unpaidpasser
	,punish_cnt
	,punish_detail 
FROM df_seller where data_date>='2022-12-06'

-- 达人视角
union
SELECT
	data_date
	,shop_num
	,shop_name
	,account_opr_type
	,account_id
	,live_id
	,live_show_pv
	,live_show_uv
	,watch_uv
	,watch_uv_zr
	,watch_uv_ff
	,pay_ucnt
	,a_c_u
	,p_c_u
	,watch_cvr_uv
	,watch_interact_rate_uv
	,avg_watch_duration
	,new_fans_cnt
	,watch_follow_rate_uv
	,like_cnt
	,comment_cnt
    ,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,null
	,punish_cnt
	,punish_detail 
FROM df_talent where data_date>='2022-12-06') b 
on a.data_date=b.data_date and a.shop_num=b.shop_num and a.account_id=b.account_id""")
# print(type(df))
# df.show()

# 写入数据
print('====write====')
df.write.jdbc(url=url_dws, table='dws_ec_douyin_live_detail_data_test', mode='append', properties=prop)

spark.stop()

程序提交

程序可以放在调度平台上进行启动(调度平台的服务器上需要部署Spark运行环境)
在这里插入图片描述
也可以直接在spark服务器上提交程序到集群
将test.py放在服务器的以下路径:dolphinscheduler/data/kscs/resources/test.py,执行脚本如下:

# 提交程序到local模式的spark
./bin/spark-submit --master local --driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 2G --jars mysql-connector-java-8.0.30.jar data/kscs/resources/test.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/108341.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【推荐】DDD领域驱动设计和中台实践资料合集

Domain Driven Design(简称 DDD),又称为领域驱动设计,起源于杰出软件建模专家Eric Evans在2003年发表的书籍《DOMAIN-DRINEN DESIGN —TACKLING COMPLEXITY IN THE HEART OF SOFTWARE》(中文译名《领域驱动设计—软件核…

卓海科技冲刺创业板:拟募资5.47亿 相宇阳控制52.9%股权

雷递网 雷建平 12月20日无锡卓海科技股份有限公司(简称:“卓海科技”)日前递交招股书,准备在深交所创业板上市。卓海科技计划募资5.47亿元,其中,1.04亿元用于半导体前道量检测设备扩产项目,1.84…

ev_api_server:大事件node接口项目开发

Headline 大事件后台 API 项目,API 接口文档请参考 https://www.showdoc.cc/escook?page_id3707158761215217 1. 初始化 1.1 创建项目 新建 api_server 文件夹作为项目根目录,并在项目根目录中运行如下的命令,初始化包管理配置文件&#x…

尚医通-前端Vue学习(十)

目录: (1)node.js介绍 (2)npm包管理工具 (3)es6模块化 (4)babel转码器 (5)webpack打包工具 (1)node.js介绍 浏览器的…

Python使用pandas导入xlsx格式的excel文件内容

Python使用pandas导入xlsx格式的excel文件内容1. 基本导入2. 列标题与数据对齐3. 指定导入某个sheet4. 指定行索引5. 指定列索引6. 指定导入列7. 指定导入的行数8. 更多的参数1. 基本导入 在 Python中使用pandas导入.xlsx文件的方法是read_excel()。 # codingutf-8 import pa…

Windows环境下在VScode中运行开源运动规划库(zhm-real / PathPlanning)的方法

本文主要介绍Windows环境下,在Vscode中运行zhm-real发布的开源运动规划库PathPlanning的实现方法,包括环境配置及运行开源包时常见错误解决方法。    一、环境配置 (1)VScode 下载及安装,官网如下: http…

Flowable工作流进阶使用教程(监听器+流程变量+网关)

一、任务分配和流程变量 1.任务分配 1.1 固定分配 固定分配就是我们前面介绍的,在绘制流程图或者直接在流程文件中通过Assignee来指定的方式 1.2 表达式分配 Flowable使用UEL进行表达式解析。UEL代表Unified Expression Language,是EE6规范的一部分…

【Python】用python将html转化为pdf

其实早在去年就有做过,一直没有写,先简单记录下 1、主要用到的工具【wkhtmltopdf】 【下载地址】wkhtmltopdf 根据系统选择安装包,速度有点慢,先挂着 2、下载Python库 pip install pdfkit pip install wkhtmltopdf 3、简单代码…

CAD教程:CAD自定义之基础设置的操作技巧

在使用国产CAD软件绘制CAD图纸的过程中,有些时候会需要CAD自定义设置,那么你知道浩辰CAD建筑软件中CAD自定义之基础设置怎么使用吗?不知道也没关系,接下来的CAD教程就让小编来给大家介绍一下国产CAD软件——浩辰CAD建筑软件中CAD自…

【1799. N 次操作后的最大分数和】

来源:力扣(LeetCode) 描述: 给你 nums ,它是一个大小为 2 * n 的正整数数组。你必须对这个数组执行 n 次操作。 在第 i 次操作时(操作编号从 1 开始),你需要: 选择两个…

实验一 逻辑回归

一、实验目的 (1)学习并掌握常见的机器学习方法; (2)能够结合所学的python知识实现机器学习算法; (3)能够用所学的机器学习算法解决实际问题。 二、实验内容与要求 &#xff08…

设计模式之备忘录模式

Memento design pattern 备忘录模式的概念、备忘录模式的结构、备忘录模式的优缺点、备忘录模式的使用场景、备忘录模式的实现示例、备忘录模式的源码分析 1、备忘录模式的概念 备忘录模式,又称快照模式,即在不破坏封装的前提下,获取并保存一…

【数电】Simulation Test 模拟测试

一、 选择题:(共20分,每小题2分) 1、逻辑函数的所有最小项之和等于多少? A. 0 B. 1 C. 0或1 D. 任意值 2、与非门的多余输入端应如何处理?…

MySQL面试常问问题(基础) —— 赶快收藏

目录 1. 什么是内连接、外连接、交叉连接、笛卡尔积呢? 2. 那MySQL 的内连接、左连接、右连接有有什么区别? 3.说一下数据库的三大范式? 4.varchar与char的区别? 5.blob和text有什么区别? 6.DATETIME和TIMESTAMP…

SCSS学习笔记

文章目录1.安装scss2.选择器嵌套3.属性嵌套4.父选择器&5.变量5.1变量的规范5.2变量的作用域5.3给变量设置默认值(!default)6数据类型7.运算符8.插值语法9.流程控制语句9.1 条件语句9.2循环语句9.2.1for9.2.2each9.2.3while10import10.1引入scss不编译10.2嵌套引入scss11.mi…

【软件测试】概念篇

目录 一、需求 1.1用户需求 1.2软件需求 1.3需求的重要性 二、测试用例 三、BUG 3.1什么是BUG 3.2如何描述一个BUG 4.3BUG优先级 四、软件开发模型 4.1软件生命周期 4.2开发模型 定义:软件测试就是一系列活动,这些活动是为了评估一个程序或者…

新店速递 | IU酒店带您领略“东方古罗马”

淄博,位处鲁中,是黄河三角洲生态经济和蓝色经济区的交汇处。四季分明的气候造就了这座齐国故都的生态多样性,南高北低的地理位置使其峻岭平原兼具,鲁中的位置又赋予他交通枢纽的重要性。这里历史气息浓厚,社会文化自由…

中文语法纠错全国大赛获奖分享:基于多轮机制的中文语法纠错

中文语法纠错任务旨在对文本中存在的拼写、语法等错误进行自动检测和纠正,是自然语言处理领域一项重要的任务。同时该任务在公文、新闻和教育等领域都有着落地的应用价值。但由于中文具有的文法和句法规则比较复杂,基于深度学习的中文文本纠错在实际落地…

新能源汽车市场渗透率不断提高,锂电设备需求空间较大

根据观研报告网发布的《中国新能源汽车行业发展深度研究与投资趋势调研报告(2022-2029年)》显示,近年来,随着各国开启能源转型,在汽车领域,由于电动汽车具有高效节能、零排放等优点,已逐渐成为汽…

配置小型公司网络WLAN基本业务(AC通过三层口管理AP)

组网需求: 某小型企业由于业务需要,希望员工能在企业内部随时随地的访问Internet,进行移动办公。该企业部署了一台AR路由器作为出口网关,希望同时在AR路由器上部署WLAN功能,为企业员工提供无线网络接入服务&#xff0c…