【4-5章】Spark编程基础(Python版)

news2025/1/24 8:53:16

课程资源:(林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili


第4章  RDD编程(21节) 

Spark生态系统:

  • Spark Core:底层核心(RDD编程是针对这个)
  • Spark SQL:SQL查询
  • Spark Streaming:流计算(Structured Streaming:结构化数据流)
  • Spark MLlib:机器学习

RDD编程:对RDD进行一次又一次的转换操作

(一)RDD编程基础

1、创建

两种方式:

  1. 从文件系统中加载数据创建RDD:分布式文件系统hdfs 或 本地文件系统 或 云端文件如Amazon S3(Amazon云端存储服务)
  2. 通过并行集合(数组)创建RDD:对集合进行并行化

(1)从文件系统中加载数据:Spark的SparkContext通过 sc.textFile() 读取数据,生成内存中的RDD  

Driver节点为指挥所;SparkContext对象为指挥官 

# 从本地文件系统中加载数据创建RDD
# sc即driver节点里的SparkContext对象
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")  # 本地文件是///
lines.foreach(print)  # 遍历RDD每个元素并输出

SparkContext在独立应用程序(即代码文件)里需要编写代码生成;但在pyspark交互式执行环境里,系统会默认创建sc,不需再人为创建

RDD每个元素对应文本文件的一行,是字符串类型 

(2)从分布式文件系统HDFS中加载数据:

# 三条语句完全等价,可以使用其中任一种
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")  # hdfs://,localhost为主机名,9000为端口号(系统默认去当前登录Ubuntu系统的用户在HDFS中所对应的用户主目录去找)
lines = sc.textFile("/user/hadoop/word.txt")   # 用户主目录,Linux系统默认/home/用户名,简写为/~
lines = sc.textFile("word.txt")   # txt文件放在当前用户主目录下
  • Linux系统主目录默认为/home/用户名,简写为~ 
  • Hadoop文件系统默认为当前登录Linux系统的用户,hdfs://localhost:9000/user/用户名/(全称路径),可以只写为 /user/用户名/

(3)通过并行集合(数组/列表)创建RDD:SparkContext 的 sc.parallelize() 方法,可以对array并行化,生成内存中的RDD

array = [1,2,3,4,5]
rdd = sc.parallelize(array)
rdd.foreach(print)   # 遍历取出打印
# 结果:
# 1
# 2
# 3
# 4
# 5

2、基本操作

RDD操作:

  • 转换操作(Transformation):
    • filter(过滤)
    • map(一对一映射)
    • flatMap(输出0~多个结果)
    • groupByKey(Key相同的分组)
    • reduceByKey(根据Key分组后对分组的值计算)
  • 行动操作(Action):
    • count(数据集中元素个数)
    • collect(以列表形式返回数据集中所有元素)
    • first(数据集中第一个元素)
    • take(n)(以列表形式返回数据集中前n个元素)
    • reduce(func)(聚合数据集中的元素)
    • foreach(func)(将数据集中每个元素传到func运行)
  • 惰性机制:转换操作只记录轨迹,行动操作才真正执行计算

RDD转换 <=> 业务逻辑完成一次又一次转换(形成DAG有向无环图,Spark即解析DAG,生成很多个Stage,每个阶段的子任务提交到不同工作节点的线程去运行)。很多简单的转换组合后可实现复杂的业务逻辑

  • 对于RDD而言,每一次转换操作都会产生不同的RDD,供一个操作使用(RDD是只读的,一旦生成无法修改,只有在转换的过程中才能修改,生成新的RDD后又无法修改)
  • 转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作(动作类型操作Action)时才会发生真正的计算,从血缘关系的源头开始进行从头到尾的计算操作 

 (1)转换操作

  • filter(func)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")  # 从底层文本文件加载生成内存中RDD
linesWithSpark = lines.filter(lambda line:"Spark" in line)   # 匿名函数/lambda表达式
linesWithSpark.foreach(print)   # 输出包含Spark的行

  • map(func)
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)   # 得到一个RDD
rdd2 = rdd1.map(lambda x:x+10)
rdd2.foreach(print)

lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
words = lines.map(lambda line:line.split(" "))  # 一行语句被拆分后得到的是list
words.foreach(print)  # 包含3个元素,每个元素都是list

  •  flatMap(func)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
words = lines.flatMap(lambda line:line.split(" "))  
words.foreach(print)  # 包含9个元素(英文单词)

词频统计就是用 flatMap 将一行行语句打散成一个个单词

  • groupByKey()

