Python学习从0到1 day26 第三阶段 Spark ④ 数据输出

news2024/11/15 0:33:24

半山腰太挤了,你该去山顶看看

                                        —— 24.11.10

一、输出为python对象

1.collect算子

功能:

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

语法:

rdd.collect()

返回值是一个list列表

示例:

from pyspark import SparkConf,SparkContext
import os

conf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)

Set = {"小明","小红","小强"}
Tuple = ("小明","小红","小强")

set_rdd = sc.parallelize(Set)
tuple_rdd = sc.parallelize(Tuple)

print(set_rdd.collect())
print(tuple_rdd.collect())


2.reduce算子

功能:

对RDD数据集按照你传入的逻辑进行聚合

语法:

rdd.reduce(func)

rdd = sc.parallelize(range(1 , 10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a , b : a + b))

返回值等同于计算函数的返回值

示例:

from pyspark import SparkContext,SparkConf
import os
import json

os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local").setAppName("test_spark")
sc = SparkContext(conf = conf)

List = [1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(List)
print(rdd.reduce(lambda x, y : x + y))


3.take算子

功能:

取RDD的前N个元素,组合成list返回

语法:

sc.parallelize([3,2,1,4,5,6]).take(5)    # [3,2,1,4,5]

 返回前n个元素组成的list

示例:

from pyspark import SparkContext,SparkConf
import os
import json

os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
List = (1,2,3,4,5,6,7,8,9)
rdd = sc.parallelize(List)
res = rdd.take(4)
print("前四个元素为:"+res)


4.count算子

功能:

计算RDD有多少条数据

语法:

sc.parallelize([3,2,1,4,5,6]).count()

返回值是一个数字

示例:

from pyspark import SparkConf,SparkContext
import os
import json

os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(["yyh","hl","grq","zxj","cby","wfe","mrr","qjy"])
print(rdd.count())


二、输出到文件中

1.saveAsTextFile算子

功能:

RDD的数据写入文本文件

支持本地写出、 hdfs等文件系统

语法:

rdd = sc.parallelize([1,2,3,4,5])
rdd.saveAsTextFile("../data/output/test.txt")

2.配置Hadoop相关依赖

调用保存文件的算子,需要配置Hadoop依赖

① 下载Hadoop安装包

http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz

② 解压到电脑任意位置

③ 在Python代码中使用os模块配置:

os.environ['HADOOP HOME']='HADOOP解压文件夹路径'
E:\python.learning\hadoop分布式相关\hadoop-3.0.0

④ 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe

⑤ 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll


3.代码示例

from pyspark import SparkConf,SparkContext
import os

conf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)

# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])

# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])

# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])

# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

注:如果输出路径的文件存在,代码将会报错


4.运行结果

创建几个文件取决于Hadoop上的分区数量

解决方式:修改rdd的分区


5.修改rdd分区为1个

方式1

Sparkconf对象设置属性全局并行度为1:

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
conf = SparkConf().setMaster("local").setAppName("test_spark")
conf.set("spark.default.parallelize", "1")
sc = SparkContext(conf = conf)

# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])

# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])

# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])

# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

方式2

创建RDD的时候设置 parallelize方法传入numSlices参数为1:

