Spark RDD dataframe嘿嘿

news2024/12/29 4:39:50

RDD(Resilient Distributed Datasets)可扩展的弹性分布式数据集,RDD是spark最基本的数据抽象,RDD表示一个只读、分区且不变的数据集合,是一种分布式的内存抽象,与分布式共享内存(Distributed Shared Memory,DSM)都是分布式的内存抽象,但两者是不同的。RDD支持两种类型的操作: transformations(转换)和 actions(动作)。transformations操作会在一个已存在的 RDD上创建一个新的 RDD,但实际的计算并没有执行,仅仅记录操作过程,所有的计算都发生在actions环节。actions操作触发时,会执行 RDD记录的所有运行transformations操作,并计算结果,结果可返回到 driver 程序,也可保存的相关存储系统中。

RDD基本操作

创建Rdd

parallelize列表创建

data=[1,2,6,4,7,3] 
rdd=sc.parallelize(data) 
rdd.collect()

textFile文本文件创建

distFile=sc.textFile("/home/ubuntu/Desktop/book.txt")
type(distFile)
1,apple
2,banana
3,banana
4,pear
5,orange
6,banana
7,orange
8,apple
9,apple
10,apple
11,pear
12,pear

文本的创建后读取是一个怎么样的结果呢,首先如果读取的写法如上读入后,文本并没有划分为(k,v)值的,可以通过如下办法划分

distFile.map(lambda line: line.split(",")).foreach(print)

在这里插入图片描述

简单计算

count操作统计为列表输出

sc.parallelize([2,3,4]).count()

countByKey根据值排序

rdd=sc.parallelize([("a",1),("b",1),("a",1)])
sorted(rdd.countByKey().items()) 

countByValue返回唯一值的个数

sorted(sc.parallelize([1,2,1, 2, 2], 2).countByValue().items()) 

distinct去除重复值

sorted(sc.parallelize([1,1,2,3]).distinct().collect()) 

reduceByKey根据key把value加起来

from operator import add
rdd=sc.parallelize([("a", 1), ("b", 1), ("a",1)]) 
sorted(rdd.reduceByKey(add).collect()) 

复杂计算

filter筛选过滤操作,通常与lambda运用,下

rdd=sc.parallelize([1,2, 3, 4,5]) 
rdd.filter(lambda x:x%2== 0).collect()

flatMap,生成多元素输出,例如字典格式k,v

rdd=sc.parallelize([2,3, 4])
sorted(rdd.flatMap(lambda x:range(1,x)).collect()) 

自定义函数foreach


def f(k):
	print(k*2)
	return k*2
sc.parallelize([1,2, 3,4,5]).foreach(f)

自定义函数输出

.foreach(print)

groupBy根据自定义条件进行分组

以下结果简单解释以下,会按照是否能整除2分组,分成0,1,后面接着的是他的组员,其中sorted是对y组员进行排序的

rdd=sc.parallelize([1,1, 2, 3,5,8]) 
result= rdd.groupBy(lambda x:x% 2).collect()
sorted([(x,sorted(y)) for (x,y) in result]) 

map每个元素的单独定义

rdd=sc.parallelize(["b","a","c"])
sorted(rdd.map(lambda x: (x,1)).collect()) 

虚拟机环境配置(pycharm)

打开file–>setting–>projectstructure点击右边的add
目录在/opt/spark版本号/python/lib
里面有两个包一个个加入,一般这样就行了,不太需要去搞环境变量

在这里插入图片描述

spark dataframe

dataframe读取

df = spark.read.csv('1.csv',head=True,inferSchema=True)
df.show()

数据处理

filter groupBy sort

df.filter(df.conmae=='123').show()

df.groupBy('coname').count().show()

df.sort('coname').show()

转换为pandas dataframe

.toPandas()

其他的操作和pandas比较类似这里就不继续讲解了

RDD转换为dataframe,toDF

df = logs_rdd.toDF(["day","time", "path", "'method""status"])

sql

registerTempTable 创建临时表,其中book_df是spark的Dataframe

book_df.registerTempTable("tb_book")

执行sql

