从0开始学人工智能测试节选:Spark -- 结构化数据领域中测试人员的万金油技术(二)

news2025/1/22 22:58:23

Dataframe

dataframe 是spark中参考pandas设计出的一套高级API,用户可以像操作pandas一样方便的操作结构化数据。毕竟纯的RDD操作是十分原始且麻烦的。而dataframe的出现可以让熟悉pandas的从业人员能用非常少的成本完成分布式的数据分析工作, 毕竟跟数据打交道的人很少有不懂dataframe的。

初始化dataframe的方法

from pyspark import SparkContext, SparkConf, SQLContext

from pyspark.sql import Row



logFile = "/Users/xxxx/tools/spark-3.0.3-bin-hadoop2.7/README.md"

conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dataA = sqlContext.read.csv("路径")





dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]

dataf = sqlContext.createDataFrame(dicts)

dataf.show()



dicts = [['a', 1], ['b', 2]]

rdd = sc.parallelize(dicts)

dataf = sqlContext.createDataFrame(rdd, ['col1','col2'])

dataf.show()





rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]

dataf= sqlContext.createDataFrame(rows)

dataf.show()



dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')

可以看到创建dataframe有多种方式, 可以从文件中读取, 可以从列表中初始化,可以用简单的方式指定列信息, 也可以使用Row类来初始化列信息。

dataframe常用操作

读取数据:

df = spark.read.json("data.json")



df = spark.read.csv("data.csv", header=True, inferSchema=True)



df = spark.read.parquet("data.parquet")

显示数据:

# 显示前 n 行数据,默认为 20 行

df.show(n=5)

# 打印 DataFrame 的 schema

df.printSchema()

选择和过滤数据:

# 选择特定列

selected_df = df.select("column1", "column2")

# 使用条件过滤数据

filtered_df = df.filter(df["age"] > 30)

聚合和分组数据:

from pyspark import SparkContext, SparkConf, SQLContext





conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)





dicts = [

['teacher', 202355, 16, '336051551@qq.com'],

['student', 2035, 16, '336051551@qq.com'],

['qa', 2355, 16, '336051551@qq.com'],

['qa', 20235, 16, '336051551@qq.com'],

['teacher', 35, 16, '336051asdf'],

['student', 453, 16, '336051asdf'],





]

rdd = sc.parallelize(dicts, 3)

data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])





result = data.groupBy("title").max("sales").alias("max_sales")

resultA = data.groupBy("title").sum("sales").alias("sum_sales")



# 显示结果

result.show()

resultA.show()





+-------+----------+

| title|max(sales)|

+-------+----------+

|teacher| 202355|

| qa| 20235|

|student| 2035|

+-------+----------+



+-------+----------+

| title|sum(sales)|

+-------+----------+

|teacher| 202390|

| qa| 22590|

|student| 2488|

+-------+----------+



数据排序:



from pyspark.sql.functions import desc



# 按列排序

sorted_df = df.sort("column1")



# 按列降序排序

sorted_df = df.sort(desc("column1"))

添加,修改和删除列:



from pyspark.sql.functions import upper



# 添加新列

new_df = df.withColumn("new_column", df["column1"] * 2)



# 修改现有列

modified_df = df.withColumn("column1", upper(df["column1"]))



# 删除列

dropped_df = df.drop("column1")



重命名列:

# 重命名 DataFrame 中的列

renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")

spark sql

初始化

from pyspark import SparkContext, SparkConf, SQLContext



# 创建 SparkSession

conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)





dicts = [

['teacher', 202355, 16, '336051551@qq.com'],

['student', 2035, 16, '336051551@qq.com'],

['qa', 2355, 16, '336051551@qq.com'],

['qa', 20235, 16, '336051551@qq.com'],

['teacher', 35, 16, '336051asdf'],

['student', 453, 16, '336051asdf'],





]

rdd = sc.parallelize(dicts, 3)

data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])



data.createOrReplaceTempView("table")

要使用spark sql的能力, 需要利用createOrReplaceTempView创建一个临时表,然后才能执行 sql

简单的sql执行

query = "select * from table where title = 'qa'"



resultB = sqlContext.sql(query)



resultB.show()



# 执行结果

+-----+-----+---+----------------+

|title|sales|age| email|

+-----+-----+---+----------------+

| qa| 2355| 16|336051551@qq.com|

| qa|20235| 16|336051551@qq.com|

+-----+-----+---+----------------+

分组查询

query = "select title, sum(sales), max(sales) from table group by title"



resultC = sqlContext.sql(query)



resultC.show()