rdd1 = sc.parallelize([1,2,3,4,5],1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2240497.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习在网络安全中的应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 机器学习在网络安全中的应用 机器学习在网络安全中的应用 机器学习在网络安全中的应用 引言 机器学习概述 定义与原理 发展历程 …

JMeter进阶篇

目录 上篇导航: 总目录: 一、逻辑控制器: 1.逻辑控制器和关联: 2.if逻辑控制器: 3.forEach控制器: 4.循环控制器: 二、关联: 1.xpath: 2.正则表达式提取器&…

O-RAN简介

O-RAN简介 概览 如今,全球蜂窝数据使用量持续增长,因此,电信系统必须随之进行革新,才能满足这一需求量。虽然5G标准能够满足更高的蜂窝吞吐量需求,且有望实现各种新的应用场景,但如果网络没有进行相应的改进,许多拟定的5G应用只能是纸上谈兵。以高可靠低延时通信(URLL…

Spring设计模式

设计模式 是一种软件开发中的解决方案,设计原则。目的是使代码具有扩展性,可维护性,可读性,如: 单例模式(Singleton Pattern) Spring IoC 容器默认会将 Bean 创建为单例,保证一个类…

安全的时钟启动

Note:文章内容以 Xilinx 系列 FPGA 进行讲解 1、什么是安全启动时钟 通常情况下,在MMCM/PLL的LOCKED信号抬高之后(由0变为1),MMCM/PLL就处于锁定状态,输出时钟已保持稳定。但在此之前,输出时钟会…

【含开题报告+文档+PPT+源码】基于Springboot和vue的电影售票系统

开题报告 随着电影产业的快速发展和科技的不断进步而逐渐形成的。在早期,电影票的销售主要依赖于传统的实体售票窗口和人工售票员,这种方式虽然直接,但效率低下,容易出现错误,并且无法满足大规模、高流量的售票需求。…

5G的发展演进

5G发展的驱动力 什么是5G [远程会议,2020年7月10日] 在来自世界各地的政府主管部门、电信制造及运营企业、研究机构约200多名会议代表和专家们的共同见证下,ITU-R WP 5D#35e远程会议宣布3GPP 5G技术(含NB-IoT)满足IMT-2020 5G技…

Vue全栈开发旅游网项目(11)-用户管理前端接口联调

联调基本步骤 1.阅读接口文档 2.配置接口地址 3.使用axios获取数据 4.将数据设置到模型层 1.发送验证码联调 1.1 配置接口地址 文件地址:src\utils\apis.js //系统相关的接口 const SystemApis {sliderListUrl:apiHost"/system/slider/list/",//发送…

接口返回的结构体里包含图片(做图片预览)

摘要&#xff1a;有这么一种情况&#xff0c;页面上有一块儿内容是接口返回的&#xff0c;前端做渲染&#xff0c;比如 <div><p><img srcxxx /></p><p>测试</p> </div> 这是接口返回的字符串结构数据&#xff0c;这时候需要前端做…

免费,WPS Office教育考试专用版

WPS Office教育考试专用版&#xff0c;不仅满足了考试需求&#xff0c;更为教育信息化注入新动力。 https://pan.quark.cn/s/609ef85ae6d4

aws中AcmClient.describeCertificate返回值中没有ResourceRecord

我有一个需求&#xff0c;就是让用户自己把自己的域名绑定我们的提供的AWS服务器。 AWS需要验证证书 上一篇文章中我用php的AcmClient中的requestCertificate方法申请到了证书。 $acmClient new AcmClient([region > us-east-1,version > 2015-12-08,credentials>[/…

ctfshow-web入门-反序列化(web271-web278)

目录 1、web271 2、web272 3、web273 4、web274 5、web275 6、web276 7、web277 8、web278 laravel 反序列化漏洞 1、web271 laravel 5.7&#xff08;CVE-2019-9081&#xff09; poc <?php namespace Illuminate\Foundation\Testing{use Illuminate\Auth\Generic…

程序员的数学之进制与零

最近一年多发生了很多平凡的大事&#xff0c;应接不暇&#xff0c;一度断更。从今儿起再接上来。 先从数学开始吧&#xff0c;因为太枯燥了。 生活中有许多种进制在共同起作用&#xff0c;例如&#xff0c;数学上的十进制、计算机中的二进制、八进制和十六进制、计时的60进制、…

GPT-5 要来了:抢先了解其创新突破

Microsoft 的工程师计划于 2024 年 11 月在 Azure 上部署 Orion (GPT-5)。虽然这一版本不会向公众开放&#xff0c;但其上线被视为人工智能领域的一个重要里程碑&#xff0c;并将产生深远的影响。 文章目录 GPT-5 真的要来了GPT-4 的局限性GPT-5 的创新突破与遗留挑战GPT-5 预期…

01-Ajax入门与axios使用、URL知识

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

堆中的时间复杂度+TOP K问题

堆中的时间复杂度分析 回顾: 堆在物理上:数组 逻辑上:完全二叉树 1.堆排序是什么? // 排升序void HeapSort(int* a, int n){// 建大堆 -for (int i (n - 1 - 1) / 2; i > 0; --i){AdjustDown(a, n, i);}int end n - 1;while (end > 0){Swap(&a[0], &a[end]…

学Linux的第八天

目录 管理进程 概念 程序、进程、线程 进程分类 进程前后台调用 查看进程 ps命令 unix 风格 bsd风格 GNU风格 top命令 格式 统计信息区 进程信息区&#xff1a;显示了每个进程的运行状态 kill命令 作用 格式 管理进程 概念 程序、进程、线程 程序&#x…

网络初识--Java

一、网络通信基础 1.IP地址 IP地址主要⽤于标识⽹络主机、其他⽹络设备&#xff08;如路由器&#xff09;的⽹络地址。简单说&#xff0c;IP地址⽤于定位主 机的⽹络地址。 就像我们发送快递⼀样&#xff0c;需要知道对⽅的收货地址&#xff0c;快递员才能将包裹送到⽬的地。…

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包&#xff1f; 2.什么是软件包管理器&#xff1f; 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具&#xff1a; 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式&#xff08;Normal mode&#xff0…

标准库 -- 为什么 EXTI中断需要使能复用时钟与为什么不需要使能?

在STM32中&#xff0c;使用外部中断&#xff08;EXTI&#xff09;时需要使能复用功能&#xff0c;这和其他中断&#xff08;如串口中断、定时器中断&#xff09;有所不同。以下是为什么在使用外部中断时需要使能复用&#xff0c;以及其他中断不需要复用的原因。 一、为什么 EX…