PySpark中RDD的数据输出详解

news2025/1/15 13:14:01

目录

一. 回顾

二.输出为python对象

collect算子

演示

reduce算子

 演示

 take算子

 演示

 count算子

 演示

小结

三.输出到文件中

savaAsTextFile算子

 演示

配置Hadoop依赖

 修改rdd分区为1个

 小结

四.练习案例

需求: 

代码


 

一. 回顾

数据输入:

  • sc.parallelize
  • sc.textFile

数据计算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.输出为python对象

数据输出可用的方法是很多的,这里简单介绍常会用到的4个

  1. collect:将RDD内容转换为list
  2. reduce:对RDD内容进行自定义聚合
  3. take:取出RDD的前N个元素组成list
  4. count:统计RDD元素个数

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

结果是

 单独输出rdd,输出的是rdd的类名而非内容

reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

代码

 

 返回值等于计算函数的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

结果是

 take算子

功能:取RDD的前N个元素,组合成list返回给你
用法:

 

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)

结果是

 count算子

功能:计算RDD有多少条数据,返回值是一个数字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,统计rdd中有多少条数据,返回值为数字
num_count=rdd.count()
print(num_count)
#关闭链接
sc.stop()

结果是

小结

1.Spark的编程流程就是:

  • 将数据加载为RDD(数据输入)对RDD进行计算(数据计算)
  • 将RDD转换为Python对象(数据输出)

 2.数据输出的方法

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

数据输出可用的方法是很多的,这里只是简单介绍4个

三.输出到文件中

savaAsTextFile算子

功能:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
代码:

 演示

 这是因为这个方法本质上依赖大数据的Hadoop框架,需要配置Hadoop 依赖.

配置Hadoop依赖

调用保存文件的算子,需要配置Hadoop依赖。

  • 下载Hadoop安装包解压到电脑任意位置
  • 在Python代码中使用os模块配置: os.environ['HADOOP_HOME']='HADOOP解压文件夹路径′。
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
  • 下载hadoop.dll,并放入:C:/Windows/System32文件夹内

配置完成之后,执行下面的代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 输出的文件夹中有这么8文件,是因为RDD被默认为分成8个分区
SaveAsTextFile算子输出文件的个数是根据RDD的分区来决定的,有多少分区就会输出多少个文件,RDD在本电脑中默认是8(该电脑CPU核心数是8核)

 打开设备管理器就可以查看处理器个数,这里是有8个逻辑CPU
或者打开任务管理器就可以看到是4核8个逻辑CPU

 修改rdd分区为1个

方式1, SparkConf对象设置属性全局并行度为1:

 方式2,创建RDD的时候设置( parallelize方法传入numSlices参数为1)

 

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)

#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 小结

1.RDD输出到文件的方法

  • rdd.saveAsTextFile(路径)
  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

2.如何修改RDD分区

  • SparkConf对象设置conf.set("spark.default.parallelism", "7")
  • 创建RDD的时候,sc.parallelize方法传入numSlices参数为1

四.练习案例

需求: 

读取文件转换成RDD,并完成:

  1. 打印输出:热门搜索时间段(小时精度)Top3
  2. 打印输出:热门搜索词Top3
  3. 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  4. 将数据转换为JSON格式,写出为文件

代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)

rdd=sc.textFile("D:/search_log.txt")
#需求1 打印输出:热门搜索时间段(小时精度)Top3
# 取出全部的时间并转换为小时
# 转换为(小时,1)的二元元组
# Key分组聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/’是换行的意思,当一行代码太长时就可以这样用
print(result1)
#需求2 打印输出:热门搜索词Top3
# 取出全部的搜索词
# (词,1)二元元组
# 分组聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑马程序员")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 将数据转换为JSON格式,写出为文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

结果是

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/162572.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Transformer-XL: Attentive Language Models Beyond a Fixed-Length Context_学习笔记

Transformer-XL学习笔记 一、Transformer-XL出现的原因 首先说明Transformer的变形版本Transformer-XL出现的原因: transformer作为一种特提取器,在NLP中有广泛的应用,但是transformer需要对输入的序列设置固定的长度,例如在Ber…

(考研湖科大教书匠计算机网络)第一章概述-第四节:计算机网络的性能指标