# 执行结果

+-------+----------+----------+

| title|sum(sales)|max(sales)|

+-------+----------+----------+

|teacher| 202390| 202355|

| qa| 22590| 20235|

|student| 2488| 2035|

+-------+----------+----------+

Spark sql适合熟悉sql语法的人使用,本质上sql和dataframe最终都会被翻译成rdd来运行。我们可以把它看成是rdd的高级语法糖就可以。 大家喜欢哪种操作方式就选择哪种就可以。

数据测试/监控

回顾自学习与数据闭环那里,我们知道在这样的系统中针对与每天新采集的数据,需要做一道数据校验。下面我模拟一个场景写这样一个检查脚本。

from pyspark import SparkContext, SparkConf, SQLContext

from pyspark.sql import SparkSession

import pyspark.sql.functions as F



conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)



rdd = sc.parallelize(range(1000))

print(rdd.map(lambda x: '%s,%s' % ('男', '16')).collect())



dicts = [

['frank', 202355, 16, '336051551@qq.com'],

['frank', 202355, 16, '336051551@qq.com'],

['frank', 202355, 16, '336051551@qq.com'],

['frank', 202355, 16, '336051551@qq.com'],

['frank', 202355, 16, '336051asdf'],

['', 452345, 16, '336051asdf'],





]

rdd = sc.parallelize(dicts, 3)

dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])





# 验证 id 字段必须是整数

id_filter = F.col("id").cast("int") >= 0



# 验证 name 字段必须是非空字符串

name_filter = F.col("name").isNotNull() & (F.col("name") != "")



# 验证 age 字段必须是大于等于 0 的整数

age_filter = F.col("age").cast("int") >= 0



# 验证 email 字段必须是有效的电子邮件地址

email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")



# 应用过滤条件

valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)



# 输出符合质量要求的数据

valid_data.show()



# 输出不符合质量要求的数据

invalid_data = dataf.exceptAll(valid_data)

invalid_data.show()

更多内容欢迎来到我的知识星球:
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1606317.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数仓建模—数仓架构发展史

数仓建模—数仓架构发展史 时代的变迁,生死的轮回,历史长河滔滔,没有什么是永恒的,只有变化才是不变的,技术亦是如此,当你选择互联网的那一刻,你就相当于乘坐了一个滚滚向前的时代列车&#xf…

电视音频中应用的音频放大器

电视机声音的产生原理是将电视信号转化为声音,然后通过扬声器将声音播放出来。当我们打开电视并选择频道时,电视机首先从天线或有线电视信号中获取声音信号。声音信号经过放大器放大之后,就能够通过扬声器发出声音。电视机声音的产生原理和音…

Ubuntu20.04 ISAAC SIM仿真下载使用流程(4.16笔记补充)

机器:华硕天选X2024 显卡:4060Ti ubuntu20.04 安装显卡驱动版本:525.85.05 参考: What Is Isaac Sim? — Omniverse IsaacSim latest documentationIsaac sim Cache 2023.2.3 did not work_isaac cache stopped-CSDN博客 Is…

2024蓝桥杯每日一题(最短路径)

备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一:奶牛回家 试题二:Dijkstra求最短路 II 试题三:spfa求最短路 试题四:作物杂交 试题一:奶牛回家 【题目描述】 晚餐时间马上就到了&#x…

【JavaEE多线程】Thread类及其常见方法(上)

系列文章目录 🌈座右铭🌈:人的一生这么长、你凭什么用短短的几年去衡量自己的一生! 💕个人主页:清灵白羽 漾情天殇_计算机底层原理,深度解析C,自顶向下看Java-CSDN博客 ❤️相关文章❤️:清灵白羽 漾情天…

【opencv】dnn示例-segmentation.cpp 通过深度学习模型对图像进行实时语义分割

模型下载地址: http://dl.caffe.berkeleyvision.org/ 配置文件下载: https://github.com/opencv/opencv_extra/tree/4.x/testdata/dnn 该段代码是一个利用深度学习进行语义分割的OpenCV应用实例。下面将详细解释代码的功能和方法。 引入库 引入了一些必要…

蓝桥杯2024年第十五届省赛真题-宝石组合

思路:参考博客,对Ha,Hb,Hc分别进行质因数分解会发现,S其实就等于Ha,Hb,Hc的最大公约数,不严谨推导过程如下(字丑勿喷): 找到此规律后,也不能枚举Ha&#xff…

Go 单元测试之mock接口测试

