第三阶段第一章——PySpark实战

news2025/1/20 19:58:03

学习了这么多python的知识,是时候来搞点真玩意儿了~~

春风得意马蹄疾,一日看尽长安花

o(* ̄︶ ̄*)o


 1.前言介绍

(1)什么是spark

        Apache Spark是一个开源的分布式计算框架,用于处理大规模数据集的计算任务。它提供了一种高性能、通用、易用的计算引擎,支持数据并行处理、内存计算、迭代计算等多种计算模式,并提供了丰富的API,比如Spark SQL、Spark Streaming、Mlib和Graphx等。Spark的基本单元是弹性分布式数据集(RDD),它是一种可分区、可并行计算的数据结构,可以在多个节点上进行操作。Spark可以运行在多种集群管理器上,包括Hadoop YARN、Apache Mesos和Standalone等。Spark的特点是速度快、易用、灵活、可扩展,已经成为了数据处理和数据科学领域的一个重要工具。

(2)什么是pyspark

        PySpark是指Spark的Python API,它是Spark的一部分,可以通过Python语言进行Spark编程。PySpark提供了Python与Spark之间的交互,使Python开发人员能够使用Python语言对Spark进行编程和操作。PySpark可以使用Python进行数据预处理和分析,同时扩展了Python语言的功能,能够同时使用Spark的分布式计算能力,加快了数据处理和分析的速度。与其他编程语言相比,Python在数据处理和科学计算方面有广泛的应用和支持,因此PySpark特别适合处理Python与大规模数据集交互的数据科学项目。

 


2.基础准备

(1)先下载  pyspark软件包

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ pyspark

(2)SparkContext入口对象 

初体验代码:

"""
    演示
"""
# 导包
from pyspark import SparkConf, SparkContext

# 创建sparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

(3)PySpark编程模型

详细过程 ==>>


3.数据输入

(1)RDD对象

数据输入完成之后都会得到一个RDD对象

(2)数据容器转RDD对象

"""
    演示数据输入
"""
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 查看rdd里面的内容需要使用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()

(3)读取文件转化为RDD对象

代码示例


# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd_file = sc.textFile("D:\\IOText\\b.txt")
print(rdd_file.collect())

sc.stop()


4.数据计算

pyspark是通过RDD对象内丰富的:成员方法(算子)

(1)map方法

功能:将RDD的数据一条条处理(处理的逻辑 基于map算子中接收的处理函数),返回新的RDD

语法:

代码示例

"""
    演示map方法
"""
from pyspark import SparkConf, SparkContext
import os
#配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("map_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])


# 通过map方法将全部数据都乘以10
# (T) -> U
def func(data):
    return data * 10


rdd2 = rdd.map(func)
print(rdd2.collect())

# 链式调用,快速解决
rdd3 = rdd.map(lambda e: e * 100).map(lambda e: e + 6)
print(rdd3.collect())

(2)flatMap方法

和map差不多,多了一个消除嵌套的效果

功能:对rdd执行map操作,然后进行解除嵌套

代码示例

