Spark:DataFrame介绍及使用

news2024/11/25 4:32:15

1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row

# 创建行数据
r1 = Row(1, '张三', 20)

# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)

# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *

# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\
    add('id',IntegerType()).\
    add('name',StringType()).\
    add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *

#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)

#行数据下标取值
print(r1[0])
print(r1[1])

#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])

# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)

# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()

# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)

# 查看df中数据
df.show()

#查看元数据信息
df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession

#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

#基于ss对象获取sparkContext
sc = ss.sparkContext

#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])

#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')

#df数据查看
df.show()
df.printSchema()

#df可以转rdd
res = df.rdd.collect()
print(res)

rdd2 = df.rdd.map(lambda x:x['name'])

res2 = rdd2.collect()
print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pd

ss = SparkSession.builder.getOrCreate()

#创建pandas的df
df_pd = pd.DataFrame(
    {
        'id':[1,2,3,4],
        'name':['张三','李四','王五','赵六'],
        'age':[1,2,3,4],
        'gender':['男','女','女','女']
    }
)
#查看数据
print(df_pd)

#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)

#查看
df_spark.show()

#取值
row = df_spark.limit(1).first()
print(row['name'])

#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为df
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()

# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()

#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()

#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')

#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')

#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu 
"""

#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframe
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')

#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')

#查看结果
df2.show()

#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)

#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()

#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()

#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()

#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()

#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()

#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()

#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()

#排序
df11 = df_csv.orderBy('age')  #默认排序
df11.show()

df12 = df_csv.orderBy('age',ascending=False)  #降序
df12.show()

#分页
df13 = df_csv.limit(5)
df13.show()

#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2211639.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL 8.4修改初始化后的默认密码

MySQL 8.4修改初始化后的默认密码 (1)初始化mysql: mysqld --initialize --console (2)之后,mysql会生成一个默认复杂的密码,如果打算修改这个密码,可以先用旧密码登录: mysql -u…

Redis set类型 zset类型

set类型 类型介绍 集合类型也是保存多个字符串类型的元素的,但和列表类型不同的是,集合中 1)元素之间是⽆序 的 2)元素不允许重复 ⼀个集合中最多可以存储 个元素。Redis 除了⽀持 集合内的增删查改操作,同时还⽀持多…

【图书推荐】《R语言医学数据分析实践》

本书重点 梅俏、卢龙、丁健、张晟、黄龙、胡志坚、张琼瑶、林志刚等业内专家联袂推荐。 以公共医学数据为例,精选大量的实用案例,深入浅出地介绍统计建模分析方法。 帮助读者解决医学数据分析中遇到的实际问题。 通过实际操作引导读者入门科研论文数…

生信分析流程:从数据准备到结果解释的完整指南

介绍 生物信息学(生信)分析是一个复杂的过程,涉及从数据准备到结果解释的多个步骤。随着高通量测序技术的发展和生物数据的迅猛增长,了解和掌握生信分析的标准流程变得尤为重要。这不仅有助于提高分析的准确性,还能优…

HarmonyOS NEXT 应用开发实战(五、页面的生命周期及使用介绍)

HarmonyOS NEXT是华为推出的最新操作系统,arkUI是其提供的用户界面框架。arkUI的页面生命周期管理对于开发者来说非常重要,因为它涉及到页面的创建、显示、隐藏、销毁等各个阶段。以下是arkUI页面生命周期的介绍及使用举例。 页面的生命周期的作用 页面…

7-I2C与AHT20温湿度传感器

I2C与AHT20温湿度传感器 嵌入式领域另一种常见的通信IIC通信,并用其与AHT20传感器进行交互,获取房间的温度与湿度。 I2C有一条用于传递数据的数据线称为SDA(Serial Data),另一条是用于提供同步时钟脉冲的时钟线SCL&am…

看图识微分与导数概念。

可建立如草图所示的局部坐标系。增量Δydy余项是草图中曲线的方程,微分dyydx(是关于dx的一次函数)是草图中切线的方程。草图形象直观地显示曲线Δy不切线dy。

安全可靠测评结果公告(2024年第1号)

大家可以选择对应的数据库,中央处理器,供参考;尤其是 水资源安可系统 智慧农业安可系统 智慧水利安可系统、智慧水务安可系统,企业安可系统 等参考使用

# 在执行 rpm 卸载软件使用 nodeps 参数时,报错 error: package nodeps is not installed 分析

在执行 rpm 卸载软件使用 nodeps 参数时,报错 error: package nodeps is not installed 分析 一、问题描述: 在执行 rpm 卸载软件使用 nodeps 参数时,报错 error: package nodeps is not installed 如下图: 二、报错分析&…

Java项目分层思路

Java项目分层思路 一、前言二、了解常见的术语1. 应用开发中使用的术语2. 建模和架构设计层面术语总结 三、如何划分1. 单个module2. 多个module 一、前言 每个人、每个开发团队的规范习惯都不太一样,没有固定标准,合适的才是最好的。 二、了解常见的术…

Python Django 查询集的延迟加载特性

Django 查询集的延迟加载特性 一、引言 在 Django 的开发过程中,查询集(QuerySet)是我们与数据库进行交互的重要工具。查询集提供了一种高效的方式来检索和操作数据库中的数据,且能够进行懒加载(Lazy Loading&#x…

Gin框架教程02:AsciiJSON

什么是 AsciiJSON? AsciiJSON 是 Gin 框架中的一个方法,用于生成仅包含 ASCII 字符的 JSON。对于非 ASCII 字符(例如汉字、特殊符号),AsciiJSON 会将其转义为 Unicode 表示(如 \uXXXX)&#xff…

使用CSS+SVG实现加载动画

使用CSSSVG实现加载动画 效果展示 CSS知识点 SVG元素使用SVG相关CSS属性运用 整体页面布局 <section><div class"box"><div class"loader"><svg><circle cx"40" cy"40" r"40"></circl…

vue从0开始的项目搭建(含环境配置)

一、环境准备 下载node.js 检查node.js版本 替换npm下载源 1.下载node.js: Node.js — 在任何地方运行 JavaScript (nodejs.org) 2.查看版本: windowsr输入cmd进入输入node -v命令查看版本号是否出现确认是否安装 2.替换npm下载源: npm config set registry https://reg…

深入Semantic Kernel:插件开发与实践应用(进阶篇)

文章目录 一、引言二、开发Semantic Kernel插件三、实战3.1 时间信息插件3.2 小部件工厂插件3.3 初始化Semantic Kernel实例3.4 四个实战示例3.4.1 模型幻觉3.4.2 给模型提供时间信息3.4.3 AI自动调用函数3.4.4 AI自动调用和使用枚举 四、结论 一、引言 在上一篇入门文章《探索…

vue3.x系列之v-model的使用技巧及面试高频问题

在前面的一篇文章中&#xff0c;我们分析了v-model在v2版中的用法。这次我们分析下在v3中的使用技巧。学习之前&#xff0c;请忘记之前的v2语法&#xff0c;现在的更加简洁易用。 组件上面的v-model 在v3.4版之前的写法如下 子组件Child.vue <!-- Child.vue --> <…

MobileViews: A Large-Scale Mobile GUI Dataset论文学习

这一片论文的工作主要集中在探索app上。 “ 设计#1&#xff1a;LLM增强型自动应用爬虫。为了提高应用程序遍历效率&#xff0c;我们引入了MobileViews Crawler&#xff0c;它使用固定的交互规则来处理繁琐的应用程序操作&#xff0c;LLM增强了其处理复杂UI状态的能力。在这个…

[C++ 核心编程]笔记 4.1.2 struct和class的区别

4.1.2 struct和class的区别 在C中 struct和class唯一的区别就在于 默认的访问权限不同 区别: struct 默认权限为公共class 默认权限为私有 #include<iostream> using namespace std;class C1 {int m_A;//默认私有 }; struct C2 {int m_A;//默认共有 };int main() {//s…

Android -- [SelfView] 多动画效果图片播放器

Android – [SelfView] 多动画效果图片播放器 效果&#xff08;录制的有点卡&#xff09; 1. 引用&#xff1a; <com.nepalese.virgolib.widget.image.BaseImageViewandroid:id"id/base_image"android:layout_width"match_parent"android:layout_heigh…

2024让我爱不释手的Mac清理神器CleanMyMac X4.15.8免费版

大家好&#xff0c;今天我要和大家分享一款让我爱不释手的Mac清理神器——CleanMyMac X。作为一个长期使用Mac的用户&#xff0c;我深知电脑在长时间使用后容易出现卡顿、存储空间不足等问题。而自从我遇到了CleanMyMac X&#xff0c;这些问题都迎刃而解啦&#xff01; #### 一…