spark.sql("select * from tb_book").show(15)

二、RDD计算

创建本地文件: “products.txt”.

id,name price,category
1,Apple,1.2,Fruit 
2,Banana,0.5,Fruit
3,Carrot,8.6,Vegetable
4,Orange,1.3,Fruit
5,Cabbage ,1.0,Vegetable

1、基于pyspark库, 创建应用,加载数据为RDD.
2、过滤去除表头,保留剩下的记录。
3、计算每个水果category的平均价格。
4、显示结果。

三、SparkSQL-DataFrame (每小问5分,共20分)
数据:创建本地文件: “exam.txt”.

id,name,age,city,score
1,3ohn,25,New York,80
2,Alice,22,Los Angeles,90 
3,Bob,24,Chicago,85
4,David,28,San Francisco,95
5,Eva,21,New York,92 
  1. 使用pyspark读取文件, 并创建一个DataFrame.
  2. 统计每个城市的学员数量和平均分数。
  3. 筛选出平均分数大于85分的城市。
  4. 输出结果
# 导入必要的模块
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 1. 创建应用,加载数据为RDD
rdd = spark.sparkContext.textFile("products.txt")

# 2. 过滤去除表头,保留剩下的记录
header = rdd.first()
filtered_rdd = rdd.filter(lambda row: row != header)

# 3. 计算每个水果category的平均价格
fruit_rdd = filtered_rdd.filter(lambda row: row.split(",")[3].strip() == "Fruit")
fruit_prices_rdd = fruit_rdd.map(lambda row: float(row.split(",")[2]))
average_price = fruit_prices_rdd.mean()

# 4. 显示结果
print("平均水果价格:", average_price)

# 导入必要的模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 1. 使用pyspark读取文件, 并创建一个DataFrame
df = spark.read.csv("exam.txt", header=True, inferSchema=True)

# 2. 统计每个城市的学员数量和平均分数
city_stats = df.groupBy("city").agg(count("*").alias("student_count"), avg("score").alias("average_score"))

# 3. 筛选出平均分数大于85分的城市
filtered_cities = city_stats.filter(city_stats.average_score > 85)

# 4. 输出结果
filtered_cities.show()

上各个字段,依次说明为:
1.IP地址:请求来源的IP地址
2.-: 标记符,表示客户端用户标识的缺失
3.-:标记符,表示客户端用户标识的缺失
4. [日期时间]:请求的时间和日期
5.“请求方法URL协议/版本":请求的方法、URL和协议版本
6.状态码:服务器响应的HTTP状态码
7.响应数据大小:服务器响应的流量大小,以字节为单位

要求:
1.创建一个本地文件夹,并监听该文件夹。
2使用窗口函数,设置窗口时长为10秒,滑动间隔为4秒。
3.对每个窗口内的数据进行统计:请求方法是GET的访问中,产生流量最多的IP是(使用sq|),
4.输出结果。

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import desc

# 创建 SparkSession 和 StreamingContext
spark = SparkSession.builder.appName("StreamingLogAnalysis").getOrCreate()
sc = spark.sparkContext
streaming_context = StreamingContext(sc, 10)  # 设置窗口时长为10秒

# 创建一个本地文件夹,并监听该文件夹
log_folder = "/path/to/log/folder"  # 替换为实际的日志文件夹路径
logs = streaming_context.textFileStream(log_folder)

# 使用窗口函数,设置窗口时长为10秒,滑动间隔为4秒
windowed_logs = logs.window(10, 4)

# 对每个窗口内的数据进行统计
def process_logs(window_rdd):
    if not window_rdd.isEmpty():
        # 解析日志数据
        parsed_logs = window_rdd.map(lambda log: log.split(" "))

        # 筛选请求方法为GET的访问
        get_requests = parsed_logs.filter(lambda log: log[5] == "GET")

        # 统计每个IP地址的流量大小
        ip_traffic = get_requests.map(lambda log: (log[0], int(log[6])))
        ip_traffic_stats = ip_traffic.reduceByKey(lambda a, b: a + b)

        # 找出产生流量最多的IP地址
        most_traffic_ip = ip_traffic_stats.max(lambda x: x[1])

        # 输出结果
        print("在该窗口中,产生流量最多的IP是:", most_traffic_ip[0])

