PySpark sql 中一些函数的总结(持续更新)

news2024/11/17 5:43:07

看到什么函数就记录了,没有什么逻辑关系

spark 中DataFrame 和 pandas的DataFrame的区别

 1. F.split() 和 df.withColumn()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *


df = spark.sql(sql) 

split_col = F.split(df_order['xxxx'], '\|')  # 对列切分
df_order = df_order.withColumn('llat', split_col.getItem(0).cast('float')) #withColumn 新增一列,第二个参数传入已有列的Column对象,否则会报错

2.   F.collect_list() 和 df.groupby()

F.collect_list() 聚合函数,返回一个list

Spark SQL/DataFrame/DataSet操作(三)-----分组聚合groupBy_微步229的博客-CSDN博客

df_group = df_order.groupby(['user_id', 'loc_h3_id']).agg(
        F.collect_list("loaded_latlon").alias("loaded_latlon"),
        F.collect_list("order_datetime").alias("time"),
        F.collect_list("sl_distance").alias("sl_distance")). \
        filter(F.size('loaded_latlon') >= 2).persist() # 持久化

3.  F.udf()   第一个传入函数名,函数可以用lambda表达式;第二个是返回类型,如果有多个返回值,用StructType()

def get_h3_heat_schema(res=12):
    schema = StructType([
        StructField("h3_res{}_d7".format(res), IntegerType(), False),
        StructField("h3_res{}_d14".format(res), IntegerType(), False),
        StructField("h3_res{}_d30".format(res), IntegerType(), False),
        StructField("h3_res{}_d60".format(res), IntegerType(), False),
        StructField("h3_res{}_d90".format(res), IntegerType(), False),
        StructField("h3_res{}_d120".format(res), IntegerType(), False),
        StructField("h3_res{}_decay7".format(res), FloatType(), False),
        StructField("h3_res{}_decay30".format(res), FloatType(), False),
    ])
    return schema

def get_h3_features(data):
    # 获取h3相关特征feature
    feature_list = []
    interval_list = [7, 14, 30, 60, 90, 120]
    for interval in interval_list:
        feature_list.append(len(list(filter(lambda x: x <= interval, data))))

    head_decay_7, head_decay_30 = 0, 0
    for time in data:
        head_decay_7 += math.pow(DECAY_7, time)
        head_decay_30 += math.pow(DECAY_30, time)
    feature_list.extend([round(head_decay_7, 2), round(head_decay_30, 2)])
    return feature_list

schema = get_h3_heat_schema(11)
udf_get_h3_features = F.udf(get_h3_features, schema)
df = df_order.groupBy('load_h3_11') \
            .agg(F.collect_list('time_days').alias("time_days"), \
                 F.countDistinct('user_id').alias("h3_11_user_cnt")) \
            .withColumn('h3_11_heat', udf_get_h3_features(F.col('time_days'))) \
            .select('load_h3_11', 'h3_11_user_cnt', 'h3_11_heat.*')

简单的可以使用以下方式注册udf函数

@F.udf(returnType=IntegerType())
def get_time_ago(order_date, end_date):
    order_date = datetime.strptime(order_date, '%Y-%m-%d %H:%M:%S')
    return (end_date - order_date).days + 1

4. .alias()  # alias()起别名后在dataframe会增加一个新列,和withColumn()效果一致

5.  F.struct()   # 这种数据结构同C语言的结构体,内部可以包含不同类型的数据

df = df_order.groupBy('loc_h3_11') \
            .agg(F.collect_list(F.struct('load_h3_12', 'time_days')).alias("tuple")) \
            .withColumn('trans_prob', get_trans_prob(F.col('tuple'))) \
            .drop('tuple')

6.  F.lit(x)  创建一列值为x的列,也可以把这个值作为参数

第一种用法:

df.withColumn("spark_user",F.lit(True))

结果如下: 

 第二种用法:

@F.udf(returnType=StringType())
def lat_lng_to_h3(lat, lng, h3_level=12):
    """
    map lng lat to h3
    """
    return h3.geo_to_h3(lat, lng, h3_level)

df_order = df_order.withColumn('loc_h3_11', lat_lng_to_h3(F.col('alat'), F.col('alng'), F.lit(11)))  #把一列的11作为参数传入

7. F.countDistinct()   去重统计

8. df.select()  和 df.selectExpr()  

Spark---DataFrame学习(二)——select、selectExpr函数_stan1111的博客-CSDN博客_selectexpr

