Transformation(转换算子)

news2024/11/16 2:43:34

分布式代码的分析

请添加图片描述

启动spark程序的代码

在yarn中启动(没有配置环境变量)

/export/server/spark/bin/spark-submit --master yarn --num-executors 6 /root/helloword.py 
# 配置环境变量
spark-submit --master yarn --num-executors 6 /root/helloword.py 

RDD的五大特征

1、RDD是分区的

2、计算方法都在作用在每一个分区上

3、RDD之间是有依赖关系的(RDD之间有血缘关系)

4、kv型RDD是可以有分区器的

5、RDD分区数据的读取都会接近数据所在地

RDD的创建

通过并行集合进行创建(并行化创建)

概念:并行化创建是指将本地集合-> 转向分布式RDD

这一步就是分布式的开端:本地转分布式

API:

rdd = Sparkcontext.parallelize(参数1,参数2)
参数1:集合即对象,比如list
参数2:分区数

使用案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    # 初始化执行环境,构建sparkContext对象
    conf  = SparkConf().setAppName("TEST").setMaster("local[*]")
    # 通过conf创建一个SparkContext对象
    sc = SparkContext(conf=conf)
    # 通过并行化集合的方式去创建RDD
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)分区数的参与度很小
    print('分区数:',rdd.getNumPartitions())
    print("rdd的内容:",rdd.collect())

读取外部数据源(读取文件)

读取文件创建RDD使用textfile的API

textfile 可以读取本地数据,也可以读取hdfs数据

使用方法:

sparkcontext.textFile(参数1,参数2)
# 参数1:必填,文件路径支持本地文件
# 参数2:可选,表示最小分区数量,最小分区数是参考值

wholeTextFile

读取文件API使用场景:适合读取一堆小文件

使用方法:

Sparkcontext。wholeTextfiles(参数1,参数2)

# 参数1:必填,文件路径,支持本地文件,支持HDFS 也支持一些协议例如s3协议

# 参数:可选,最小分区数

RDD算子

算子定义

算子:分布式集合对象的api称之为算子

方法函数:本地对象的API,叫做方法\函数

算子:分布式对象的API叫做算子

算子分类

RDD的算子分为两类:

​ Transformation:转换算子

​ Action:动作(行动)算子

Transformation(转换算子)

定义:RDD算子,返回值仍旧是一个RDD的,称之为转换算子

特性:这类算子就是 lazy 懒加载的如果没有action算子,Transformation算子是不工作的

常用的Transformation算子

map算子:

功能:map算子是将rdd的数据一条条的处理(处理的逻辑 基于map算子中接收的处理函数),并且返回新的rdd

API:

rdd.map(func)
# func: f:(T)->U
# f:表示一个函数或者方法
# (T)——》表示的是方法的定义:
# ()表示传入的参数 (T)表示 传入一个参数 ()
# T 是泛型的代称,在这里表示 任意类型
# U 也是泛型代称,在这里表示,任意类型
# -> 表示返回值
# (T)—> U 总结起来的意思是:这是一个方法,这个方法接受一个参数据传入,传入的方式类型不限,返回一个返回值,返回的类型不限
# (A)-> A 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入的参数的类型不限,返回一个返回值,但是返回值的传入参数类型一致

map的定义方法:

# 作为算子传入函数体
rdd = sc.parallelize([1,2,3,4,5,6],3)
    def add(data):
        return data*10
    print(rdd.map(add).collect())
    

reduceByKey算子

功能:针对kv型RDD自动按照key分组,然后根据自己提供的聚合逻辑完成子内数据的聚合操作

api

rdd.reduceByKey(func)
# func:(v,v)-> v
# 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

代码实现案例:

rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('a',1)])
print(rdd.reduceByKey(lambda a,b:a+b).collect())

mapValues算子

功能:针对二元元组RDD,对其内部的二元元组的value执行map操作

api:

rdd.mapValues(func)
# func:(V)->u
# 注意,传入的参数,是二元元组的value值
# 我们这个传入的方法,只对value进行处理

案例:

rdd = sc.parallelize([('a',1),('a',11),('a',6),('b',3)])
print(rdd.mapValues(lambda values :values * 10).collect())

groupBy算子

功能:将rdd的数据进行分组

API:

rdd.groupBy(func)
# func 函数
# func:(t)->k
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个函数是 拿到你的返回值后,将相同返回值的放入一个组中
# 分组完成后,每个组是一个二元元组,key就是返回值,所有的数据放入一个迭代器中作为value

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('groupby').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
    # 通过group by 对数据机型分组
    # group By 传入的函数意思是:这个函数确定按照谁来分组
    # 分组规则和SQL是一致的,相同的在一组
    result = rdd.groupBy(lambda t: t[0])
    print(result.collect())
    print(result.map(lambda t:(t[0] ,list(t[1]))).collect())

