(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

news2024/11/27 14:52:43

目录

一、PySpark

二、数据介绍

三、PySpark大数据开发实战

1、数据文件上传HDFS

2、导入模块及数据

3、数据统计与分析

①、计算演员参演电影数

②、依次罗列电影番位前十的演员

③、按照番位计算演员参演电影数

④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数

⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值

⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分

⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息

⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目

⑨、求各个地区评分最高的电影,若并列第一则以“|”拼接

⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0

4、下载统计结果

四、总结


一、PySpark

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

本文软件环境如下:

操作系统:CentOS Linux 7

Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程

Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战

Python版本:Python(Anaconda)3.11.4

PySpark基础学习可看 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测


二、数据介绍

本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。

爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析

数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z

数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:

三、PySpark大数据开发实战

1、数据文件上传HDFS

首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:

hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data

2、导入模块及数据

使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# 主程序:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)

3、数据统计与分析

①、计算演员参演电影数

以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:

df.groupBy("actor").agg(count("title").alias("act_film_num"))
# 按分隔符切分列表
df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \
    .withColumn("types", F.split(df["types"], "\|")) \
    .withColumn("regions", F.split(df["regions"], "\|"))

# 演员:拆分多行
df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")

df1 = spark.sql('''
    select actor,
           count(*) as act_film_num
    from actor_exploded 
    group by actor
    ''')
df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv(
    "/output/result1.csv")

结果如下:

+-------------+------------+
|        actor|act_film_num|
+-------------+------------+
|       童自荣|          43|
|     户田惠子|          37|
|         林雪|          33|
|       张国荣|          32|
|       刘德华|          31|
|       周星驰|          31|
|         成龙|          31|
|       任达华|          31|
|         刘洵|          30|
|塞缪尔·杰克逊|          29|
|  汤姆·汉克斯|          29|
|       梁家辉|          28|
|       吴孟达|          28|
|       梁朝伟|          27|
|      斯坦·李|          27|
|       吴君如|          27|
|    威廉·达福|          27|
|       黄秋生|          27|
|       胡立成|          27|
|  布拉德·皮特|          26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员

这一题考察了窗口函数、行转列等等。

# 定义窗口函数,按电影标题和演员顺序排序
windowSpec = Window.partitionBy("title").orderBy("actors")
# 添加序号列
df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
# 过滤出前10个演员
rank_num = 10
rank_num_list = [str(i + 1) for i in range(rank_num)]
# 将演员重新组合成单行
df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")

结果如下:

+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                   title|             actor1|               actor2|            actor3|         actor4|           actor5|                actor6|             actor7|               actor8|           actor9|          actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                 101忠狗|          罗德·泰勒|            凯特·鲍尔|           本·怀特|    丽莎·戴维斯| 贝蒂·洛乌·格尔森|         J·帕特·奥马利|      玛莎·温特沃思|      大卫·弗兰克海姆|弗莱德里克·沃洛克|        汤姆·康威|
|                   11:14|        亨利·托马斯|          布莱克·赫伦|       芭芭拉·赫希|  克拉克·格雷格|    希拉里·斯万克|           肖恩·海托西|      斯塔克·桑德斯|          科林·汉克斯|        本·福斯特|  帕特里克·斯威兹|
|                    2012|        约翰·库萨克|          阿曼达·皮特|     切瓦特·埃加福|      坦迪·牛顿|    奥利弗·普莱特|           汤姆·麦卡锡|        伍迪·哈里森|          丹尼·格洛弗|      连姆·詹姆斯|        摩根·莉莉|
|                    2046|             梁朝伟|               章子怡|              王菲|       木村拓哉|             巩俐|                刘嘉玲|               张震|               张曼玉|             董洁|      通猜·麦金泰|
|                    21克|            西恩·潘|          娜奥米·沃茨|本尼西奥·德尔·托罗|  夏洛特·甘斯布|      梅丽莎·里奥|         迈克尔·芬内尔|     

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2232571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

达梦数据库宕机问题分析及处理

官方宕机原因排查 官方故障诊断排除 相关概念 达梦数据库宕机往往会产生core文件,解读core文件是分析宕机原因的主要手段,类似oracle的diag.trc或system dump转储文件,记录数据库线程状态、sql语句等。 首选的排查方向可以从内存溢出、磁盘…

spring ai 入门 之 结构化输出 - 把大模型llm返回的内容转换成java bean

目录 ​编辑 将AI非结构化文本转换为特定格式数据的应用场景说明 Spring AI 介绍 :为Java开发者打造的AI应用开发框架 Qwen 介绍 : 一个国内领先的开源大模型 Spring AI Alibaba框架介绍 : 一个国内最好的spring ai实现 使用spring ai …

HbuildderX运行到手机或模拟器的Android App基座识别不到设备 mac

寻找模拟器 背景: 运行的是h5,模拟器是网易MuMu。 首先检查一下是否配置dab环境,adb version 配置一下hbuilderX的adb: 将命令输出的路径配置到hbuilderx里面去,然后重启下HbuilderX。 开始安装基座…一直安装不…

使用Docker Compose构建多容器应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 使用Docker Compose构建多容器应用 引言 Docker Compose 简介 安装 Docker Compose 创建基本配置 运行多容器应用 查看服务状态 …

react-router与react-router-dom的区别

写法上的区别: 写法1: import {Swtich, Route, Router, HashHistory, Link} from react-router-dom;写法2: import {Switch, Route, Router} from react-router; import {HashHistory, Link} from react-router-dom;react-router实现了路由的核心功能 react-router-…

Python 字符串类型中 ``split(“\n“)`` 与 ``splitlines()`` 方法的一些区别

最近在以 self.__print("#" * 20 "\n") 调用自己写的 __print 接口时发现打印的时候 "\n" 没有打出来,进而发现了 split("\n") 与 splitlines() 方法的一些区别。 一个是参数上,split 需要传递一个字符串作为…

Java Iterator 实现杨辉三角

一、问题描述 杨辉三角定义如下: 1/ \1 1/ \ / \1 2 1/ \ / \ / \1 3 3 1/ \ / \ / \ / \1 4 6 4 1/ \ / \ / \ / \ / \ 1 5 10 10 5 1 把每一行看做一个list,试写一个 Iterator,不断输出下一行的 list&#xf…

Spark 的介绍与搭建:从理论到实践

目录 一、分布式的思想 (一)存储 (二)计算 二、Spark 简介 (一)发展历程 (二)Spark 能做什么? (三)spark 的组成部分 (四&…

Linux操作系统 ------(3.文本编译器Vim)

目录 1.前言 2.本章学习目标 3.vim的三种工作模式 3.1一般模式‌ 3.2编辑模式‌ 3.3命令行模式‌ 4.运行vim 5.vim 不同工作模式下的常见命令 6.一般模式下的功能键 6.1移动光标类 6.2删除、复制和粘贴类 6.3查找替换类 7.从一般模式进入编辑模式 8.命令行模式下的…

RocketMQ的消息类型

RocketMQ的消息类型 文章目录 RocketMQ的消息类型一、顺序消息二、广播消息应用场景:示例代码:实现思路:注意点: 三、延时消息应用场景:核心方法: 四、批量消息应用场景:示例代码:注…

Selective Generation for Language Models 语言模型的选择性生成

生成式语言模型(Generative Language Models, GLMs)在文本生成任务中取得了显著进展。然而,生成内容的“幻觉”现象,即生成内容与事实或真实语义不符的问题,仍是GLMs在实际应用中的一个重大挑战。为了解决这一问题&…

git clone,用https还是ssh

前言 在使用Git去克隆项目时,会遇到https和ssh等形式,这两种又有何种区别呢,本文将重点讨论在具体使用中的问题。 注:第一次使用Git 时,需要先设置全局用户名和邮箱,否则后续使用命令时会报错,也是提醒先添…

最新整理:Selenium自动化测试面试题

1.selenium中如何判断元素是否存在? find_elements查找到的元素个数为0,find_element报错意味着元素不存在 2.如何判断元素是否出现? 判断元素是否出现,存在两种情况,一种是该元素压根就没有,自然不会出现;另外一种是有这样的…

业绩代码查询实战——php

一、一级代码显示职员 foreach($data_职员信息 as $key > $value){//$where_查询分类$where_查询通用;//$dat分类one $业绩提成->where($where_查询分类)->order("CreateDate desc")->select();if($value[haschildname]0 && $value[key] !"…

如何彻底删除gitbash中所有的命令记录、以及彻底删除Windows powerShell或者cmd中的所有命令记录

文章目录 1. 文章引言2. 彻底删除gitbash中所有的命令记录3. 彻底删除Windows powerShell或者cmd中的所有命令记录1. 文章引言 有时,我们使用外部电脑从gitbash中下载代码,假设使用history -c命令: 可以清除当前弹框的历史记录,但也无法彻底删除命令记录。打开gitbash后,通…

工作管理实战指南:利用Jira、Confluence等Atlassian工具打破信息孤岛,增强团队协作【含免费指南】

如果工作场所存在超级反派,其中之一可能会被命名为“信息孤岛”,因为它们能够对公司的生产力和协作造成严重破坏。当公司决定使用太多互不关联的工具来完成工作时,“信息孤岛”就会出现,导致团队需要耗费大量时间才能就某件事情达…

OceanBase V4.3.3,首个面向实时分析场景的GA版本发布

在10月23日举办的 OceanBase年度发布会 上,我们怀着激动之情,正式向大家宣布了 OceanBase 4.3.3 GA 版的正式发布,这也是OceanBase 为实时分析(AP)场景打造的首个GA版本。 2024 年初,我们推出了 4.3.0 版本…

最新最全面的JAVA面试题免费下载

面对求职市场的激烈竞争,掌握全面且深入的Java知识已成为每一位Java开发者必不可少的技能。《2023最新版Java面试八股文》是一份精心整理的面试准备资料,旨在帮助广大开发者系统复习,从容应对Java及相关技术栈的面试挑战。这份文档不仅汇聚了…

Spring Security 框架篇-深入了解 Spring Security 的授权核心功能(RBAC 权限模型、自定义异常处理器、校验权限方法)

🔥博客主页: 【小扳_-CSDN博客】 ❤感谢大家点赞👍收藏⭐评论✍ 文章目录 1.0 权限系统 1.1 引入 1.2 RBAC 权限模型 1.3 数据库设计 2.0 Spring Security 核心功能-授权 2.1 思路分析 2.2 编写 SQL 语句 2.3 将用户权限进行封装 2.4 获取用户…

博捷芯MIP专机:精密划片技术的革新者

BJX8160 精密划片机作为MINI行业的专用机,凭借其全自动上下料、高精度高速度um级无膜切割以及兼容多种上下料方式等特点,成为了工厂无人值守自动化的理想选择。同时,MIP专机作为博捷芯的独创产品,展现了博捷芯在精密划片机领域的领…