pyspark_自定义udf_解析json列【附代码】

news2024/12/24 18:31:33

pyspark_自定义udf_解析json列【附代码】

    • 一、背景:
    • 二、调研方案:
    • 三、利用Pyspark + udf自定义函数实现大数据并行计算
      • 整体流程
      • 案例代码运行结果:
      • 案例代码:
      • 代码地址:
      • 代码

一、背景:

车联网数据有很多车的时序数据,现有一套云端算法需要对每一辆车历史数据进行计算得到结果,每日将全部车算一遍存到hive数仓中

二、调研方案:

1、python脚本运行,利用pyhive拉取数据到pandas进行处理,将结果to_parquet后用hdfs_client存到数仓中
问题:数据量上亿,对内存要求极大,无法直接拉取到python脚本所在的服务器内存中运算
2、将算法内容改写成SQL或者SPARKSQL,每日调度
问题:代码改写SQL要重新梳理代码逻辑,且很多函数SQL实现复杂,有些函数不支持

三、利用Pyspark + udf自定义函数实现大数据并行计算

整体流程

1、pyspark-spark sql拉取数据到spark df
2、spark df 按 车辆唯一标识分组,执行udf自定义函数(算法),每一个分组的返回值是String类型的json字符串,执行完成后返回的是result_df, spark_df【索引(车辆唯一标识)、数据(String类型的json字符串)】
3、解析json并拼接成spark_df
4、spark_df生成临时表,将临时表数据写入hive数仓

案例代码运行结果:

案例代码运行结果

案例代码:

代码地址:

https://github.com/SeafyLiang/Python_study/blob/master/pyspark_demo/pyspark_udf_json.py

代码

from pyspark.sql import SparkSession  # SparkConf、SparkContext 和 SQLContext 都已经被封装在 SparkSession
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql import types as T  # spark df的数据类型
from pyspark.sql.functions import array, from_json, col, explode
import sys


def get_auc(id, date, vol):
    temp_df = pd.DataFrame({
        'id': id,
        'date': date,
        'vol': vol
    })
    temp_df['date'] = temp_df['date'].apply(lambda x: x + 'aaa')
    temp_df_json = temp_df.to_json(orient='records')  # orient='records'是关键,可以把json转成array<json>
    return temp_df_json


if __name__ == '__main__':
    spark = SparkSession.builder.appName('test_sklearn_pyspark') \
        .config("spark.sql.warehouse.dir", "hdfs://nameservice1/user/hive/warehouse") \
        .config("hive.exec.dynamici.partition", True) \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.sql.crossJoin.enabled", "true"). \
        config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    print(spark)

    temp_dict = {
        'id': [1, 2, 3, 4, 1, 1],
        'date': ['2022-05-01', '2022-05-02', '2022-05-03', '2022-05-04', '2022-05-05', '2022-05-05'],
        'vol': [68.22, 45.10, 899.33, 45.11, 32.22, 99.33]
    }
    tempdf = pd.DataFrame(temp_dict)
    df = spark.createDataFrame(tempdf)

    # 自定义函数(计算AUC),并且变成UDF
    """注意:自定义函数的重点在于定义返回值的数据类型,这个返回值的数据类型必须与该函数return值的数据类型一致,否则会报错。
    该例子中,该函数return的值auc,是string类型,在将该函数定义成udf的时候,指定的返回值类型,也必须是string!!"""

    get_auc_udfs = F.udf(get_auc, returnType=T.StringType())  # 定义成udf,并且此udf的返回值类型为string

    # 分组聚合操作:分别计算每月样本量、逾期率、AUC
    """使用上面定义的UDF,结合F.collect_list(col)来实现UDAF的功能。
    F.collect_lits(col)的作用是将列col的值变成一个list返回."""

    df_result = df.groupby('id').agg(get_auc_udfs(
        F.collect_list(F.col('id').cast('int')),
        F.collect_list(F.col('date').cast('string')),
        F.collect_list(F.col('vol').cast('double'))
    ).alias('json_str'))  # 利用自定的UDF,实现指定聚合计算

    df_result.show(truncate=False)

    opn = 2
    if opn == 1:
        # 【不推荐】方式一:spark_df转成pandas_df,拼接json成pandas_all_df后再转成spark_df写入
        # 数据量大时会把大量数据拉到driver本地,导致内存溢出
        all_result_df = pd.DataFrame()
        df_result_pandas = df_result.toPandas()
        for row in df_result_pandas.itertuples():
            print(row.json_str)
            temp_df = pd.read_json(row.json_str)
            all_result_df = pd.concat([all_result_df, temp_df], ignore_index=True)
        print(all_result_df)
    elif opn == 2:
        # 【推荐】方式二:解析json成新的spark_df
        json_schema = T.ArrayType(
            T.StructType().add("id", T.IntegerType()).add("date", T.StringType()).add("vol", T.DoubleType()))
        df_result = df_result.withColumn('parsed_json', from_json(col('json_str'), json_schema))
        df_result.show()
        df_result.select('parsed_json').show(3, truncate=False)
        df_result = df_result.select(explode(col('parsed_json')).alias('parsed_json_explode'))
        df_result.show()
        df_result = df_result.select(col('parsed_json_explode.id').alias('id'),
                                     col('parsed_json_explode.date').alias('date'),
                                     col('parsed_json_explode.vol').alias('vol'))
        df_result.show()
        print('df_result:', df_result.count())
        # 写入hive表
        # dt_before1day = sys.argv[1]
        # print('dt_before1day:', dt_before1day)
        # # df 转为临时表/临时视图
        # df_result.createOrReplaceTempView("df_tmp_view")
        # # spark.sql 插入hive
        # spark.sql("""
        #         insert overwrite table table_name partition(dt='{DT}')
        #         select
        #         *
        #         from df_tmp_view
        #         """.format(DT=dt_before1day))
        # print('spark write end!')

    print('end')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/827534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MATLAB第63期】基于MATLAB的改进敏感性分析方法IPCC,拥挤距离与皮尔逊系数法结合实现回归与分类预测