# 应用窗口函数和数据处理逻辑
windowed_logs.foreachRDD(process_logs)

# 启动 StreamingContext
streaming_context.start()
streaming_context.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/682693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试题:希尔排序是一种稳定排序吗?

面试题:希尔排序是一种稳定排序吗? 对于算法的稳定性,有这样一个记忆技巧,不稳定排序是"快些选队",对应于快速排序/希尔排序/选择排序/堆排序。希尔排序也名列其中,因此也是一种不稳定排序&…

CODESYS 数组类型变量(ARRAY)使用介绍

博途PLC数组类型变量使用介绍请参看下面文章博客: 博途1200/1500PLC上升沿下降沿指令编程应用技巧(bool数组)_博途上升沿指令_RXXW_Dor的博客-CSDN博客博途PLC的下降沿和上升沿指令,在控制系统编程时经常会使用。和SMARTS7-200有所不同,遵循IEC-6113标准提供的上升沿下降沿…

【初识C语言(6)】指针+结构体

文章目录 1. 指针1.1 内存1.2 指针变量的大小 2. 结构体 1. 指针 想要学好指针,首先必须要先了解内存。 1.1 内存 内存介绍 内存是电脑上特别重要的存储器,计算机中程序的运行都是在内存中进行的 。 所以为了有效的使用内存,就把内存划分…

​LeetCode解法汇总1401. 圆和矩形是否有重叠