9.   两个表进行关联的方式

cond = [order_df.user_id == personalized_df.user_id, order_df.loc_h3_9 == personalized_df.loc_h3_id]
    joined_df = order_df.join(personalized_df, on=cond, how='inner'). \
        drop(personalized_df.user_id)  # on传入的是条件,how传入的是关联方式,有inner、left、right、outer,关联完记得要把两个表里的相同列名删除一个,否则会多出来一个

10.  F.explode()  把列数据变为多个行数据

spark之explode()方法--- 行转列_卢子墨的博客-CSDN博客_spark中explode

11. 将列中为空的值过滤掉

order_df = order_df.filter(F.col('slat').isNotNull())

12. F.filter(condition)  # 保留满足条件的值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

打开Jupyter notebook没有虚拟环境内核

放了个假&#xff0c;今天准备打开anaconda中的jupyter notebook重温一下之前的文档&#xff0c;结果双击Anaconda-Navigator出现了【应用程序“Anaconda-Navigator”无法打开】&#xff0c;想着利用终端也可以操作&#xff0c;于是使用下面命令激活了环境并打开jupter noteboo…

JZ11 旋转数组的最小数字

【答案解析】&#xff1a;暴力破解&#xff1a;遍历数组找出最小值即可 更优思想&#xff1a;采用二分查找&#xff0c;这个题主要分析三种旋转情况[1, 2, 3, 4, 5]&#xff0c;使用中间值与右端进行比较。 1. 中间大于右边[3, 4, 5, 1, 2]&#xff0c;这种情况下&#xff0c;最…

Spring Framework-01

Spring两大核心技术 1.IOC 2.AOP Spring 1 Spring Framework 1.核心层 Core Container:核心容器&#xff0c;这个模块是Spring最核心的模块&#xff0c;其他的都需要依赖该模块 2.AOP层 AOP:面向切面编程&#xff0c;它依赖核心层容器&#xff0c;目的是在不改变原有代码的前…

国内那么多的低代码平台,究竟哪家比较强?

国内有特色的低代码快速开发平台产品有哪些&#xff1f;这篇就来介绍下目前市面上主要的几家低代码开发平台&#xff01; 简道云、明道云、IVX这几家目前是无代码赛道的明星选手&#xff0c;在市场综合表现上名列前茅。宜创、红圈营销虽也极具潜力&#xff0c;但在市场表现力上…

virsh日常管理命令

virsh日常管理命令virsh日常管理命令创建vm实例规范网卡编号从0开始Centos7最小化安装环境勾选VM实例建议只分配/分区查看vm列表启动vm实例--startvm实例(软)关机--shutdownvm实例断电关机--destroyvm实例(软)重启--rebootvm实例重命名--domrenamevm实例挂起--suspend查看vm实例…

Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

如今&#xff0c;网络服务、数字媒体、传感器日志数据等众多来源产生了大量数据&#xff0c;只有一小部分数据得到妥善管理或利用来创造价值。读取大量数据、处理数据并根据这些数据采取行动比以往任何时候都更具挑战性。 在这篇文章中&#xff0c;我试图展示&#xff1a; 在…

Discord教程:Discord账号注册、Discord多账号登录和管理

Discord最初是为游戏玩家在群聊和交流而创建的。但自疫情爆发以来&#xff0c;许多企业、公司和初创公司发现&#xff0c;居家办公时使用Discord进行日常沟通非常便捷。 Discord不再是仅限于游戏玩家&#xff0c;平台建立了不同于其他任何社交空间的新空间&#xff0c;封闭又开…

9、矩阵的简单运算

目录 一、矩阵的加减运算 二、矩阵的乘方运算 1.数与矩阵的乘法 2.矩阵与矩阵的乘法 三、矩阵的除法 四、矩阵的幂运算 五、矩阵元素的查找 六、矩阵元素的排序 七、矩阵元素的求和 八、矩阵元素的求积 九、矩阵元素的差分 一、矩阵的加减运算 进行矩阵加法、减法运…

前端学习第四站——CSS全面学习基础篇

目录 一、基础认知 1.1 CSS的介绍 1.2 语法规则 1.3 CSS初体验 1.4 CSS初识-小结 2.1 CSS引入方式 二、基础选择器 1.1 选择器的作用 1.2 标签选择器 1.3. 类选择器 1.4 id选择器 补充&#xff1a;类和id的区别 1.5 通配符选择器 三、字体和文本样式 1. 字体样式 …