ilter算子

功能:过滤不想要的数据

算子案例:

from pyspark import SparkConf,SparkContext
conf = SparkConf().setAppName('filer').setMaster('local[*]')
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1,2,3,4,5,6])
# 使用filer进行过滤 
rdd_filer = rdd.filter(lambda x:x>1)
print(rdd_filer.collect())

distinct算子

功能:对rdd的数据进行去重,并且返回新的RDD

api:

rdd.distinct(参数1)
# 参数1:去重分区数量,一般不用

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':

    Conf = SparkConf().setMaster('local[*]').setAppName('distin')
    sc = SparkContext(conf=Conf)
    rdd = sc.parallelize([1,1,1,1,1,2,2,2,22,3,3,3,3,3,34])
    # 使用distinct对RDD数据进行去重处理
    rdd_distinct = rdd.distinct()
    print(rdd_distinct.collect())
    
# 结果[1, 2, 34, 3, 22]

union算子

功能:将两个rdd合并成为一个rdd返回

算子特点:

1、rdd的类型不同也是可以进行合并的

2、union算子时不可以自动去重的

api:

rdd.union(other_rdd)

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':

    conf = SparkConf().setAppName('union').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize([1,2,3,4,5])
    rdd2  = sc.parallelize(['a','s','d','f','f'])
    rdd_union = rdd1.union(rdd2)
    print(rdd_union.collect())
    

join算子

功能:join算子对两个RDD执行join操作(可实现SQL的内/外连接)

对于join算子来说 关联条件 是按照二元元组的key进行关联的

注意:join算子只能用于二元元组

API:

rdd.join(other_rdd)# 内连接
rdd.leftoutherjoin(other_rdd)# 左外
rdd.rightOutherjoin(other_rdd)# 右外

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('JOIN')
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([(1001,"文章"),(1002,'英文')])
    rdd2 = sc.parallelize([(1001,"于金陵"),(1002,'yujn=inlong'),(1003,'尽情与')])
    rdd_join = rdd1.join(rdd2)
    print(rdd_join.collect())
    rdd_left = rdd1.leftOuterJoin(rdd2)
    print(rdd_left.collect())
    rdd_right = rdd1.rightOuterJoin(rdd2)
    print(rdd_right.collect())
   

intersection算子

功能:求2个rdd的交集,返回一个新的rdd

api:

rdd.instersection(other_rdd)

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('instersection').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4])
    rdd2 = sc.parallelize([1,2,3,44,550])
    rdd1 = rdd.intersection(rdd2)
    print(rdd1.collect())

glom算子

功能:将RDD的数据加上嵌套,这个嵌套按照分区来进行

当需要解嵌套是可以使用

flaimap算子进行转换

api:

rdd.glom()

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('instersection').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
    print(rdd.glom().collect())

groupByKey算子

功能:针对kv型rdd,自动按照key进行分组

api:

rdd.groupByKey()# 自动按照key分组

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('groupbykey').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    
    rdd = sc.parallelize([('a',1),("a",2),('a',3),('b',1),('b',2),('b',3)])
    rdd_bykey = rdd.groupByKey()
    print(rdd_bykey.map(lambda x: (x[0],list(x[1]))).collect())

sortBy算子

功能:对rdd数据进行排序,基于你指定的排序依据

api:

rdd.sortby(func,ascending=false,numparttions=1)
# func(T)->U:告知按照rdd中的哪个数据排序比如:lambda x: x[1]表示按照rdd中的第二列元素进行排序
# ascending True 升序 false 降序
# numPartitons:用多少分区排序

算子案例:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('sortBy').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a',11),('c',4),('f',3),('g',2)])
    rdd_sort = rdd.sortBy(lambda x: x[1], ascending=True,numPartitions=3)
    print(rdd_sort.collect())

sortByKey算子

功能:针对kv型RDD按照key进行排序

aip:

sortByKey(ascending= True,numPartitions=None,keyfunc=<function RDD.<lambda>)
# ascending:升序或者降序 true是升序,False是降序,默认是升序
# numPartitions:按照几个分区排序,如果全局有序,设置1
# Keyfunc :在排序前对key进行处理,语法(k)->u,一个参数传入,返回一个值

算子案例演示:

from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('sortBy').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a',11),('c',4),('f',3),('g',2),('E',1),('s',10),('Q',8)])
    print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())

综合案例:

import json
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('sortBy').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    #1、 读取数据文件
    rdd = sc.textFile(r'C:\Users\HONOR\Desktop\测试数据\order.text')
    # 2、flatMap算子进行数据整理
    rdd_json = rdd.flatMap(lambda x: x.split('|'))
    # 3、通过json 库进行数据类型的转换
    rdd_json_j =rdd_json.map(lambda x: json.loads(x))
    # 4、筛选出数据中城市为北京的数据
    rdd_json_biejing = rdd_json_j.filter(lambda x: x['areaName'] == '北京')
    # 5、将城市为北京的所有商品数据类型进行的字符段进行合并并且去重
    rdd_l = rdd_json_biejing.map(lambda x: x['areaName']+":"+x['category']).distinct()
    #6、 对筛选的数据进行总结输出
    print(rdd_l.collect())

将案例提交道YARN集群中运行

# 改动1:加入环境变量,让pycharm直接提交到yarn的时候,知道Hadoop的配置在哪,可以读取yarn的信息
import os
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
# 在集群运行,本地文件就不可以用了,需要用hdfs文件
rdd = sc.textFile('hdfs://node1:8020/input/order.text')

如果在pycharm中直接提交到yarn,那么依赖的其他的python文件,可以通过设置文件属性来指定依赖代码

# 如果在代码中运行,那么依赖的文件,可以通过spark.sumbit.pyFiles属性来设置
#conf对象,可以通过setAPI 设置数据,参数1:key 参数2是value
conf.set('spark.submit.pyFiles',"defs.py")

在服务器上通过spark-submit提交到集群运行

# --py-files 可以帮你指定 你依赖的其他python代码,支持.zip(一堆),也可以单个.py文件 都行
/export/server/spark/bin/spark-submit --master yarn --py-files ./defs.[文件格式] ./mian.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342951.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZYNQ-嵌入式学习(4)

GPIO之MIO中断GPIO的MIO中断功能实验&#xff1a;使用GPIO的MIO中断功能&#xff0c;实现按键控制LED的亮灭。GPIO的MIO中断功能 从MIO输入到GPIO的线路有一个通向中断检测模块的分支。 INT_TYPE寄存器表示中断类型。包括边沿和电平两种类型。 INT_POLARITY寄存器表示极性。包括…

基于 STM32+FPGA 的多轴运动控制器的设计

运动控制器是数控机床、高端机器人等自动化设备控制系统的核心。为保证控制器的实用性、实时性和稳定 性&#xff0c;提出一种以 STM32 为主控制器、FPGA 为辅助控制器的多轴运动控制器设计方案。给出了运动控制器的硬件电路设计&#xff0c; 将 S 形加减速算法融入运动控制器&…

【Git】合并多条 commit 注释信息

文章目录1、查看 commit 记录2、合并 commit 注释1、查看 commit 记录 # 3 指的是查看最近 3 次的 commit 记录&#xff0c;如果要查看多次的可以修改数字 # -3 不加&#xff0c;则表示查看所有 commit 记录&#xff0c;一般还是用数字去指定 git log -32、合并 commit 注释 …

【图像分类】基于PyTorch搭建LSTM实现MNIST手写数字体识别(单向LSTM,附完整代码和数据集)

写在前面: 首先感谢兄弟们的关注和订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 提起LSTM大家第一反应是在NLP的数据集上比较常见,不过在图片分类中,它同样也可以使用。我们以比较熟悉的 mnist…

软件测试面试自我介绍/项目介绍居然还有模板?我要是早点发现就好了

目录 1、自我介绍 2、项目介绍 2.1、最全电商项目介绍 2.2、电商项目介绍 2.3、在线教育项目介绍 2.4、互联网金融项目介绍 总结 1、自我介绍 以XXX简历来举例&#xff08;参照下面的案例&#xff0c;编写你的自我介绍&#xff0c;框架就是&#xff1a;我是谁&#xff0…

深入Kafka核心设计与实践原理读书笔记第三章消费者

消费者 消费者与消费组 消费者Consumer负责定于kafka中的主题Topic&#xff0c;并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组&#xff0c;当消息发布到主题后&#xff0c;只会被投递给订阅它的消费组的一个消费者。 如…

go gin学习记录1

环境&#xff1a; MAC M1&#xff0c;Go 1.17.2&#xff0c;GoLand 默认执行指令的终端&#xff0c;如果没有特别说明&#xff0c;指的都是goland->Terminal 创建项目 Goland中新建项目&#xff0c;在$GOPATH/src/目录下建立t_gin项目。 进入项目&#xff0c;在goland的T…

spark04-文件读取分区数据分配原理