目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:力扣 描述: 给你一个以 (radius, xCenter, yCenter) 表示的圆和一个与坐标轴平行的矩形 (x1…

第三方医药数据供应商有哪些?--数据业务介绍

第三方医药数据供应商主要是为医药企业、健康机构、学术研究、药物研发等提供医药相关数据的收集、整理、分析和应用服务。随着医药市场的需求衍生了许多各高垂直领域的医药数据供应商,这也导致了大家对医药数据供应商涉及领域认识的片面性。 故本文重点介绍各医药…

SpringBoot 如何使用 YourKit 进行性能调优

SpringBoot 如何使用 YourKit 进行性能调优 前言 在应用程序的开发过程中,性能调优是一个重要的环节。如果应用程序的性能不佳,可能会影响用户的体验,甚至导致系统崩溃。而在 Spring Boot 应用程序中,我们可以使用 YourKit 来进…

同比增长超500% 威睿公司三电产品4月装机量增势迅猛

近期,2023年4月装机量数据全新出炉,威睿公司在电驱动系统、电机控制器、驱动电机、PACK、BMS等多个装机量榜单中跻身前十位,同比增长均超过100%。其中电驱动系统装机量位列第六位,电驱动系统、电机控制器、驱动电机装机量同比增长…

本地离线安装Selenium

1、去官网(下载地址:https://pypi.org/project/selenium/#history)去下载selenium版本 2、此处建议大家下载selenium的3.0的版本,我给的地址可以让你直接进入选择历史版本的跳转页面。 3、往下拉,到3左右的版本,点击版…

【Docker】Docker Compose详解

文章目录 概述Docker Compose介绍Docker Compose安装一、下载Docker Compose:二、授权三、快速上手第一步、创建python服务第二步,创建 Dockerfile第三步,使用 Compose 文件定义一个服务第四步、使用 Compose 编译启动应用 Docker Compose常用…

【JUC进阶】04. 无锁CAS

目录 1、前言 2、CAS概念和原理 2.1、什么是CAS 2.2、CAS原理 2.3、工作方式 2.4、无锁CAS优势 3、unsafe类 4、ABA问题 4.1、什么是ABA问题 4.2、解决ABA问题 4.2.1、版本号机制 4.2.2、AtomicStampReference 5、CAS适用的场景 1、前言 无锁的Compare and Swap&…

libevent(9)通过libevent实时监听文件的更新

这里我们利用libevent监听centos系统上的login日志文件&#xff0c;文件路径&#xff1a;/var/log/secure。&#xff08;ubuntu下是"/var/log/auth.log"&#xff09; 代码如下 test_file.cpp&#xff1a; #include <iostream> #include <thread> #inclu…

数据迁移ETL工具分享

1.概述 ETL(是Extract-Transform-Load的缩写&#xff0c;即数据抽取、转换、装载的过程)&#xff0c;对于企业应用来说&#xff0c;我们经常会遇到各种数据的处理、转换、迁移的场景。 我汇总了一些目前市面上比较常用的ETL数据迁移工具&#xff0c;希望对你会有所帮助。 2. …

华为OD机试真题 Java 实现【核酸检测人员安排】【2023Q1 100分】

一、题目描述 在系统、网络均正常的情况下组织核酸采样员和志愿者对人群进行核酸检测筛查。 每名采样员的效率不同&#xff0c;采样效率为N人/小时。 由于外界变化&#xff0c;采样员的效率会以M人/小时为粒度发生变化&#xff0c;M为采样效率浮动粒度&#xff0c;M N * 10…

云数据库是未来趋势,亚马逊云科技位居Gartner报告“领导者”

最近,数据库领域发生了一个大事件,可以称得上是一座里程碑。全球最具权威的IT研究公司Gartner最近发布了一个消息:在2022年的全球DBMS市场份额中,亚马逊云科技的数据库超越微软,登顶第一。 亚马逊云科技、微软、Oracle这三巨头近几年一直排名前三,占据了全球DBMS超过三分之二的…

无法提取请求的数据。有关详细信息,请查看 vSphere Client 日志。vsan没法查询详细信息

解释&#xff1a; 根本原因是证书不一致&#xff0c;但是vc的证书和vsan他们不共用一个证书&#xff0c;所以需要保证集群证书的统一&#xff0c; &#xff0c;当时由于vc的证书到期后&#xff0c;只替换了vc的sts证书&#xff0c;在替换了STS证书之后&#xff0c;可能会导致…

LangChain 构建本地知识库问答应用

一、使用本地知识库构建问答应用 上篇文章基于 LangChain 的Prompts 提示管理构建特定领域模型&#xff0c;如果看过应该可以感觉出来 ChatGPT 还是非常强大的&#xff0c;但是对于一些特有领域的内容让 GPT 回答的话还是有些吃力的&#xff0c;比如让 ChatGPT 介绍下什么是 L…

地下水数值模拟软件有哪些??GMS、Visual MODFLOW Flex、FEFLOW、MODFLOW

目录 ①全流程GMS地下水数值模拟技能培养及溶质运移反应问题深度解析 ②Visual modflow Flex地下水数值模拟及参数优化、抽水实验设计与处理、复杂的饱和/非饱和地下水流分析 ③全流程各工程类型地下水环境影响评价【一级】方法与MODFLOW Flex建模 ④地下水热耦合模拟FEFLO…

亚马逊云科技通过“逆向工作法”,为客户解决数据库问题

最近,数据库领域发生了一个大事件,可以称得上是一座里程碑。全球最具权威的IT研究公司Gartner最近发布了一个消息:在2022年的全球DBMS市场份额中,亚马逊云科技的数据库超越微软,登顶第一。 亚马逊云科技、微软、Oracle这三巨头近几年一直排名前三,占据了全球DBMS超过三分之二的…

8.7 实现TCP通讯

目录 socket函数 与 通信域 套接字类型与协议 bind函数 与 通信结构体 domain通信地址族 与 通信结构体 IPv4地址族结构体 通用地址族结构体 示例&#xff1a;为套接字fd绑定通信结构体addr listen函数 与 accept函数 socket函数 与 通信域 #include <sys/types.h&g…

网络初识知识小结

目录 IP地址 端口号 协议 协议分层 TCP/IP 五层模型 传输过程 接收过程 IP地址 IP地址主要用于标识网络主机、其他网络设备&#xff08;如路由器&#xff09;的网络地址 换句话说 IP是网络中主机的身份证,可以通过IP地址定位该主机在网络中的地址 端口号 在网络通信中…