文章目录 一、gomock 工具介绍二、安装三、使用3.1 指定三个参数3.2 使用命令为接口生成 mock 实现3.3 使用make 命令封装处理mock 四、接口单元测试步骤三、小黄书Service层单元测试四、flags五、打桩(stub)参数 六、总结6.1 测试用例定义6.2 设计测试用…

医学图像三维重建与可视化系统 医学图像分割 区域增长

医学图像的三维重建与可视化,这是一个非常有趣且具有挑战性的课题!在这样的项目中,可以探索不同的医学图像技术,比如MRI、CT扫描等,然后利用这些图像数据进行三维重建,并将其可视化以供医生或研究人员使用。…

解决Mac使用Vscode无法调用外部终端

前言 今天遇到一个很奇怪的问题,之前好好的用Vscode还能调用外部终端,怎么今天不行了?问题出在哪里呢?请听我娓娓道来。 检查配置文件 我查看了一下配置文件,发现配置文件都是调用外部控制台,没毛病啊。 …

Objective-C网络数据捕获:使用MWFeedParser库下载Stack Overflow示例

概述 Objective-C开发中,网络数据捕获是一项常见而关键的任务,特别是在处理像RSS源这样的实时网络数据流时。MWFeedParser库作为一个优秀的解析工具,提供了简洁而强大的解决方案。本文将深入介绍如何利用MWFeedParser库,以高效、…

(最详细)关于List和Set的区别与应用

关于List与Set的区别 List和Set都继承自Collection接口; List接口的实现类有三个:LinkedList、ArrayList、Vector。Set接口的实现类有两个:HashSet(底层由HashMap实现)、LinkedHashSet。 在List中,List.add()是基于数组的形式来添…

Spring Boot后端+Vue前端:打造高效二手车交易系统

作者介绍:✌️大厂全栈码农|毕设实战开发,专注于大学生项目实战开发、讲解和毕业答疑辅导。 🍅获取源码联系方式请查看文末🍅 推荐订阅精彩专栏 👇🏻 避免错过下次更新 Springboot项目精选实战案例 更多项目…

机器学习笔记 - 使用 OpenCV 的结构化森林进行边缘检测

一、简述 边缘检测是计算机视觉领域中一项非常重要的任务。这是许多纯计算机视觉任务(例如轮廓检测)的第一步。即使涉及深度学习,较深层也首先学习识别边缘,然后再学习图像的复杂特征。所以,我们可以说边缘检测在计算机视觉领域非常重要。拥有良好且高效的图像边缘检测算法…

后端返回的数据中含有Null的则不在前端展示

方式 1:application 上加配置 只需要在配置文件 上,增加 如下配置。 application 格式配置: spring.jackson.default-property-inclusionnon_null yml 格式配置: spring:jackson:default-property-inclusion: non_null注意&a…

美团财务科技后端一面:如何保证数据一致性?延时双删第二次失败如何解决?

更多大厂面试内容可见 -> http://11come.cn 美团财务科技后端一面:项目内容拷打 美团财务科技后端一面:项目相关面试题,主要包含 Zset、延时双删失败重试、热点数据解决、ThreadLocal 这几个方面相关的内容 由于前几个问题是对个人项目的…

混合云构建-如何创建一个高可用的Site to Site VPN 连接 Azure 和GCP云

在现代云计算环境中,企业通常会采用多云战略,将工作负载分布在不同的云服务提供商上。这种方式可以提高可用性、降低供应商锁定风险,并利用每个云提供商的独特优势。然而,在这种情况下,需要确保不同云环境之间的互联互通,以实现无缝的数据传输和应用程序集成。 本文将详细介绍…

微信小程序之点击事件

微信小程序中常用的点击事件主要是 tap,但除此之外还有其他的触摸类事件,用于不同的交互场景。以下是一些常见的点击和触摸相关的事件及其区别: 1、tap——最基本的点击事件,适用于一般的轻触交互,类似于 HTML 中的 c…

浅析binance新币OMNI的前世今生

大盘跌跌不休,近期唯一的指望就是binance即将上线的OMNI 。虽然目前查到的空投数量不及预期,但OMNI能首发币安,确实远超预期了。 OMNI代币总量1亿,初始流通仅10,391,492枚,其中币安Lanchpool可挖350万枚 对于OMNI这个…

【做算法学数据结构】【链表】删除排序链表中的重复元素

链表 链表单向链表双向链表 题目代码 链表 当涉及到数据结构时,链表是一种常见且重要的数据结构。链表由一系列节点组成,每个节点包含数据和指向下一个节点的引用。相比于数组,链表的大小可以动态地增长或缩小,因为每个节点只需要…