应用于key-value键值对数据集,返回(key, iterable),即把key相同的值封装成一个可迭代对象

# 并行化方式生成列表封装的数据集,列表里的元素为键值对
words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
words1 = words.groupByKey()  # 相同key的值会归并起来,pyspark.resultiterable.ResultIterable object封装
words1.foreach(print)

  •  reduceByKey(func)

在groupByKey基础上对value list按照func进行计算

words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
words1 = words.reduceByKey(lambda a,b:a+b)  
words1.foreach(print)  # 如('is',2) 即 is出现2次

如若("is",(1,1,1)),第一个1赋给a,第二个1赋给b,求和得到2赋给a,第三个1赋给b,求和得到3,返回最终结果

 (2)行动操作

转换类型操作为惰性机制,并不真正发生计算,只记录轨迹;当遇到第一个行动类型操作时执行真正的计算(从底层磁盘加载数据、生成数据、转换、得到结果)

  • count():返回数据集中的元素个数
  • collect():以数组/列表的形式返回数据集中的所有元素(封装在列表里返回)
  • first():返回数据集中的第一个元素
  • take(n):以数组/列表的形式返回数据集中的前n个元素
  • reduce(func):通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  • foreach(func):将数据集中的每个元素传递到函数func中运行

rdd.reduce(lambda a,b:a+b):

 惰性机制:

lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")  # 转换操作,只是记录轨迹,并未真正加载
lineLengths = lines.map(lambda s:len(s))  # 转换操作,只是记录轨迹,并未真正加载
totalLength = lineLengths.reduce(lambda a,b:a+b)   # 行动操作,真正执行从头到尾计算
print(totalLength)  # 每行长度相加,得到总长度

3、持久化

多次反复访问同样一组值,不做持久化的话,每次访问都需要重新生成,非常耗时(对于迭代计算而言代价很大,经常需要多次重复使用同一组数据)。持久化将其保存到内存中,下次使用时不需要从头计算

可以通过持久化(缓存)机制避免这种重复计算的开销。持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复利用

可以使用 persist() 方法对一个RDD标记为持久化(之所以说标记为持久化,是因为出现 persist() 语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化

参数:

  • MEMORY_ONLY(只存在内存中):把RDD作为反序列化的对象存在JVM中,若内存不足,就按先进先出原则替换内容    RDD.cache() = RDD.persist(MEMORY_ONLY)
  • MEMORY_AND_DISK(同时保存内存和磁盘):优先保存在内存中,内存不足的部分再存放磁盘

.unpersist() 方法手动把持久化的RDD从缓存中移除

4、分区

RDD就是弹性分布式数据集,可以在计算过程中不断动态调整分区个数

(1)好处:增加并行度(可以在多个节点上同时发生计算);减少通信开销 

  • 增加并行度:
p代表分区

  • 减少通信开销: 

  • userData表:大表,成千上百万用户,包括userID 和 userInfo(数据分块保存在不同机器上,每个块的用户ID都散布在0-1000万之间)
  • Events表:小表,包括userID 和 LinkInfo,记录用户在过去五分钟内所访问的网站链接

连接两表,让j1负责连接0-100w的用户id,以此类推

分区后只涉及events表多次的数据分发 

(2)分区原则:分区个数 = 集群中CPU核心数目

对于Spark不同部署模式(Local模式、Standalone模式、YARN模式、Mesos模式)而言,通过设置具体参数值(spark.default.parallelism)指定默认的分区数目

  • Local模式:默认为本地机器的CPU数目,若设置了Local[N],则默认为N
  •  Apache Mesos模式:默认分区数为8
  • Standalone模式:集群中所有CPU核心数目总和 和 2 中取较大值
  • YARN模式:集群中所有CPU核心数目总和 和 2 中取较大值

(3)指定分区个数:在调用 textFile() 和 parallelize() 方法时指定分区个数

sc.textFile(path, partitionNum)   
# path指定要加载的文件的地址
# partitionNum用于指定分区个数

list = [1,2,3,4,5]
rdd = sc.parallelize(list,2)  # 设置两个分区

(4)使用repartition()方法重新设置分区个数:通过转换操作得到新RDD时,直接调用repartition方法即可

(5)自定义分区方法:

  • 哈希分区 HashPartitioner
  • 区域分区 RangePartitioner
  • 自定义分区 

from pyspark import SparkConf, SparkContext

def MyPatitioner(key):
    print("MyPatitioner is running")
    print("The key is %d" %key)
    return key%10   # 作为分区编号返回

def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf=conf)
    data = sc.parallelize(range(10),5)   # 分成5个分区
    data.map(lambda x:(x,1)) \    # 键值对
        .partitionBy(10, Mypartitioner)  \   # 根据key分区。.partitionBy只接受键值对类型
        .map(lambda x:x[0])  \   # 再从键值对转回原格式
        .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")   # 写入10个分区,每个分区各1个文件