"""
    演示 flatmap 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("flatmap_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["a b c", "e f g", "uuu ggg"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

(3)reduceByKey方法

功能:自动分组,组内聚合

代码示例

"""
    演示 reduceByKey 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("reduceByKey_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和呃女生两个组的成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())


(4)练习案例1

目标,统计文件内出现单词的数量.

代码示例:

"""
    读取文件,统计文件内,单词出现的数量
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_1")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)

print(res.collect())

我的文件数据是

word1
word2 word2
word3 word3 word3
word4 word4 word4 word4

(5)filter方法

功能:过滤想要的数据进行保留

filter返回值为TRUE则保留,不然会被过滤

语法:

代码示例:

"""
    filter方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.filter(lambda e: e % 2 == 0)
print(rdd2.collect())

(6)distinct方法

功能:对RDD数据进行去重,返回新的RDD

语法:

rdd.distinct( )   # 这里无需传参

代码示例:

"""
    distinct方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.distinct()
print(rdd2.collect())

(7)sortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据

语法:

"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
# print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)
# print(res.collect())
# 按照出现次数从小到大排序
final_rdd = res.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())


(8)练习案例2

需要的数据已经上传到博客

三个需求:

# TODO 需求一:城市销售额排名
# TODO 需求2:全部城市有哪些商品类别在售卖
# TODO 需求3:北京市有哪些商品类别在售卖
"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import json
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# TODO 需求一:城市销售额排名
# 1.1 读取文件
file_rdd = sc.textFile("D:\\IOText\\orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 取出城市和销售额数据
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result_rdd_1 = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result_rdd_1.collect())

# TODO 需求2:全部城市有哪些商品类别在售卖
# 同时记得去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())

# TODO 需求3:北京市有哪些商品类别在售卖
# 3.1 只要北京市的数据
rdd_beijing = dict_rdd.filter(lambda x: x["areaName"] == "北京")
# 3.2 取出全部商品类别并且去重
result_rdd_3 = rdd_beijing.map(lambda x: x['category']).distinct()
print("需求3的结果:", result_rdd_3.collect())


5.数据输出

(1)输出为Python对象:collect算子

功能:将RDD各个分区内的数据,统一手机到Driver中,形成一个List对象

用法:

rdd.collect( )   # 返回值是一个list

(2)输出为Python对象:reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

(3)输出为Python对象:take算子

功能:取RDD的前N个元素,组合成list返回给你

语法:

(4)输出为Python对象:count算子

功能:计算RDD有多少条数据,返回值是一个数字

语法:

(5)上述1-4算子代码示例

"""
    collect算子
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(type(rdd_list))
print(rdd_list)

# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)

# take算子,取出RDD前N个元素,组成list返回
take_list_3 = rdd.take(3)
print(take_list_3)

# count算子,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"有{num_count}个元素")

 

(6)输出到文件中:saveAsTextFile算子

功能:将RDD的数据写入文本文件中

支持  本地写出,hdfs等文件系统

语法:

注意事项:

 代码示例:

"""
    saveAsTextFile
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 准备RDD2
rdd2 = sc.parallelize([("hello", 3), ("spark", 5), ("java", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])

# 输出到文件之中
rdd1.saveAsTextFile("D:\\IOText\\saveAsTextFile测试1")
rdd2.saveAsTextFile("D:\\IOText\\saveAsTextFile测试2")
rdd3.saveAsTextFile("D:\\IOText\\saveAsTextFile测试3")

上述方法会使文件内有对应你CPU核心数量的分区

修改RDD分区为1个

方式一:SparkConf对象设置属性为全局并行度为1

 

方式二:创建RDD的时候设置(parallelize方法传入numSlices的参数为1)


6.综合案例

搜索引擎日志分析

实现代码:
 

"""
    综合案例
"""

from pyspark import SparkConf, SparkContext
import os
import json

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
conf.set("spark.default.parallelism", "1")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("D:\\IOText\\search_log.txt")

# TODO 需求一:热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并且转换为小时
# 1.2 转换为(小时,1)的二元组
# 1.3 key分组聚合value
# 1.4 排序(降序)
# 1.5 取前三
res1 = (file_rdd
        .map(lambda x: (x.split("\t")[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求一的结果:{res1}")

# TODO 需求二:热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词,1)二元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 top3
res2 = (file_rdd
        .map(lambda x: (x.split("\t")[2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求二的结果:{res2}")

# TODO 需求三:统计黑马程序员关键字在什么时间段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时,1)的二元组
# 3.3 分组聚合
# 3.4 排序
# 3.5 取前一
res3 = (file_rdd
        .map(lambda x: x.split("\t"))
        .filter(lambda x: x[2] == "黑马程序员")
        .map(lambda x: (x[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(1)
        )
print(f"需求三的结果:{res3}")

# TODO 需求四:将数据转化为JSON格式,写出到文件之中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
(file_rdd
 .map(lambda x: x.split("\t"))
 .map(lambda x: {
    "time": x[0],
    "user_id": x[1],
    "key_word": x[2],
    "rank1": x[3],
    "rank2": x[4],
    "url": x[5]}
      )
 .saveAsTextFile("D:\\IOText\\综合案例")
 )

结语

有啥好说的。呃呃呃呃呃呃呃没啥好说的

下班

┏(^0^)┛

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1186828.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

万界星空科技MES系统软件体系架构及应用

MES系统是数字化车间的核心。MES通过数字化生产过程控制,借助自动化和智能化技术手段,实现车间制造控制智能化、生产过程透明化、制造装备数控化和生产信息集成化。生产管理MES系统主要包括车间管理系统、质量管理系统、资源管理系统及数据采集和分析系统…

Power Apps-库组件样式调整

数据表控件参考文档:Power Apps 中的 数据表 控件 - Power Apps | Microsoft Learn 修改每个item的布局 选中组件,点击左上角的🖊,可以进行调整 重新选择该组件的样式 点击布局中后面的选项可以重新选择 整合计数代表一行有几个…

高防CDN与高防服务器:为什么高防服务器不能完全代替高防CDN

在当今的数字化时代,网络安全已经成为企业不容忽视的关键问题。面对不断增长的网络威胁和攻击,许多企业采取了高防措施以保护其网络和在线资产。然而,高防服务器和高防CDN是两种不同的安全解决方案,各自有其优势和局限性。在本文中…

图扑智慧农业:农林牧数据可视化监控平台

数字农业是一种现代农业方式,它将信息作为农业生产的重要元素,并利用现代信息技术进行农业生产过程的实时可视化、数字化设计和信息化管理。能将信息技术与农业生产的各个环节有机融合,对于改造传统农业和改变农业生产方式具有重要意义。 图…

Read-Easy Excel源码解析(一)

Read&Write-Easy Excel 当我们需要导入大Excel时候,用POI会内存溢出,这时候我们用EasyExcel来解决,它底层采用的是SAX(Simple Api for Xml)事件驱动,解析xml的方式来解析excel文件。 首先我们看他的re…

【唠唠嵌入式】__嵌入式开发需要从0开始造轮子吗?

目录 前言 从0开始和套用模板的利弊 1. 从0开始的利弊 2. 套用模板的利弊 从0开始,还是套用模板? 1. 看项目赶不赶 2. 看项目用途 3. 看工程师水平 4. 看领导决策 5. 看公司决策 6. 看项目规划 实际工作 总结 (* ̄︶&#xffe3…

react-native 0.63 适配 Xcode 15 iOS 17.0+

iOS 17.0 Simulator(21A328)下载失败 App Store 更新到 Xcode15 后,无法运行模拟器和真机。需要下载iOS 17对应的模拟器。Xcode中更新非常容易中断失败,可以在官网单独下载iOS 17模拟器文件,例如:iOS_17.0.1_Simulator_Runtime.d…

​软考-高级-信息系统项目管理师教程 第四版【第24章-法律法规与标准规范-思维导图】​

软考-高级-信息系统项目管理师教程 第四版【第24章-法律法规与标准规范-思维导图】 课本里章节里所有蓝色字体的思维导图

线性代数 | 矩阵运算 加减 数乘 矩阵的幂运算

文章目录 1. 矩阵加减和数乘2.矩阵与矩阵的乘法2.1相乘条件:看中间,取两头2.2 相乘计算方法 3. 矩阵的幂3.1 观察归纳法3.2 邻项相消法3.3 化为对角 4.矩阵求逆(除法)4.1 判断是否可逆(证明题或者要求求出逆矩阵&#…

React事件绑定的方式有哪些?区别?

一、是什么 在react应用中,事件名都是用小驼峰格式进行书写,例如onclick要改写成onClick 最简单的事件绑定如下: class ShowAlert extends React.Component { showAlert() { console.log("Hi"); } render() { ret…

conda环境中pytorch1.2.0版本安装包安装一直失败解决办法!!!

conda环境中pytorch1.2.0版本安装包安装一直失败解决办法 cuda10.0以及cudnn7.4现在以及安装完成,就差torch的安装了,现在torch我要装的是1.2.0版本的,安装包以及下载好了,安装包都是在这个网站里下载的(点此进入&…

《研发效能(DevOps)工程师》课程简介(五)丨IDCF

由国家工业和信息化部教育与考试中心颁发的职业技术证书,也是国内首个研发效能(DevOps)职业技术认证,内涵1000页学习教材2000分钟的课程内容讲解460多个技术知识点300多道练习题。 在这里,你不仅可以了解到华为、微软、…

Go-服务注册和发现,负载均衡,配置中心

文章目录 什么是服务注册和发现技术选型 Consul 的安装和配置1. 安装2. 访问3. 访问dns Consul 的api接口go操作consulgrpc下的健康检查grpc的健康检查规范动态获取可用端口号 负载均衡策略1. 什么是负载均衡2. 负载均衡策略1. 集中式load balance2. 进程内load balance3. 独立…

Python+Selenium+Unittest 之selenium12--WebDriver操作方法2-鼠标操作1(ActionChains类简介)

在我们平时的使用过程中,会使用鼠标去进行很多操作,比如鼠标左键点击、双击、鼠标右键点击,鼠标指针悬浮、拖拽等操作。在selenium中,我们也可以去实现常用的这些鼠标操作,这时候就需要用到selenium中的ActionChains类…

采用springboot 2.7.10来操作clickhouse

1、采用springboot与clickhouse结合&#xff0c;其实和操作mysql&#xff0c;oracle区别不大。直接上代码开干 2、所采用的环境 jdk1.8 springboot 2.7.10 clickhouse 22.8.3.13 clickhouse 0.5.0 3、项目的pom.xml文件 <dependency><groupId>com.clickhous…

基于SSM框架的共享单车管理系统小程序系统的设计和实现

基于SSM框架的共享单车管理系统小程序系统的设计和实现 源码传送入口前言主要技术系统设计功能截图Lun文目录订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码传送入口 前言 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;…

整理笔记——稳压直流电路知识

一、稳压直流电路的基本构成 稳压直流电路就是把生活中常用的220V交流电转换成稳压直流电。如生活中常见的手机充电器就是一个稳压直流电路。其主要功能是提供持续稳定且满足要求的电压。 直流稳压电路由一下几个模块组成&#xff1a; 下面具体分析下各个模块的功能。…

编译原理——构造预测分析表(判断某字符串是否是文法G(E)的句子)

进入今天的学习前&#xff0c;若不理解LL(1)文法中的首符号集&#xff0c;后跟符号集和选择符号集&#xff0c;可看&#xff1a; http://t.csdnimg.cn/BjSHv 构造预测分析表的步骤&#xff1a; 步骤1&#xff1a;对文法的每个规则U->u,执行步骤2与3 步骤2&#xff1a;对…

Springboot---整合对象储存服务MinIO

OSS 「OSS」的英文全称是Object Storage Service&#xff0c;翻译成中文就是「对象存储服务」&#xff0c;官方一点解释就是对象存储是一种使用HTTP API存储和检索非结构化数据和元数据对象的工具。 白话文解释就是将系统所要用的文件上传到云硬盘上&#xff0c;该云硬盘提供了…

chatGPT使用情况

作为一个语言模型&#xff0c;我&#xff08;ChatGPT&#xff09;被用于各种不同的应用场景。以下是一些常见的情况&#xff1a; 个人助手&#xff1a;人们可以使用我来获取信息、解答问题、获取建议或进行闲聊。我可以提供各种知识和帮助&#xff0c;从学术知识到日常生活的建…