python数据可视化开发(2):pandas读取Excel的数据格式处理(数据读取、指定列数据、DataFrame转json、数学运算、透视表运算输出)

系列文章目录 python开发低代码数据可视化大屏&#xff1a;pandas.read_excel读取表格python实现直接读取excle数据实现的百度地图标注python数据可视化开发(1)&#xff1a;Matplotlib库基础知识 文章目录系列文章目录前言实践目标一、读取Excel数据read_excel参数说明读取全部…

Launcher应用列表内搜索框显示异常

问题描述 应用列表界面搜索框显示异常。本地试验后发现以下规律。 1、删除几个底边栏图标 2、旋转屏幕 3、进入应用列表&#xff0c;观察上方搜索框显示 问题分析 此问题是launcher内部界面显示问题&#xff0c;比较初级。找到规律后&#xff0c;发现应用列表内搜索框和底边…

【Hadoop】MapReduce数据倾斜问题解决方案

默认情况下Map任务的数量与InputSplit数量保持一致&#xff0c;Map阶段的执行效率也与InputSplit数量相关&#xff0c;当遇到大量的小文件时我们采用SequenceFile合并成一个大文件&#xff0c;以此来提高运行效率&#xff08;【Hadoop】MapReduce小文件问题解决方案&#xff08…

OJ万题详解––高考排名(C++详解)

题目 题目描述 高考成绩的排名规则是按总分由高到低排&#xff0c;总分相同的人排名应相同&#xff0c;例如有 5 个同学的考高成绩&#xff1a; 考号姓名成绩001c1567002ygh605003gl690004xtb605005wzs567按照成绩排序后&#xff0c;成绩如下&#xff1a; 排名考号姓名成绩1003…

C/C++ 相关低耦合代码的设计

在我们设计C/C 程序的时候&#xff0c;有时需要两个类或者两个模块相互“认识”&#xff0c;或者两个模块间函数互相调用&#xff0c;假设我们正在开发一个网上商店&#xff0c;代表的网店客户的类必须要知道相关的账户。UML图如下&#xff0c;这被称为环依赖&#xff0c;这两个…

【GIS前沿】什么是新型基础测绘、内容、产品体系、特征?

《测绘法》指出&#xff0c;基础测绘是建立和维护全国统一的测绘基准和测绘系统&#xff0c;进行航天航空影像获取&#xff0c;建立和更新维护基础地理信息数据库&#xff0c;提供测绘地理信息应用服务等。 文章目录一、什么是新型基础测绘&#xff1f;二、新型基础测绘的特征三…

6、场景法

为什么使用场景法 现在的系统基本上都是由事件来触发控制流程的。如&#xff1a;我们申请一个项目&#xff0c;需先提交审批单据&#xff0c;再由部门经理审批&#xff0c;审核通过后由总经理来最终审批&#xff0c;如果部门经理审核不通过&#xff0c;就直接退回。每个事件触…

1.Docker Desktop安装设置

1.下载最新版本Download Docker Desktop | Docker 2.进行安装 2.1进行4.x版本安装 2.2最新版本出现问题 出现 docker desktop stopped 过一会后 quit退出&#xff0c;下载3.x版本 2.3继续安装 Enable Hyper-V windows Features 启动Hyper-V windows 虚拟化功能 百度百科-验证…

【GD32F427开发板试用】一、环境搭建与freertos移植

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;chenjie 【GD32F427开发板试用】一、环境搭建与freertos移植 【GD32F427开发板试用】二、USB库移植与双USB CDC-ACM功能开发 【GD32F427开发板…

java集合类(属于工具类)概述

Java集合类可用于存储数量不等的对象&#xff0c;并可以实现常用的数据结构&#xff0c;如栈、队列等。除此之外&#xff0c;Java集合还可用于保存具有映射关系的关联数组。 Java集合大致可分为Set、List、Queue和Map四种体系&#xff1a; 其中Set代表无序、不可重复的集合&…

限制系统性能瓶颈的因素、衡量系统性能的指标

文章目录限制系统性能瓶颈的因素cpu内存磁盘IO网络IO异常数据库锁竞争衡量系统性能的指标响应时间吞吐量计算机资源分配使用率负载承受能力有时候我们的程序性能不高,需要提升性能,这个时候可以从以下几个角度去考虑是什么限制了我们的性能瓶颈.限制系统性能瓶颈的因素 cpu 有…