if __name__ == '__main__':
    main()

.partitionBy只接受键值对类型 

使用如下命令运行 TestPartitioner.py:

cd /usr/local/spark/mycode/rdd
python3 TestPartitioner.py

或者,使用如下命令运行 TestPartitioner.py:

cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit TestPartitioner.py

5、基本实例(词频统计)

再次强调:

本地文件是三个/,即file:/// 

hdfs文件是两个/,即hdfs://

在一个集群中同时部署Hadoop和Spark,把集群中某个节点既作为HDFS的存储节点,也作为Spark的WorkerNode,即Spark的计算组件和HDFS的存储组件放在同一台机器上 

分布式词频统计:

(二)键值对RDD

键值对RDD:RDD的每个元素都是一个键值对 (key, value)

1、创建

(1)从文件中加载

(2)通过并行集合(列表)创建

2、常用的键值对RDD转换操作

  • reduceByKey(func):使用func函数合并具有相同键的值
  • groupByKey():对具有相同键的值进行分组(ResultIterable对象封装)

总结:groupByKey是对每个key进行操作,但只生成一个sequence,本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作;reduceByKey用于对每个key对应的多个value进行merge操作,能在本地先进行merge操作,且merge操作可以通过函数自定义 

  • keys:把Pair RDD中的key返回形成一个新的RDD
  • values:把Pair RDD中的value返回形成一个新的RDD
  • sortByKey():返回一个根据键排序的RDD,默认True(升序)
  • mapValues(func):对键值对RDD中的每个value都应用一个函数,key不会发生变化
  • join:内连接。对于给定的两个输入数据集 (K, V1) 和 (K, V2),只有在两个数据集中都存在的key才会被输出,最终得到一个 (K, (V1,V2)) 类型的数据集
  • combineByKey

(1)reduceByKey(func):先把key相同的值归并起来,再对值列表用func进行聚合计算

(2)groupBykey() 

对比 reduceByKey 和 groupByKey: 

一个 reduceByKey 等价于 一个 groupByKey + map 

(3)keys()

(4)values() 

(5)sortByKey() 

  • sortByKey() 和 sortBy() 的区别:sortBy()可以根据值进行排序,而sortByKey() 只能根据键 

(6)mapValues(func):key不变,value用func(lambda表达式)进行计算 

3、综合实例

给定一组键值对,key为图书名称,value为某天图书销量。计算每种图书的每天平均销量 

reduceByKey(): 

mapValues():

(三)数据读写

1、文件数据读写

(1)本地文件系统数据读写:

  • 注意:Spark采用惰性执行机制,即使输入了错误的语句,Spark-Shell也不会马上报错

把RDD写入到文本文件中 .saveAsTextFile给出的是目录,而不是具体文件,因为存在分区的概念):

再次把数据加载到RDD中,也要写目录,而不是文件:

(2)分布式文件系统HDFS数据读写: 

把RDD中的数据保存到HDFS文件中(路径是目录,而不是具体文件): 

再次强调,本地文件是file:///开头,而分布式文件是hdfs://开头 

2、读写HBase数据

(1)HBase简介 

HBase(分布式数据库)是Google BigTable的开源实现,也是Hadoop的成员组件,构建在Hadoop分布式文件系统HDFS基础上。即HBase的数据是保存在底层HDFS中

特性: 

  • 每个值是一个未经解释的字符串,没有数据类型
  • 用户在表中存储数据,每一行都有一个可排序的行键和任意多的列
  • 表在水平方向由一个或者多个列族组成,一个列族中可以包含任意多个列,同一个列族里面的数据存储在一起
  • 列族支持动态扩展,可以很轻松地添加一个列族或列,无需预先定义列的数量以及类型,所有列均以字符串形式存储,用户需要自行进行数据类型转换
  • HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧版本仍然保留(这是和HDFS只允许追加、不允许修改的特性相关的)

概念:

  • 表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族
  • 行:每个HBase表都由若干行组成,每个行由行键(row key)来标识
  • 列族:一个HBase表被分组成许多列族(Column Family)的集合,它是基本的访问控制单元
  • 列限定符:列族里的数据通过列限定符(或列)来定位
  • 时间戳:每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引
  • 单元格:在HBase表中,通过行、列族和列限定符确定一个单元格(cell),单元格中存储的数据没有数据类型,总被视为字节数组byte[] 

