【Python-Spark(大规模数据)】

news2024/11/17 13:38:00

Python-Spark(大规模数据)

  • ■ Spark
  • ■ PySparl编程模型
  • ■ 基础准备
  • ■ 数据输入
  • ■ RDD的map成员方法的使用
  • ■ RDD的flatMap成员方法的使用
  • ■ RDD的reduceByKey成员方法的使用
  • ■ 单词计数统计
  • ■ RDD的filter成员方法的使用
  • ■ RDD的distinct成员方法的使用
  • ■ RDD的sortBy成员方法的使用
  • ■ 案例:JSON商品统计
  • ■ 将RDD输出为Python对象
  • ■ 将RDD输出到文件中
  • ■ PySpark综合案例
  • ■ PySpark综合案例

■ Spark

Apache Spark 是用于大规模数据处理的统一分析引擎。
PySpark是由Spark官方开发的Python语言第三方库。

■ PySparl编程模型

  • 通过SparkContext对象,完成数据输入
  • 输入数据后得到RDD对象,对RDD对象进行迭代计算
  • 最终通过RDD对象的成员方法,完成数据输出工作
    在这里插入图片描述

■ 基础准备

# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

■ 数据输入

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
# rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# rdd2 = sc.parallelize((1, 2, 3, 4, 5))
# rdd3 = sc.parallelize("abcdefg")
# rdd4 = sc.parallelize({1, 2, 3, 4, 5})
# rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
#
# # 如果要查看RDD里面有什么内容,需要用collect()方法
# print(rdd1.collect())
# print(rdd2.collect())
# print(rdd3.collect())
# print(rdd4.collect())
# print(rdd5.collect())

# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("D:/hello.txt")
print(rdd.collect())
rdd.map()
sc.stop()

■ RDD的map成员方法的使用

"""
演示RDD的map成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
# def func(data):
#     return data * 10

rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)

print(rdd2.collect())
# (T) -> U
# (T) -> T

# 链式调用

■ RDD的flatMap成员方法的使用

"""
演示RDD的flatMap成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])

# 需求,将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

■ RDD的reduceByKey成员方法的使用

"""
演示RDD的reduceByKey成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())

■ 单词计数统计

"""
完成练习案例:单词计数统计
"""

# 1. 构建执行环境入口对象
from pyspark import SparkContext, SparkConf
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2. 读取数据文件
rdd = sc.textFile("D:/hello.txt")
# 3. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4. 将所有单词都转换成二元元组,单词为Key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 5. 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. 打印输出结果
print(result_rdd.collect())

■ RDD的filter成员方法的使用

"""
演示RDD的filter成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)

print(rdd2.collect())

■ RDD的distinct成员方法的使用

"""
演示RDD的distinct成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()

print(rdd2.collect())

■ RDD的sortBy成员方法的使用

"""
演示RDD的sortBy成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 1. 读取数据文件
rdd = sc.textFile("D:/hello.txt")
# 2. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 3. 将所有单词都转换成二元元组,单词为Key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 4. 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 5. 对结果进行排序
final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())

■ 案例:JSON商品统计

