学习大数据DAY61 宽表加工

news2025/1/11 16:45:13

目录

模型设计

加工宽表

任务调度:


大表 - 把很多数据整合起来
方便后续的明细查询和指标计算

模型设计

设计 建模
设计: excel 文档去编写
建模: 使用建模工具 PowerDesigner Navicat 在线画图工具... 把表结构给绘
制出来
共享\项目课工具\pd

加工宽表

数据层 DWS 层
dws_lijinquan.dws_xbd_mxm_memberinfo_dim_t
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
# 宽表加工
# pyspark + spark sql
# 宽表加工
# pyspark + spark sql
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 会员检测临时表
def do_member_tmp_check():
sql = '''
SELECT
member_id AS member, -- 会员卡号
MIN(CASE WHEN detect_time = max_detect_time THEN erp_code END)
AS rec_detect_store, -- 最近检测门店
max(detect_time) AS rec_detect_date, -- 最近检测时间
count(1) AS check_count, -- 累计检测次数
min(substr(detect_time,1,10)) AS filing_date, -- 建档时间
min(CASE WHEN detect_time = min_detect_time THEN erp_code END)
AS store_name, -- 建档门店名称
max(extend) AS is_anamnesis, -- 有无既往病史
CASE WHEN COUNT(bec_chr_mbr_date) > 0 THEN 1 ELSE 0 END AS
is_chr_mbr -- 是否特慢病会员
FROM (
SELECT
*,
MIN(detect_time) OVER (PARTITION BY member_id) AS
min_detect_time,
MAX(detect_time) OVER (PARTITION BY member_id) AS
max_detect_time
FROMchange_shihaihong.his_chronic_patient_info_new
)
GROUP BY member_id
'''
df = spark.sql(sql)
df.show()
# 保存到 hive: change_shihaihong.member_check
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/member_check")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/member_check")
\
.saveAsTable("change_shihaihong.member_check")
print("写入 hive 表成功")
# 会员订单情况临时表
def do_member_tmp_sale():
@F.udf()
def handle_pay_fav_type(val1,val2,val3):
payments = {
"银行卡": val1,
"手机支付": val2,
"现金": val3
}
payment_tuples = list(payments.items())
payment_tuples.sort(key=lambda x: (-x[1], x[0])) #使用
负值来确保从大到小排序
result_strings = [method for method, _ in payment_tuples]
# 使用>符号连接字符串result = '>'.join(result_strings)
return result
sql='''
select
m.member, -- 会员卡号
count(1) as order_total, -- 总订单数
round(sum(m.precash),2) as order_amount, -- 消费总额
max(m.starttime) as last_order_date, -- 最后一单日期
min(m.starttime) as first_order_date, -- 首单日期
count(case when m.starttime >= date_sub('2018-01-01',
30) then 1 end) as order_30, -- 30 天订单量
count(case when m.starttime >= date_sub('2018-01-01',
90) then 1 end) as order_90, -- 90 天订单量
sum(case when m.starttime >= date_sub('2018-01-01', 30)
then round(m.precash,2) else 0 end) as amount_30, -- 30 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01', 90)
then round(m.precash,2) else 0 end) as amount_90, -- 90 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01',
180) then round(m.precash,2) else 0 end) as amount_180, -- 180
天消费金额
count(case when eusp.paytype = '银行卡 ' then 1 end) as
bank_count,
count(case when eusp.paytype = '手机支付' then 1 end)
as credit_card_count,
count(case when eusp.paytype = '现金 ' then 1 end) as
cash_count,
'' as pay_fav_type
from change_shihaihong.erp_u_sale_m_inc_new m
left join change_shihaihong.erp_u_sale_pay_inc_new eusp
on m.saleno = eusp.saleno -- 确保连接条件是正确的
group by m.member
'''
df = spark.sql(sql)
df = df.withColumn("pay_fav_type",
handle_pay_fav_type("bank_count","credit_card_count","cash_cou
nt"))
df.drop("bank_count")
df.drop("credit_card_count")
df.drop("cash_count")
df.show()# 保存到 Hive: change_shihaihong.member_sale
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat", "parquet") \
.option("location",
"/zhiyun/shihaihong/change/member_sale") \
.saveAsTable("change_shihaihong.member_sale")
print("临时表保存成功")
print("写入 hive 表成功")
# 宽表加工
def do_member_table():
# 主表可以列最多的那个表
# 会员的订单情况可以用子查询统计出来, 也可以使用临时表
sql = '''
with t as (select * from
change_shihaihong.crm_user_base_info_his_new )
select
t.user_id as mbr_code, -- 会员编码
t.user_type as mbr_type, -- 会员类型
t.source as mbr_resource, -- 会员来源
m.memcardno as mbr_cardno, -- 会员卡号
t.erp_code as store_code, -- 注册门店编码
t.active_time as sto_reg_date, -- 门店注册日期
"" as reg_platform, -- 注册外部平台
"" as platform_reg_date, -- 外部平台注册时间
t.name as name, -- 姓名
t.sex as gender, -- 性别
t.birthday as birthdate, -- 出生日期
t.age as age, -- 年龄
t.id_card_no as mbr_id_card, -- 身份证号
t.social_insurance_no as social_security_no, -- 社保卡号
t.education as edu_background, -- 教育背景
t.job as profession, -- 职业
"未知" as is_marriage, -- 婚姻状况
"无" as have_children, -- 是否有孩
t.address as address, -- 通信地址
"" as region, -- 区域
m.province as province, -- 省
m.city as city, -- 城市t.last_subscribe_time as cancel_date, -- 注销时间
m.tel as phone, -- 联系电话
m.handset as cell_phone, -- 手机号
t.email as email, -- 邮箱
t.wechat as wechat, -- 微信账号
t.webo as weibo, -- 微博账号
"" as alipay, -- 支付宝账号
"" as app, -- APP 账号
sale.order_total as order_total, -- 总订单数
sale.order_amount as order_amount, -- 消费总额
sale.last_order_date as last_order_date, -- 最后一单日期
sale.first_order_date as first_order_date, -- 首单日期
sale.order_30 as order_30, -- 30 天订单量
sale.order_90 as order_90, -- 90 天订单量
sale.amount_30 as amount_30, -- 30 天消费金额
sale.amount_90 as amount_90, -- 90 天消费金额
sale.amount_180 as amount_180, -- 180 天消费金额
sale.pay_fav_type as pay_fav_type, -- 付款方式偏爱排行
g.groupname as group_name, -- 会员分组
"" as ware_buy_sort, -- 药品购买排行
m.ness as sickness_motion, -- 疾病关注
check.rec_detect_store as rec_detect_store, -- 最近检测门
店
check.rec_detect_date as rec_detect_date, -- 最近检测时间
check.check_count as check_count, -- 累计检测次数
check.filing_date as filing_date, -- 建档时间
check.store_name as store_name, -- 建档门店名称
check.is_anamnesis as is_anamnesis, -- 有无既往病史
check.is_chr_mbr as is_chr_mbr, -- 是否特慢病会员
current_timestamp as etl_time, -- ETL 时间
"ETL by qinyuxiao" as comments -- 备注信息
from t
left join change_shihaihong.erp_u_memcard_reg_full_new m on
m.scrm_userid = t.user_id
left join change_shihaihong.member_sale sale on sale.member =
m.memcardno
left join change_shihaihong.member_check check on
check.member = m.memcardno
left join dwd_qinyuxiao.erp_c_memcard_class_group g on
sale.order_amount >=g.lg and sale.order_amount < g.gt
'''
df = spark.sql(sql)df.show()
# 保存
# 保存到 hive: dws_xbd_mxm_memberinfo_dim_t
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/dws/dws_xbd_mxm_memberinfo_dim_t")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dws_shihaihong location
"/zhiyun/shihaihong/dws";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/dws/dws_xbd_mxm_memb
erinfo_dim_t"). \
saveAsTable("dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t")
print("写入 hive 表成功")
# 验证数据
# 注意总数据量应该跟 CRM 表一致 168W 整 多一条都不行
# 计算原表的总记录数
original_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
original_count =
spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
do_member_tmp_check()
do_member_tmp_sale()
do_member_table()
print("宽表加工完成")# 部署

任务调度:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244982.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

零基础入门Flink,掌握基本使用方法

Flink基本概念 首先来讲&#xff0c;Flink是一个面向数据流处理和批处理的分布式开源计算框架。 那么&#xff0c;流处理和批处理分别处理什么样的数据呢&#xff0c;这就涉及两个概念-无界流和有界流 无界流VS有界流 任何类型的数据都可以形成流数据&#xff0c;比如用户…

Linux设置以及软件的安装(hadoop集群安装02)

一、Linux的常见设置 1、设置静态IP vi /etc/sysconfig/network-scripts/ifcfg-ens33 如何查看自己的虚拟机的网关&#xff1a; 完整的配置&#xff08;不要拷贝我的&#xff09;&#xff1a; TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no&…

数据中台方法论:数据汇聚

文章目录 一、数据汇聚概述二、 汇聚数据类型2.1 结构化数据2.2 半结构化数据2.3 非结构化数据 三、汇聚数据模式四、汇聚数据方法四、数据汇聚工具五、数据汇聚使用经验 数据小伙伴们&#xff0c;之前咱们长篇大论的聊聊过【数据中台建设方法论从0到1】&#xff0c;从数据中台…

【Maven】nexus 配置私有仓库配置【转】

介绍&#xff1a;【Maven】Nexus几个仓库的介绍-CSDN博客 一、仓库类型 proxy 远程仓库的代理&#xff0c;比如说nexus配置了一个central repository的proxy,当用户向这个proxy请求一个artifact的时候&#xff0c;会现在本地查找&#xff0c;如果找不到&#xff0c;则会从远程…

3C产品说明书电子化转变:用户体验、环保与商业机遇的共赢

在科技日新月异的当代社会&#xff0c;3C产品&#xff08;涵盖计算机类、通信类和消费类电子产品&#xff09;已成为我们日常生活中不可或缺的重要元素。与此同时&#xff0c;这些产品的配套说明书也经历了一场从纸质到电子化的深刻变革。这一转变不仅体现了技术的飞速进步&…

【YOLOv8】安卓端部署-2-项目实战

文章目录 1 准备Android项目文件1.1 解压文件1.2 放置ncnn模型文件1.3 放置ncnn和opencv的android文件1.4 修改CMakeLists.txt文件 2 手机连接电脑并编译软件2.1 编译软件2.2 更新配置及布局2.3 编译2.4 连接手机 3 自己数据集训练模型的部署4 参考 1 准备Android项目文件 1.1…

虚拟网卡驱动和DM9000C移植

网卡驱动程序框架 网卡驱动程序“收发功能”&#xff1a; 只要把上层的数据发给网卡&#xff0c;从网卡来的数据构造成包给上层即可。网卡只需要 “socket”编程&#xff0c;不需要打开某设备。 驱动程序都是以面向对象的思想写的&#xff0c;都有相关的结构体。 编程步骤 …

Vue3 + Vite 项目引入 Typescript

文章目录 一、TypeScript简介二、TypeScript 开发环境搭建三、编译方式1. 自动编译单个文件2. 自动编译整个项目 四、配置文件1. compilerOptions基本选项严格模式相关选项&#xff08;启用 strict 后自动包含这些&#xff09;模块与导入相关选项 2. include 和 excludeinclude…

Cyberchef使用功能之-多种压缩/解压缩操作对比

cyberchef的compression操作大类中有大量的压缩和解压缩操作&#xff0c;每种操作的功能和区别是什么&#xff0c;本章将进行讲解&#xff0c;作为我的专栏《Cyberchef 从入门到精通教程》中的一篇&#xff0c;详见这里。 关于文件格式和压缩算法的理论部分在之前的文章《压缩…

Istio分布式链路监控搭建:Jaeger与Zipkin

分布式追踪定义 分布式追踪是一种用来跟踪分布式系统中请求的方法&#xff0c;它可以帮助用户更好地理解、控制和优化分布式系统。分布式追踪中用到了两个概念&#xff1a;TraceID 和 SpanID。 TraceID 是一个全局唯一的 ID&#xff0c;用来标识一个请求的追踪信息。一个请求…

Linux修改/etc/hosts不起作用(ping: xxx: Name or service not known)的解决方法——开启NSCD

​ 问题描述 起因是我在实验室云资源池的一台虚拟机&#xff08;CentOS 8.5&#xff09;上的/etc/hosts文件中为Fabric网络节点的域名指定了IP&#xff1a; IP可以ping通&#xff0c;但是ping域名时提示ping: xxx: Name or service not known。 问题本身应该是Linux通用的&a…

Python中Tushare(金融数据库)入门详解

文章目录 Python中Tushare&#xff08;金融数据库&#xff09;入门详解一、引言二、安装与注册1、安装Tushare2、注册与获取Token 三、Tushare基本使用1、设置Token2、获取数据2.1、获取股票基础信息2.2、获取交易日历2.3、获取A股日线行情2.4、获取沪股通和深股通成份股2.5、获…

【网络】网络抓包与协议分析

网络抓包与协议分析 一. 以太网帧格式分析 这是以太网数据帧的基本格式&#xff0c;包含目的地址(6 Byte)、源地址(6 Byte)、类型(2 Byte)、数据(46~1500 Byte)、FCS(4 Byte)。 Mac 地址类型 分为单播地址、组播地址、广播地址。 单播地址&#xff1a;是指第一个字节的最低位…

RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客 本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.&#xff08;只演⽰部分常⽤的⼯作模式&#xff09; 目录 引⼊依赖 一.工作队列模式 二.Publish/Subscribe(发布订阅模式) …

python学习_3.正则表达式

来源:B站/麦叔编程 1. 正则表达式的7个境界 假设有一段文字&#xff1a; text 身高:178&#xff0c;体重&#xff1a;168&#xff0c;学号&#xff1a;123456&#xff0c;密码:9527要确定文本中是否包含数字123456&#xff0c;我们可以用in运算符&#xff0c;也可以使用inde…

Python学习------第十天

数据容器-----元组 定义格式&#xff0c;特点&#xff0c;相关操作 元组一旦定义&#xff0c;就无法修改 元组内只有一个数据&#xff0c;后面必须加逗号 """ #元组 (1,"hello",True) #定义元组 t1 (1,"hello") t2 () t3 tuple() prin…

nodejs基于微信小程序的云校园的设计与实现

摘 要 相比于传统的校园管理方式&#xff0c;智能化的管理方式可以大幅提高校园的管理效率&#xff0c;实现了云校园管理的标准化、制度化、程序化的管理&#xff0c;有效地防止了云校园信息的不规范管理&#xff0c;提高了信息的处理速度和精确度&#xff0c;能够及时、准确地…

Excel——宏教程(精简版)

一、宏的简介 1、什么是宏&#xff1f; Excel宏是一种自动化工具&#xff0c;它允许用户录制一系列操作并将其转换为VBA(Visual Basic for Applications)代码。这样&#xff0c;用户可以在需要时执行这些操作&#xff0c;以自动化Excel任务。 2、宏的优点 我们可以利用宏来…

绿光一字线激光模组:工业制造与科技创新的得力助手

在现代工业制造和科技创新领域&#xff0c;绿光一字线激光模组以其独特的性能和广泛的应用前景&#xff0c;成为了不可或缺的关键设备。这种激光模组能够发射出一条明亮且精确的绿色激光线&#xff0c;具有高精度、高稳定性和长寿命的特点&#xff0c;为各种精密加工和测量需求…

Python Turtle绘图:重现汤姆劈树的经典瞬间

Python Turtle绘图&#xff1a;重现汤姆劈树的经典瞬间 &#x1f980; 前言 &#x1f980;&#x1f41e;往期绘画&#x1f41e;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f980; 前言 &#x1f980; 《汤姆与杰瑞》&#xff08;Tom and Jerr…