HBase - 稀疏、多维度、排序、映射表

  • 相关信息全部汇总在一个表里,不需进行多表连接操作,有利于在大数据时代获得高的数据读写性能
  • 一个表分为若干行和若干列族,一个列族又包含很多列/列限定符,每个行由行键(row key)来标识
  • 行键和列限定符交叉的位置叫单元格,值是以一个单元格的形式写入的(关系型数据库是一行行存储写入的)
  • 每个单元格的值可能会发生变化,但HBase底层存储是基于HDFS(只读,不可修改),通过将指针指向新版本数据间接实现修改

四维坐标定位:行键-列族-列限定符-版本时间戳

  • 关系数据库是二维定位,给出行、列即可唯一确定一个单元格值

HBase概念视图:列族contents、列限定符html

HBase物理视图:底层为行键+列族+时间戳

保存机制:水平分区+切分成很多列族(分布式存储)

(2)创建HBase表

HBase安装(配置成伪分布式模式):

启动HBase(底层存储基础为HDFS,故要先启动Hadoop):

# 启动Hadoop
cd /usr/local/hadoop  # 进入Hadoop安装目录
./sbin/start-all.sh   # 启动Hadoop
# 或start-dfs.sh(启动hdfs)

# 启动HBase
cd /usr/local/hbase   # 进入HBase安装目录
./bin/start-hbase.sh  # 启动HBase
./bin/hbase shell     # 启动HBase Shell

创建student表(确保数据库里不存在student表):

 

disable 'student'
drop 'student'

create 'student', 'info'   # 表、列族

# 录入student表第一个学生记录
put 'student', '1', 'info:name', 'Xueqian'
put 'student', '1', 'info:gender', 'F'
put 'student', '1', 'info:age', '23'

# 录入student表第二个学生记录
put 'student', '2', 'info:name', 'Weiliang'
put 'student', '2', 'info:gender', 'M'
put 'student', '2', 'info:age', '24'
  • 关系型数据库插入数据的方式为 insert into... values...(一次插入一行数据)

(3)配置Spark

把程序运行过程中所需jar包(lib目录下)拷贝到Spark安装目录下(jars目录),需要拷贝:

  • 所有以hbase开头的jar包
  • guava-12.0.1.jar
  • htrace-core-3.1.0-incubating.jar
  • protobuf-java-2.5.0.jar
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./

此外,在Spark2.0以上版本中,缺少把HBase数据转换成Python可读取数据的jar包,需要另行下载。可以访问下面地址下载spark-examples_2.11-1.6.0-typesafe-001.jar。下载以后保存到 /usr/local/spark/jars/hbase/ 目录中https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:

cd /usr/local/spark/conf
vim spark-env.sh

打开spark-env.sh文件后,可以在文件最前面增加下面一行内容,这样后面编译和运行过程才不会出错:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*

(4)编写程序读取HBase数据

Spark读取HBase:用SparkContext提供的 newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中

SparkOperateHBase.py(从HBase读取数据、生成字符串格式并打印到屏幕上):

#!/usr/bin/env python3
 
from pyspark import SparkConf, SparkContext
 
conf = SparkConf().setMaster('local').setAppName("ReadHBase")   # 设置连接方式为本地模式,应用名称为ReadHBase
sc = SparkContext(conf = conf)   # 生成SparkContext对象
host = 'localhost'   # ZooKeeper服务器地址(分布式协调一致性作用)
table = 'student'    # 表名
conf = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table}   # 配置ZooKeeper服务器地址、当前读取的输入表
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"   # 键转换器,把key从HBase格式转换成字符串格式
valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"  
hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)   # 读取表的格式、从HBase读取的key的类型、从HBase读取的value的类型、指定key的转换类、指定value的转换类、配置信息
count=hbase_rdd.count()   # 有多少个行键(键值对)
hbase_rdd.cache()     # 缓存
output=hbase_rdd.collect()    # 封装在一个列表中返回
for (k,v) in output:
    print(k,v)

执行该代码文件:

cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit SparkOperateHBase.py

执行结果:

(5)编写程序向HBase写入数据

把表中的两个学生信息插入到HBase的student表中:

SparkWriteHBase.py: 

#!/usr/bin/env python3
 
from pyspark import SparkConf,SparkContext
 
conf = SparkConf().setMaster('local').setAppName("WriteHBase")
sc = SparkContext(conf = conf)
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"  # key转换器,String类型转换成内部格式
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"   # value转换器,字符串列表转换成Put单元格
conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class":"org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}
 