接 https://blog.csdn.net/oracle8090/article/details/129013345?spm1001.2014.3001.5502通过上一节知道 总字节数为7 每个分区字节数为3代码val conf: SparkConf new SparkConf().setMaster("local").setAppName("wordcount")val sc: SparkContext ne…

日日顺供应链|想要看清供应链发展趋势,先回答这三个问题

技术变革如何支撑供应链及管理服务的发展&#xff1f; 数字化与科技化开始承托供应链管理能力的升级与变革&#xff1f; 如何从客户需求的纬度反推供应链及管理服务的模式变革&#xff1f;在过去的三年中&#xff0c;我国的供应链企业经受了最为极端的挑战&#xff0c;但当下&a…

论文写作——公式编辑器、latex表格、颜色搭配器

1、公式编辑器(网页版mathtype可用于latex公式编辑): MathType demo - For DevelopersLive demonstration about the features of Mathtype which allows edition equations and formulas (PNG, flash, SVG, PDF, EPS), based on MathML and compatible with LaTeX.https:/…

C++之可调用对象、bind绑定器和function包装器

可调用对象在C中&#xff0c;可以像函数一样调用的有&#xff1a;普通函数、类的静态成员函数、仿函数、lambda函数、类的非静态成员函数、可被转换为函数的类的对象&#xff0c;统称可调用对象或函数对象。可调用对象有类型&#xff0c;可以用指针存储它们的地址&#xff0c;可…

孙子兵法-36计

目录 04、攻其无备&#xff0c;出其不意。——《孙子兵法始计篇》 08、不战而屈人之兵&#xff0c;善之善者也。——《孙子兵法谋攻篇》 09、上兵伐谋&#xff0c;其次伐交&#xff0c;其次伐兵&#xff0c;其下攻城。 01、兵者&#xff0c;国之大事&#xff0c;死生之地&…

想要的古风女生头像让你快速get

如今我看到很多人都喜欢用古风女生当作头像&#xff0c;那么今天我就来教大家如何快速得到一张超美的古风女生头像~ 上图就是我使用 APISpace 的 AI作画(图像生成)服务 快速生成的古风女生头像&#xff0c;不仅可以限定颜色&#xff0c;还可以选择『宝石镶嵌』或『花卉造型』这…

【HAL库】STM32CubeMX开发----STM32F407----Uart串口接收空闲中断

一、Uart串口接收空闲中断----详解 首先介绍串口通信的数据传输方式&#xff0c;这样后面的Uart串口空闲中断能更好的理解。 Uart串口通信----数据传输方式 串口通信的数据由发送设备通过自身的TXD接口传输到接收设备得RXD接口。 一个字符一个字符地传输&#xff0c;每个字符…

设计模式C++实现11:观察者模式

参考大话设计模式&#xff1b; 详细内容参见大话设计模式一书第十四章&#xff0c;该书使用C#实现&#xff0c;本实验通过C语言实现。 观察者模式又叫做发布-订阅&#xff08;Publish/Subscribe&#xff09;模式。 观察者模式定义了一种一对多的依赖关系&#xff0c;让多个观察…

Windows安装Gradle(IDEA兼容版)

IDEA兼容版本 IDEA安装目录下查看兼容Gradle版本&#xff1a;D:\win11\program\idea_2022.2.3\plugins\gradle\lib Gradle下载环境变量 1.创建仓库目录 D:\win11\program\gradle-7.4-bin\gradle-7.4\repository2.添加环境变量 GRADLE_HOME&#xff1a;D:\win11\program\gradle…

Java连接Redis

Jedis是Redis官方推荐的Java连接开发工具。api&#xff1a;https://tool.oschina.net/apidocs/apidoc?apijedis-2.1.0一、 导入包<!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><…

在职阿里6年,一个29岁女软件测试工程师的心声

简单的先说一下&#xff0c;坐标杭州&#xff0c;14届本科毕业&#xff0c;算上年前在阿里巴巴的面试&#xff0c;一共有面试了有6家公司&#xff08;因为不想请假&#xff0c;因此只是每个晚上去其他公司面试&#xff0c;所以面试的公司比较少&#xff09;其中成功的有4家&…

新版bing(集成ChatGPT)的申请方法

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,科大讯飞比赛第三名,CCF比赛第四名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

redis知识汇总(部署、高可用、集群)

文章目录一、redis知识汇总什么是redisredis的优缺点&#xff1a;为什么要用redis做缓存redis为什么这么快什么是持久化redis持久化机制是什么&#xff1f;各自优缺点&#xff1f;AOF和RDB怎么选择redis持久化数据和缓存怎么做扩容什么是事务redis事务的概念ACID概念主从复制re…