文章目录(1)速率(2)带宽(3)吞吐量(4)时延①:基本概念②:计算公式(5)时延带宽积(6)往返时间RTT(7&a…

dp(六) 线性dp整合 最长(公共子串、公共子序列、上升子序列、回文子串)

1、最大公共子串_牛客题霸_牛客网​编辑 2、最长上升子序列(一)_牛客题霸_牛客网 3、最长回文子串_牛客题霸_牛客网 4、最长公共子序列(二)_牛客题霸_牛客网 #include <iostream> using namespace std; #include<vector>int main() {string str1,str2;cin>>…

mysql数据迁移报错问题

mysql8.0.17备份数据库到mysql5.7.26的There was error(s) while executing the queries问题解决&#xff08;数据库高版本向低版本数据迁移解决&#xff09; 问题背景 要将本地的mysql数据库导入到linux中的mysql中&#xff0c;其中&#xff0c;本地mysql数据库的版本是8.0.…

数字硬件建模SystemVerilog-时序逻辑建模(1)RTL时序逻辑的综合要求

数字门级电路可分为两大类&#xff1a;组合逻辑和时序逻辑。锁存器是组合逻辑和时序逻辑的一个交叉点&#xff0c;在后面会作为单独的主题处理。组合逻辑描述了门级电路&#xff0c;其中逻辑块的输出直接反映到该块的输入值的组合&#xff0c;例如&#xff0c;双输入AND门的输出…

N5247A网络分析仪

18320918653 N5247A Agilent N5247A 网络分析仪主要特性与技术指标 10 MHz 至 67 GHz2 端口或 4 端口&#xff0c;具有两个内置信号源可提供 4 端口 110 GHz 单次扫描解决方案110 dB 系统动态范围&#xff0c;32001 个点&#xff0c;32 个通道&#xff0c;5 MHz 中频带宽高输…

MySQL中深入浅出索引

文章目录前言一、索引的常见模型二、InnoDB的索引模型三、索引的维护四、索引的优化覆盖索引联合索引最左前缀原则索引下推前言 我们在看书的时候&#xff0c;打算回看某一个桥段的内容时。这是你肯定会是先翻看书的目录&#xff0c;从目录确定这段内容的位置&#xff0c;然后…

爬虫利用多线程快速爬取数据

一般单线程爬数据太慢了话&#xff0c;就采用多线程。 一般要根据两种情形来选择 自定义线程线程池 往往最关键的地方在&#xff0c;多个线程并发执行后&#xff0c;是否需要线性的返回结果。也就是先调用的线程&#xff0c;返回的结果要在前面。 或者说&#xff0c;某个对…

mysql简单数据查询——数采数据电量与耗料的日统计

目录 前言 步骤1&#xff1a;date_format函数 步骤2&#xff1a;concat函数 步骤3、4&#xff1a;查询中使用变量 完整代码 前言 在数采数据已写入mysql数据库中后&#xff0c;进行数据处理&#xff0c;统计电量与耗料数据 由于数据库版本较低&#xff0c;无法使用较新的…

华为策略路由实验配置

配置接口相关的IP地址&#xff0c;并配置IGP路由协议使得全网互通 AR1配置接口策略路由 对经过本地转发的路由生效&#xff0c;对本地始发的路由不生效 配置nqa检测下一跳状态 nqa test-instance PC1 icmptrace nqa的管理者为PC1&#xff0c;NQA的测试例名为icmptrace test-…

全国青少年软件编程(Scratch)等级考试二级考试真题2022年12月——持续更新.....

1.一个骰子,从3个不同角度看过去的点数如图所示,请问5的对面是什么点数?( ) A.1 B.3 C.4 D.6 正确答案:A 答案解析: 根据图三,用右手定则,大拇指朝上指向6所对的方向,其余四指握起来表示旋转方向,可以看到先5后2,然后把这个姿势对应到图1中,就知道1的对面是5…

C语言进阶(8)——动态内存空间管理

前言 文章目录前言1.为什么存在动态内存分配2.动态内存函数的介绍2.1 malloc函数2.2 free函数2.3 calloc2.4realloc3 常见的动态内存错误4.经典笔试题题目 1&#xff1a;题目 2&#xff1a;题目 3&#xff1a;题目 4&#xff1a;5.C/C程序的内存开辟6.柔性数组6.1 定义6.2 柔性…

1.5、中断和异常

整体框架 1、中断的概念和作用 当中断发生时&#xff0c;CPU 立即进入核心态\color{red}核心态核心态 当中断发生后&#xff0c;当前运行的进程暂停运行&#xff0c;并由操作系统内核对中断进行处理 对于不同的中断信号&#xff0c;会进行不同的处理 发生了中断&#xff0c…

记录--“非主流” 的纯前端性能优化

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 性能优化一直是前端研究的主要课题之一&#xff0c;因为不仅直接影响用户体验&#xff0c;对于商业性公司&#xff0c;网页性能的优劣更关乎流量变现效率的高低。例如 DoubleClick by Google 发现&…

MD5加密

MD5加密 md5加密 明文 加密变成 128位二进制 --> 32位16进制字符串的密文 MD5特征: 明文一样, 得到密文一样密文一样, 推出明文一样明文不一样, 得到密文不一样 缺点&#xff1a; 现在网上有很多暴力破解的网址&#xff0c;直接使用md5加密还是不太安全 为了提高安全性&am…

Linux常用命令——vmstat命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) vmstat 显示虚拟内存状态 补充说明 vmstat命令的含义为显示虚拟内存状态&#xff08;“Viryual Memor Statics”&#xff09;&#xff0c;但是它可以报告关于进程、内存、I/O等系统整体运行状态。 语法 vmst…

Java高并发编程实战,异步注解@Async自定义线程池

一、Async注解 Async的作用就是异步处理任务。 在方法上添加Async&#xff0c;表示此方法是异步方法&#xff1b; 在类上添加Async&#xff0c;表示类中的所有方法都是异步方法&#xff1b; 使用此注解的类&#xff0c;必须是Spring管理的类&#xff1b; 需要在启动类或配置类…

ELK日志(1)

Elasticsearch开源分布式搜索引擎&#xff0c;它的特点有&#xff1a;分布式&#xff0c;零配置&#xff0c;自动发现&#xff0c;索引自动分片&#xff0c;索引副本机制&#xff0c;restful 风格接口&#xff0c;多数据源&#xff0c;自动搜索负载等。RESTFUL特点包括&#xf…

MES系统之工控

MES系统之工控 要控制MES系统首先要对他有个了解。MES系统最早由1990年&#xff0c;由美国先进制造研究中心AMR提出的&#xff0c;当时中文意思叫制造执行系统概念。直到1997年&#xff0c;MESA(制造执行系统协会)提出了MES功能组件和继承模型&#xff0c;到20世纪90年代初期&a…

动态内存管”家“

&#x1f40b;动态内存管理&#x1f996;动态内存分配存在的意义&#x1f996;动态内存函数的介绍&#x1f424;malloc和free&#x1f424;calloc&#x1f424;realloc&#x1f996;常见动态内存错误&#x1f424;对空指针的解引用操作&#x1f424;对动态开辟空间的越界访问&a…