rawData=['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26','4,info,name,Guanhua','4,info,gender,M','4,info,age,27']
# 首先将6个字符串加载到内存生成RDD,再写入HBase
sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)   # x[0]为行键,map后6个字符串变为6个键值对(key,value),key为行键,value为字符串列表(即单元格值)
  • StringList:[行键,列族,列,值]

执行:

cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit SparkWriteHBase.py

去HBase Shell查看写入结果:scan 'student' 

(四)综合案例

1、求TOP值

对一个目录下的所有文件的某字段排序,取top5(topN.py)

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
# line.strip() 去掉字符串后面的空格(去掉空行)
# split后==4,即去掉缺失字段的行
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4))
result2 = result1.map(lambda x:x.split(",")[2])   # 取出payment
result3 = result2.map(lambda x:(int(x),""))   # 转换为(key, value),为了后面排序
result4 = result3.repartition(1)   # 为了保证全局有序,否则可能分区有序,但全局无序
result5 = result4.sortByKey(False)   # sortByKey()必须根据key来排,即输入是(key,value)。False为降序
result6 = result5.map(lambda x:x[0])   # 去掉value的""
result7 = result6.take(5)   # 取出前五名
for a in result7:
    print(a)

过程解析:

参考:Spark求TOP值_11号的乔乔的博客-CSDN博客 

2、文件排序

读取文件中所有整数并进行排序(FileSort.py) 

#!/usr/bin/env python3
from pyspark import SparkContext, SparkConf

index = 0
def getindex():   # 获取全局排序,依次递增
    global index
    index += 1
    return index

def main():
    conf = SparkConf().setMaster('local[1]').setAppName('FileSort')
    sc = SparkContext(conf=conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")   # 把某个目录下所有文件加载进来生成RDD
    result1 = lines.filter(lambda line:(len(line.strip())>0))   # 消除空行
    result2 = result1.map(lambda x:(int(x.strip()),"")   # 生成(key,value)
    result3 = result2.repatition(1)   # 1个分区确保全局有序
    result4 = result3.sortByKey(True)
    result5 = result4.map(lambda x:x[0])
    result6 result5.map(lambda x:(getindex(),x))
    result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")

sortByKey() 的输入必须是 (key, value)

过程解析:

3、二次排序

先根据第一列降序排序,第一列值相等再根据第二列降序排序(SecondarySortKey.py) 

若就把字符串构建(key,value) 输入sortBykey(),是根据字母的升序排序。故本题必须生成一个可比较的key,即下图中的 SecondarySortKey(5,3)

实现思路:

  • 按照Ordered 和 Serializable 接口实现自定义排序的key(人工定义类SecondarySortKey)
  • 将要进行二次排序的文件加载进来生成 <key, value> 类型的RDD,key即人工定义的用于排序的SecondarySortKey,值即文本中的一行
  • 使用sortByKey基于自定义的key进行二次排序
  • 去除掉排序的key只保留排序的结果
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
    def __init__(self, k):   # __init__为构造函数,k为传入参数,格式为(key,value)
        self.column1 = k[0]
        self.column2 = k[1]
 
    def __gt__(self, other):    # 重写比较函数
        if other.column1 == self.column1:
            return gt(self.column2,other.column2)   # 若第1列相等,比较第2列
        else:
            return gt(self.column1, other.column1)

def main():
  conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
  sc = SparkContext(conf=conf)

  file="file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
  rdd1 = sc.textFile(file)
  rdd2=rdd1.filter(lambda x:(len(x.strip()) > 0))   # 去除空行
  rdd3=rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))   # x为字符串
  rdd4=rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
  rdd5=rdd4.sortByKey(False)
  rdd6=rdd5.map(lambda x:x[1])  # 去掉左边的可排序key,只保留右边的字符串
  rdd6.foreach(print)

if __name__ == '__main__':
	main()

过程解析:


第5章  Spark SQL(9节)

(一)Spark SQL简介 

1、Hive:SQL-on-Hadoop

Hive:SQL-on-Hadoop(Hadoop平台上提供了SQL查询的能力,在Hadoop平台上构建数据仓库把SQL语句转换成底层MapReduce程序,对底层HDFS数据进行查询分析)

  • Hive本身不存储数据,借助底层HDFS存储数据
  • Hive可以看成是编程接口,把SQL语句转换成MapReduce作业

2、Shark:Hive on Spark