【MATLAB第63期】基于MATLAB的改进敏感性分析方法IPCC&#xff0c;拥挤距离与皮尔逊系数法结合实现回归与分类预测 思路 考虑拥挤距离指标与PCC皮尔逊相关系数法相结合&#xff0c;对回归或分类数据进行降维&#xff0c;通过SVM支持向量机交叉验证得到平均指标&#xff0c;来…

《华为认证》SR MPLS BE配置

实验需求&#xff1a;在PE1和PE3之间建立mp-bgp邻居传递CE1和CE2的私网路由&#xff0c;并且使用SR mpls BE的方式传递私网流量 实验步骤 步骤1&#xff1a;配置设备接口ip地址以及AS 100内的igp协议&#xff08;略&#xff09; 步骤2&#xff1a;AS 100内的设备开启mpls &am…

Vue2 第十四节 scoped样式和本地存储

1.scoped样式 2.本地存储 一.scoped样式 ① 作用&#xff1a;让样式在局部生效&#xff0c;防止冲突 ② 写法&#xff1a;<style scoped> ③ 代码示例&#xff1a; <style scoped> .demo {background-color: lightblue; } </style> ④ scoped样式一般…

【redis】创建集群

这里介绍的是创建redis集群的方式&#xff0c;一种是通过create-cluster配置文件创建部署在一个物理机上的伪集群&#xff0c;一种是先在不同物理机启动单体redis&#xff0c;然后通过命令行使这些redis加入集群的方式。 一&#xff0c;通过配置文件创建伪集群 进入redis源码…

分布式应用:ELK企业级日志分析系统

目录 一、理论 1.ELK 2.ELK场景 3.完整日志系统基本特征 4.ELK 的工作原理 5.ELK集群准备 6.Elasticsearch部署&#xff08;在Node1、Node2节点上操作&#xff09; 7.Logstash 部署&#xff08;在 Apache 节点上操作&#xff09; 8.Kiabana 部署&#xff08;在 Node1 节点…

C++设计模式之适配器设计模式

文章目录 C适配器设计模式什么是适配器设计模式该模式有什么优缺点优点缺点 如何使用 C适配器设计模式 什么是适配器设计模式 适配器设计模式是一种行为型设计模式&#xff0c;它允许你将两个不兼容的接口组合在一起&#xff0c;使它们能够协同工作。 该模式有什么优缺点 优…

elementUI全屏loading的使用(白屏的解决方案)

官网中有使用方法&#xff0c;但是我实际上手之后会出现白屏&#xff0c;解决办法如下&#xff1a; <el-button type"text" size"small" click"delRow(scope)"> 删除</el-button>loading: false, // loading 动画loadingInstance…

玩转Java IO流:轻松读写文件、网络

申明&#xff1a;本人于公众号Java筑基期&#xff0c;CSDN先后发当前文章&#xff0c;标明原创&#xff0c;转载二次发文请注明转载公众号&#xff0c;另外请不要再标原创 &#xff0c;注意违规 字符流和字节流 在Java中&#xff0c;IO&#xff08;输入输出&#xff09;操作涉…