"""
完成练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# TODO 需求1: 城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
# (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())
# TODO 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())
# 2.2 对全部商品类别进行去重
# TODO 需求3: 北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:", result3_rdd.collect())
# 3.3 进行商品类别去重

■ 将RDD输出为Python对象

"""
演示将RDD输出为Python对象
"""

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")

sc.stop()

■ 将RDD输出到文件中

"""
演示将RDD输出到文件中
"""

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")

sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)

# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)

# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)

# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

■ PySpark综合案例

"""
演示PySpark综合案例
"""

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("D:/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求1的结果:", result1)

# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求2的结果:", result2)

# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\
    filter(lambda x: x[2] == '黑马程序员').\
    map(lambda x: (x[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(1)
print("需求3的结果:", result3)

# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
    saveAsTextFile("D:/output_json")

■ PySpark综合案例

"""
演示PySpark综合案例
"""

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python'
os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1"
conf = SparkConf().setAppName("spark_cluster")
conf.set("spark.default.parallelism", "24")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求1的结果:", result1)

# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求2的结果:", result2)

# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\
    filter(lambda x: x[2] == '黑马程序员').\
    map(lambda x: (x[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(1)
print("需求3的结果:", result3)

# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
    saveAsTextFile("hdfs://m1:8020/output/output_json")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1625292.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【分布式通信】NPKit,NCCL的Profiling工具

NPKit介绍 NPKit (Networking Profiling Kit) is a profiling framework designed for popular collective communication libraries (CCLs), including Microsoft MSCCL, NVIDIA NCCL and AMD RCCL. It enables users to insert customized profiling events into different C…

【JavaScriptThreejs】判断路径在二维平面上投影的方向顺逆时针

原理分析 可以将路径每个连续的两点向量叉乘相加,根据正负性判断路径顺逆时针性 当我们计算两个向量的叉积时,结果是一个新的向量,其方向垂直于这两个向量所在的平面,并且其大小与这两个向量构成的平行四边形的面积成正比。这个新…

Android 组件提供的状态保存(saveInstanceState)与恢复(restoreInstanceState)

在Android的组件Activity中,有这样一对方法: onSaveInstanceeState 和 onRestoreInstanceState 这两对方法,可以让我在Activiy被异常销毁时,保存状态;以及在Activity重建时,恢复状态。 比如:当我们在输入…

MATLAB 2024a软件下载安装教程

1-首先下载Matlab,以下迅雷云链接,里面有全版本的matlab,根据自己的需要下载即可,建议下载最新版的,功能会更多,当然内存也会更大。 迅雷云盘迅雷云盘https://pan.xunlei.com/s/VNgH_6VFav8Kas-tRfxAb3XOA…

Linux I2C(二) - I2C软硬件架构

1,I2C的总线拓扑 2,I2C S/W topology linux kernel I2C framework使用如下的软件拓扑抽象I2C硬件(我们可以一起领会一下其中的“设备模型”思想): 1)platform bus(/sys/bus/platform&#xff0…

Oracle导出导入dmp等文件类型的多表数据的常用方法、遇见的常见问题和解决办法(exp无效sql???)

使用PLSQL执行导出表数据的时候有两种方法 1、使用Oracle命令【imp--exp】【impdp--expdp】 但是如果你的本机没有安装有Oracle数据库,使用的instant client远程连接服务器上的Oracle数据库时候,你没有Oracle数据库带有的exp.exe、imp.exe等扩展文件&a…

如何高效跟进项目进度?试试禅道几个功能

禅道提供了一系列功能和工具,可实现项目进度的有效管理和跟进,极大提升项目管理效率。禅道中的项目进度来源于迭代进度,迭代的进度又来源于任务的消耗和剩余工时,可通过以下功能有效跟进项目进展。 一、燃尽图 在禅道里&#xf…

机器学习day1

一、人工智能三大概念 人工智能三大概念 人工智能(AI)、机器学习(ML)和深度学习(DL) 人工智能:人工智能是研究计算代理的合成和分析的领域。人工智能是使用计算机来模拟,而不是人类…

【办公类-22-14】周计划系列(5-5)“周计划-05 周计划表格内教案部分“节日”清空改成“节日“” (2024年调整版本)Win32

背景需求: 本学期19周,用了近10周的时间,终于把周计划教案部分的内容补全了(把所有教案、反思的文字都撑满一个单元格), 一、原始教案 二、新模板内的教案 三、手动添加文字后的样式(修改教案…

庐山研习班上介绍的25个LINUX工具

从2013年的第一届算起,庐山研习班走过十余个年头,办了十几次了。但每一次,都有很多不一样。即使是相同的主题,也有很大差异。 今年春季的庐山研习班是在上个周末。周四晚上我和大部分同学都到了五老峰脚下的训练基地。 除了周六下…

【C++ STL序列容器】list 双向链表

文章目录 【 1. 基本原理 】【 2. list 的创建 】2.1 创建1个空的 list2.2 创建一个包含 n 个元素的 list(默认值)2.3 创建一个包含 n 个元素的 list(赋初值)2.4 通过1个 list 初始化另一个 list2.5 拷贝其他类型容器的指定元素创…

HNCTF 2022 week1 题解

自由才是生活主旋律。 [HNCTF 2022 Week1] Interesting_include <?php //WEB手要懂得搜索 //flag in ./flag.phpif(isset($_GET[filter])){$file $_GET[filter];if(!preg_match("/flag/i", $file)){die("error");}include($file); }else{highlight_…

CentOS7安装并配置Yearning并实现无公网IP远程SQL审核与数据查询

目录 ​编辑 前言 1. Linux 部署Yearning 2. 本地访问Yearning 3. Linux 安装cpolar 4. 配置Yearning公网访问地址 5. 公网远程访问Yearning管理界面 6. 固定Yearning公网地址 结语 前言 作者简介&#xff1a; 懒大王敲代码&#xff0c;计算机专业应届生 今天给大家聊聊…

Docker 的数据管理 端口映射 容器互联 镜像的创建

目录 概念 概念 管理 Docker 容器中数据主要有两种方式&#xff1a;数据卷&#xff08;Data Volumes&#xff09;和数据卷容器&#xff08;DataVolumes Containers&#xff09;。总结&#xff1a;因为容器数据是临时保存的为了安全&#xff0c;就要让数据保持持久化。 1&#…

qt QTreeWidget 学习

树形控件的节点可以有多层、多个子节点&#xff0c; 如果将子节点全部展开&#xff0c;那么每一行都是一个数据条目。QTreeWidgetItem 比较特殊&#xff0c;一个条目内部可以有多列数据信息&#xff0c;相当于表格控件一整行的表格单元集成为一个条目。 默认情况下&#xff0c;…

Methoxy-PEG-PLGA,mPEG-PLGA是一种可生物降解的两亲性嵌段共聚物

【试剂详情】 英文名称 mPEG-PLGA&#xff0c;Methoxy-PEG-Poly(lactide-co-glycolide)&#xff0c;Methoxy-PEG-PLGA&#xff0c; mPEG-Poly(lactide-co-glycolide) 中文名称 聚乙二醇单甲醚聚乳酸&#xff0c;乙醇酸两嵌段共聚物 外观性状 由分子量决定&#xff0c;液体…

调试记录 Flash 芯片 GD25LQ128ESIG 的程序烧录问题

1. 烧录工具 工具型号&#xff1a; VS4000P 2. 烧录问题 1. 烧录器选择烧录型号过程中没有看见 Flash 芯片 GD25LQ128ESIG 的型号。其中有GD25Q128E &#xff0c;但是三个选项的封装不对。 3. 解决过程 1. 尝试别的类型的芯片型号烧录。 A.GD25LQ80E(SOP8_200) B.GD25LQ64E(SOP…

IDEA 2024.1 配置 AspectJ环境

最近Java课设在学习AspectJ&#xff0c;做PPT顺便写一个博客 下载包 首先去AspectJ官网下载一个JAR包并安装 安装完最后可以按照他的建议配置一下 然后找到AspectJ的安装位置的lib目录&#xff0c;把三个包拷到自己项目中的lib目录下 由于最新版的IDEA已经不支持AspectJ了 所…

(八)Servlet教程——创建Web项目以及Servlet的实现

1. 打开Idea编辑器 2. 点击界面上的“新建项目”按钮 3. 设置好项目名称和位置 应用服务器选择之前设置好的Tomcat服务器 构建系统默认选择Maven 4. 点击“下一步”按钮 5. 点击“完成”按钮&#xff0c;Idea就创建好了项目&#xff0c;创建完成后的目录结构如下图所示 6. 此…

脉冲电源的直流斩波板设计总结(RC缓冲电路,输出电容选值)

IC的RC缓冲 总结一下过去电加工所的直流斩波板问题 1&#xff1a;电流突变问题 在独立式电火花脉冲电源里面&#xff0c;用电阻去限制电流&#xff0c;从而抑制当极间突变时的电流突变。 在非独立式的脉冲电源里面&#xff0c;电流平时是稳定在循环电感里面&#xff0c;当击…