Shark即Hive on Spark。为了实现和Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作

  • SQL-on-Spark性能比Hive有了10-100倍的提高
  • Shark导致的两个问题:
    • 执行计划优化完全依赖Hive,不方便添加新的优化策略
    • MapReduce是进程级并行,而Spark是线程级并行,故Spark为了兼容Hive就存在线程安全的问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支

3、Spark SQL 

Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据模块,其他模块全部重新开发。也就是说,从HQL被解析成抽象语法树AST开始,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责

Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句。数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据

  • Spark SQL目前支持三种语言:Java、Scala、Python

Why Spark SQL?

  • 关系型数据库已经很流行,但在大数据时代已经不能满足要求
  • 首先,用户需要从不同数据源执行各种操作,包括结构化和非结构化数据;
  • 其次,用户需要执行高级分析,比如机器学习和图像处理。在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统

Spark SQL填补了这个鸿沟

  • 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作
  • 其次,可以支持大量的数据源和数据分析算法,Spark SQL可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力

(二)DataFrame概述

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询

RDD是Spark Core核心组件的数据抽象,Spark SQL的数据抽象是DataFrame

  • RDD是分布式的Java对象的集合,看不到对象内部结构
  • DataFrame是以RDD为基础的分布式数据集,提供了详细的结构信息

1、创建

从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能

SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession也提供了HiveQL以及其他依赖于Hive的功能的支持 

  • SparkSession接口(Spark SQL程序的指挥官)
  • SparkContext对象(RDD应用程序的指挥官)

(1)创建SparkSession对象

  • 在启动进入Pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)
  • 在写独立应用程序时,需要用下面代码生成:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

(2)创建DataFrame

spark.read() 操作或 spark.read.format().load() 操作

spark.read.text("xxx.txt")
spark.read.json("xxx.json")
spark.read.parquet("xxx.parquet")

spark.read.format("text").load("xxx.txt")
spark.read.format("json").load("xxx.json")
spark.read.format("parquet").load("xxx.parquet")

调用 .show() 可以查看数据

2、保存

使用 spark.write 操作保存DataFrame

df.write.txt("xxx.txt")
df.write.json("xxx.json")
df.write.parquet("xxx.parquet")

df.write.format("text").save("xxx.txt")
df.write.format("json").save("xxx.json")
df.write.format("parquet").save("xxx.parquet")

目录名称读取即可加载(注意不是文件名称)

3、常用操作

printSchema():打印模式信息 

select():选取列显示

filter():过滤

groupBy():分组

sort():排序

(三)利用反射机制推断RDD模式

1、利用反射机制去推断RDD模式

/usr/local/spark/examples/src/main/resources/ 目录下:

from pyspark.sql import Row  # 生成row对象封装一行数据
# spark为SparkSession对象
people = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(",")).map(lambda p:Row(name=p[0],age=int(p[1])))   # people为RDD
schemapeople = spark.createDataFrame(people)

# 必须注册为临时表才能供下面的查询使用
schemapeople.createOrReplaceTempView("people")   # people为临时表名称
personDF = spark.sql("select name,age from people where age>20")
# 查询得到的结果会被封装在DataFrame中
personDF.show()

# DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
personRDD = personDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
personRDD.foreach(print)

personDF
personRDD

2、用编程方式去定义RDD模式

当无法提前获知数据结构时,采用编程方式定义RDD模式

from pyspark.sql.types import *
from pyspark.sql import Row

# 生成表头
schemaString = "name age"  # 包含两个字段 name 和 age
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")]   # 列表里每个元素都是一个StructField对象(用来描述字段)
schema = StructType(fields)

# 生成表中的记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda p:Row(p[0],p[1].strip()))   # p为列表

# 表头和内容拼接
schemapeoples = spark.createDataFrame(people,schema)   # (表中记录,表头)

# 注册临时表才能查询
schemapeoples.createOrReplaceTempView("people")
results = spark.sql("select name,age from people")
results.show()

  • StructField(字段名称,字段类型,是否可以为空)   用来描述字段
  • StructType() 生成的对象用来描述数据库模式,即表头

查询的结果被封装在DataFrame里

(四)Spark SQL读取MySQL数据库

1、MySQL准备工作 

在Linux系统中安装MySQL数据库:Ubuntu安装MySQL及常用操作_厦大数据库实验室博客

# 在Linux中启动MySQL数据库
service mysql start
mysql -u root -p   # 屏幕会提示你输入密码(MySQL root用户密码)

# 完成数据库和表的创建
create database spark
use spark
create table student(id int(4), name char(20), gender char(4), age int(4))
insert into student values (1, "Xueqian", 'F', 23)
insert into student values (2, "Weiliang", 'M', 24)
select * from student