TCP三次握手与四次断开

TCP三次握手机制 三次握手是指建立一个TCP连接时&#xff0c;需要客户端和服务器总共发送3个包。进行三次握手的主要作用就是为了确认双方的接收能力和发送能力是否正常、指定自己的初始化序列号为后面的可靠性传送做准备。 1、客户端发送建立TCP连接的请求报文&#xff0c;其…

IDEA设置快捷操作

步骤&#xff1a; 1、 2、 3、 4、 然后直接用就可以啦 常用的接口测试模板&#xff1a; given().contentType(JSON).body($requestBody$).log().all().when().post($path$).then().log().all().statusCode(200);given().contentType(ContentType.JSON).body().log().all().w…

使用低代码平台提高生产力

一、前言 低代码平台的概念很火爆&#xff0c;产品也是鱼龙混杂。 对于开发人员来说&#xff0c;在使用绝大部分低代码平台的时候都会遇到一个致命的问题&#xff1a;我在上面做的项目无法得到源码&#xff0c;完全黑盒。一旦我的需求平台满足不了&#xff0c;那就是无解。 与其…

面试热题(无重复字符的最长子串)

无重复字符的最长子串 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 输入: s "abcabcbb" 输出: 3 解释: 因为无重复字符的最长子串是 "abc"&#xff0c;所以其长度为 3。 解法一&#xff1a; public int lengthOfLonge…

UE4/5 PoseDriver 动画蓝图节点使用

PoseDriver节点可以进行Pose的比对&#xff0c;从而针对不同Pose生成不同权重数值&#xff0c;权重数值可应用至MorphTarget上使动画更加逼真&#xff0c;或应用至角色挂件上&#xff0c;制作出类惯性或弹簧的附加效果。 1.创建Pose 这里创建Box作为演示&#xff0c;下图大Bo…

【更新】119所院校考研重点勾画更新预告!

截至目前&#xff0c;我已经发布了47篇不同院校的择校分析。发布了87套名校信号考研真题以及119所不同院校的考研知识点重点勾画。 另外为了更好服务已经报名的同学&#xff0c;24梦马全程班也到了收尾的阶段。即将封班&#xff01;需要报名的同学抓紧啦&#xff01; 去年开始…

4-百度地图

4-百度地图 一 百度地图 1 前期准备 H5端和PC端,对接百度提供JavaScript API。 移动端,对接百度android SDK或ios SDK (1)打开百度地图开放平台 地址:https://lbsyun.baidu.com/ (2)选中开发文档——JavaScript Api 按照文档步骤开通百度开放平台并申请密钥 2 展示地…

Mysql5.8 Windows安装

1、下载安装包 MySQL :: Download MySQL Community Server 下载后解压到某个文件夹 2、配置环境变量 3.创建my.ini文件 [mysqld] # 设置3306端口 port3306 # 设置mysql的安装目录 basedirE:\\software\\mysql\\mysql-8.0.11-winx64 # 切记此处一定要用双斜杠\\&#xff0c;…

小白到运维工程师自学之路 第六十三集 (dockerfile安装sshd、httpd、nginx)

一、概述 Dockerfile的指令根据作用可以分为两种&#xff0c;构建指令和设置指令。构建指令用于构建image&#xff0c;其指定的操作不会在运行image的容器上执行&#xff1b;设置指令用于设置image的属性&#xff0c;其指定的操作将在运行image的容器中执行。 1、FROM 镜像:T…

Profinet转Modbus RTU从站模式的配置流程

兴达易控Profinet转Modbus RTU从站模式的配置流程需要按照以下步骤进行。首先&#xff0c;确保Profinet主站和Modbus RTU从站的设备之间有正确的连接&#xff0c;包括电气连接和网络连接。然后&#xff0c;在Profinet主站上设置适当的通信参数。 下面是具体操作&#xff1a;创…

electron+vue+ts窗口间通信

文章目录 一. 目的二.逻辑分析三. 代码示例 "types/node": "^20.3.1","vitejs/plugin-vue": "^4.1.0","vueuse/electron": "^10.2.1","electron": "^25.2.0","electron-packager":…

python基础2——数据类型

文章目录 一、字符串处理1.1 占位符1.2 拼接符1.3 统计字符串长度1.4 切片取值1.5 内置字符串处理方法 二、组合数据类型2.1 列表2.2 元组2.3 集合2.4 字典 三、数据类型转换 一、字符串处理 1.1 占位符 可以使用%s占位符在字符串中引用变量。 1.有两种写法。 name "qi…