Spark SQL通过jdbc连接关系型数据库MySQL,需要安装MySQL的jdbc驱动程序:Linux 下搭建 Hive 环境_mysql-connector-java-5.1.40.tar 对应的mysql版本_GreyZeng的博客-CSDN博客

下载后放入 /usr/local/spark/jars

# 启动pyspark
cd /usr/local/spark
./bin/pyspark

2、Spark SQL读写MySQL

(1)通过jdbc连接MySQL数据库

>>>jdbcDF = spark.read.format("jdbc") \
.option("url","jdbc:mysql://localhost:3306/spark") \   # 访问数据库地址及数据库(spark数据库)
.option("driver","com.mysql.jdbc.Driver") \   # 指定驱动程序
.option("dbtable", "student") \   # 访问student表
.option("user", "root") \
.option("password", "mysql密码").load()

# .option()增加连接参数

(2)向MySQL数据库中写入数据

use spark
select * from student

往 spark.student 中插入两条记录:

from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# 生成SparkSession对象(Spark SQL指挥官)
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

# 下面要设置模式信息
>>> schema = StructType([StructField("id", IntegerType(), True), \   # True说明可以为空
StructField("name", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("age",IntegerType(), True)])

# 设置两条数据,表示两个学生信息(封装Row对象)
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" "))

# 创建Row对象,每个Row对象都是rowRDD的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[1].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

# 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)    

# 把DataFrame写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '填写mysql密码'  
prop['driver'] = "com.mysql.jdbc.Driver"  # 驱动程序名称
# 库名, 表名, 追加, prop为属性集合
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/965694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

说说HTTP 和 HTTPS 有什么区别?

分析&回答 http协议 超文本传输协议&#xff0c;是互联网上应用最多的协议&#xff0c;基于TCP/IP通讯协议来传递信息&#xff0c;用于从WWW服务器传输超文本到本地浏览器的传输协议。 https协议 我们可以将其看作是以安全为目标的http协议。在http协议的基础上增加了S…

不同写法的性能差异

“ 达到相同目的,可以有多种写法,每种写法有性能、可读性方面的区别,本文旨在探讨不同写法之间的性能差异 len(str) vs str "" 本部分参考自: [问个 Go 问题&#xff0c;字符串 len 0 和 字符串 "" &#xff0c;有啥区别&#xff1f;](https://segmentf…

React笔记(八)Redux

一、安装和配置 React 官方并没有提供对应的状态机插件&#xff0c;因此&#xff0c;我们需要下载第三方的状态机插件 —— Redux。 1、下载Redux 在终端中定位到项目根目录&#xff0c;然后执行以下命令下载 Redux npm i redux 2、创建配置文件 在 React 中&#xff0c;…

[管理与领导-64]:IT基层管理者 - 8项核心技能 - 8 - 打造高效团队

目录 前言&#xff1a; 一、团队建设对于不同管理层的不同 第1节&#xff1a;认识自己的团队 1.1 团队的生命周期 1.2 常见的团队问题 1.3 团队角色的配置 1.4 团队水平测试 第2节&#xff1a;什么是高绩效团队 2.1 什么是团队 2.2 团队五个基本要素&#xff1a; 2.…

超图嵌入论文阅读2:超图神经网络

超图嵌入论文阅读2&#xff1a;超图神经网络 原文&#xff1a;Hypergraph Neural Networks ——AAAI2019&#xff08;CCF-A&#xff09; 源码&#xff1a;https://github.com/iMoonLab/HGNN 500star 概述 贡献&#xff1a;用于数据表示学习的超图神经网络 (HGNN) 框架&#xf…

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉新财经理工 ​​​

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉新财经理工 ​​​

什么是盒子模型

什么是盒子模型 盒子模型&#xff0c;也可以称为框模型。 所有 HTML 元素可以看作盒子。在 CSS 中&#xff0c;“box model” 这一术语是用来设计和布局时使用。 CSS 盒模型本质上是一个盒子&#xff0c;封装周围的 HTML 元素&#xff0c;它包括&#xff1a;边距&#xff0c…

CSS学习笔记05

CSS笔记05 定位 position CSS 属性position - 用于指定一个元素在文档中的定位方式。top&#xff0c;right&#xff0c;bottom 和 left 属性则决定了该元素的最终位置。position 有以下常用的属性值&#xff1a; position: static; - 默认值。指定元素使用正常的布局行为&am…

神经网络--感知机

感知机 单层感知机原理 单层感知机:解决二分类问题&#xff0c;激活函数一般使用sign函数,基于误分类点到超平面的距离总和来构造损失函数,由损失函数推导出模型中损失函数对参数 w w w和 b b b的梯度&#xff0c;利用梯度下降法从而进行参数更新。让1代表A类&#xff0c;0代…

es5的实例__proto__(原型链) prototype(原型对象) {constructor:构造函数}

现在看这张图开始变得云里雾里&#xff0c;所以简单回顾一下 prototype 的基本内容&#xff0c;能够基本读懂这张图的脉络。 先介绍一个基本概念&#xff1a; function Person() {}Person.prototype.name KK;let person1 new Person();在上面的例子中&#xff0c; Person …

Nor Flash

核心信息&#xff1a; 工作频率数据吞吐量 bps bit/s&#xff08;传输数据速率&#xff09; Hz&#xff08;时钟频率&#xff09; T/s 56MB/s&#xff08;max&#xff09;448Mb/s&#xff08;数据吞吐量、4路&#xff09;448MHz 112MHz&#xff08;max读、时钟频率&#…

Spring @Configuration 注解解析原理

前言 ​ Configuration 注解是 Spring 3.0 版本引入的新特性&#xff08;目前版本 6.0.11&#xff09;&#xff0c;它用于将一个类标记为配置类&#xff0c;通过配置类可以定义和组装 Spring Bean。 一般来说注解都会有相应的解析器&#xff0c;Configuration 注解靠 C…

Pycharm中出现ImportError:DLL load failed:找不到指定模块的解决方法

不论搭建什么工程&#xff0c;运行什么文件&#xff0c;只要在Pycharm中出现ImportError: DLL load failed: 找不到指定的模块这样的问题&#xff0c;以下方法都适用&#xff01;&#xff01;&#xff01; 一、问题描述 我在使用pycharm连接webots&#xff0c;用python控制机…

Redis功能实战篇之附近商户

在互联网的app当中&#xff0c;特别是像美团&#xff0c;饿了么等app。经常会看到附件美食或者商家&#xff0c; 当我们点击美食之后&#xff0c;会出现一系列的商家&#xff0c;商家中可以按照多种排序方式&#xff0c;我们此时关注的是距离&#xff0c;这个地方就需要使用到我…

JavaScript -【第一周】

文章来源于网上收集和自己原创&#xff0c;若侵害到您的权利&#xff0c;请您及时联系并删除~~~ JavaScript 介绍 变量、常量、数据类型、运算符等基础概念 能够实现数据类型的转换&#xff0c;结合四则运算体会如何编程。 体会现实世界中的事物与计算机的关系理解什么是数据并…

mybatis源码学习-1-调试环境

写在前面,这里会有很多借鉴的内容,有以下三个原因 本博客只是作为本人学习记录并用以分享,并不是专业的技术型博客笔者是位刚刚开始尝试阅读源码的人,对源码的阅读流程乃至整体架构并不熟悉,观看他人博客可以帮助我快速入门如果只是笔者自己观看,难免会有很多弄不懂乃至理解错误…

【科研论文配图绘制】task7密度图绘制

【科研论文配图绘制】task7密度图绘制 task7 了解密度图的定义&#xff0c;清楚密度图是常用使用常见&#xff0c;掌握密度图绘制。 1.什么是密度图 密度图&#xff08;Density Plot&#xff09;是一种用于可视化数据分布的图表类型。它通过在数据中创建平滑的概率密度曲线…

java反编译工具jd-gui使用

文章目录 一、JD-GUI介绍二、下载三、安装四、使用教程五、免责声明摘抄 一、JD-GUI介绍 JD-GUI是一个独立的图形实用程序&#xff0c;显示“.class”文件的Java源代码。 使用JD-GUI浏览重构的源代码&#xff0c;以便即时访问方法和字段。 二、下载 MAC安装包&#xff1a;ht…

Gorm简单了解

GORM 指南 | GORM - The fantastic ORM library for Golang, aims to be developer friendly. 04_GORM查询操作_哔哩哔哩_bilibili 前置&#xff1a; db调用操作语句中间加debug&#xff08;&#xff09;可以显示对应的sql语句 1.Gorm模型定义&#xff08;理解重点&#xff…

在R中安装TensorFlow、TensorFlow_Probability、numpy(R与Python系列第二篇)

目录 前言&#xff1a; 1-安装tensorflow库 Step1: 下载R包tensorflow Step2&#xff1a;安装TensorFlow库 Step3&#xff1a;导入R中 2-安装tensorflow_probability库 Step1&#xff1a;下载R包&#xff1a;tfprobability Step2&#xff1a;安装TensorFlow Probability …