Jupyter + Pyspark + Yarn 交互式大数据分析

news2025/1/12 16:18:14

背景:

​ 小批量数据可以使用pandas 进行分析,方便灵活。但大批量(千万级别)数据,使用pandas分析处理,速度很慢,且需一次性读取全部数据,内存可能溢出。

​ 此时使用spark分布式分析处理速度很快,且数据分区,再配上jupyter 在线分析工具界面,可以很方便进行交互式大数据集分析。

注意:

​ python第三方库pyspark,和spark自带组件pyspark,都提供了与Spark 集群交互的 Python 接口,让Python 开发人员能够利用 Apache Spark 的强大功能来处理大规模数据。

​ 区别:spark集群自带组件pyspark,与spark一体,无需进行配置,可直接使用;python pip安装的pyspark模块,需要配置上spark集群相关信息,才能利用spark集群处理数据。

环境:

​ Jupyter-lab(python3.7) + Spark集群(sparkV2.4.0 - cdh6.3.4)

文章目录

    • 1、Jupyter Pyspark 在线交互式环境配置
      • 1.1 第一种方式
      • 1.2 第二种方式[未验证]
    • 2、在线交互式大数据分析测试
    • 3、spark-submit
    • 4、client 和 cluster 运行模式注意点

1、Jupyter Pyspark 在线交互式环境配置

1.1 第一种方式

# 安装pyspark类库
> pip install pyspark==2.4.0  # 与spark集群版本保持一致
# 启动jupyter-lab
> jupyter-lab
# jupyter环境
import os
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,FloatType,IntegerType
import pyspark.sql.functions as F
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np

import logging
import sys
import warnings

# 日志配置
logging.getLogger("py4j").setLevel(logging.WARN) # 屏蔽spark运行debug日志
logging.getLogger("pyspark").setLevel(logging.WARN)

# 配置集群spark、hadoop家目录
os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/spark'
os.environ['HADOOP_CONF_DIR'] = '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop' # spark on hive
os.environ['PYSPARK_PYTHON'] = './py3/python3.7/bin/python' # 使用 spark.yarn.dist.archives 分发到各节点的python环境;也可以是所在节点绝对路径(要求每台机器所安装python环境一致)
os.environ['PYSPARK_DRIVER_PYTHON'] = '/data/d2/anaconda3/anaconda3/envs/python3.7/bin/python' # spark client 模式运行时,driver 本地运行,无法使用spark.yarn.dist.archives 分发到各节点的python环境,需单独指定



# 创建sparksql会话
parameters = [
              ('spark.app.name','测试pyspark连接'),# 设置spark应用名称  
              # spark.yarn.dist.archives 依赖归档文件(zip/tar.gz等),用于指定哪些文件应该分发到每个执行节点上。
              # 这些压缩文件将在应用程序执行前在executor节点上工作目录内解压可用。
              # 这里将整个python3.7解释器,压缩为py3,随spark执行程序一起分发到各个executor工作目录内。
              ('spark.yarn.dist.archives','hdfs://namenode2:8020/python3/python3.7.tar.gz#py3'), 
              ('spark.master','yarn'),  #集群运行模式,yarn(连接到正在运行的 YARN 集群,Spark应用程序会作为 YARN 上的一个应用来提交和执行) 
              ('spark.submit.deploymode','client'), # driver运行模式 -> 客户端模式运行
             #('spark.submit.pyFiles','./test.txt') # job开始前上传所依赖py文件,client模式下可能提示找不到(文件会随submit自动上传)
              ('spark.debug.maxToStringFields',100)
             ]


conf = SparkConf.setALL(parameters)
# sc = SparkContext.getOrCreate(conf=conf)
# sc.addPyFile('./funcs.py') #(local_file_path/hdfs/url) 添加单个python文件到executor上,甚至在开始job后也可以添加py文件 

# 创建回话
spark = SparkSession\
        .builder\
        .config(conf=conf)\
        .enableHiveSupport()\ #.enableHiveSupport():这个方法会自动包括一些必要的 Hive 类库和配置,以便与 Hive 服务通信
        .getOrCreate() 
spark

# 关闭回话
# spark.stop()

在这里插入图片描述

1.2 第二种方式[未验证]

​ spark官网说明
在这里插入图片描述

## 通过配置driver端python环境为jupyter,然后再启动{SPARK_HOME}/bin/pyspark实现

> export PYSPARK_DRIVER_PYTHON=/xx/anaconda3/bin/jupyter #jupyter启动服务命令所在目录
> export PYSPARK_DRIVER_PYTHON_OPTS=notebook # jupyter启动参数,jupyter notebook方式
> export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root' # jupyter-lab 方式 二选其一
# 上面两个配置可以直接加到{SPARK_HOME}/bin/pyspark 启动文件里

# 再次启动pyspark
> ./bin/pyspark

2、在线交互式大数据分析测试

#1、读取hive表数据
data = spark.sql('select * from test.id4')
data.show(4)

在这里插入图片描述

#2、使用udf进行数据处理
@F.udf(IntegerType())
def add(x):
    return x+1

data.withColumn('flag+1',add(data.flag))
data.show(4)

在这里插入图片描述

3、spark-submit

spark-submit 是Spark提交各类任务(python、R、Java、Scala)的工具,可以使用shell脚本运行指定的py脚本。

其实,{SPARK_HOME}/bin/pyspark交互式环境,运行时底层也是使用的spark-submit提交资源管理器进行计算。

spark-submit(V3.1.3)具体参数:*

【从spark-submit --help 翻译而来(不同版本间可能有差异)】

spark-submit 提交用法及参数(偏python)
提交脚本格式
Usage: spark-submit [options] "app jar | python file | R file" [app arguments]
Options:注释
--master MASTER_URL spark://host:port, mesos://host:port, yarn,k8s://https://host:port,local (Default: local[*]).
--deploy-mode DEPLOY_MODE 运行模式 'client/cluster '(Default: client)
--class CLASS_NAME 您的应用程序的主类(用于 Java / Scala 应用程序,python应该不需要)。
--name NAME 应用程序的名称。
--packages 要包含在驱动程序和执行程序类路径中的 jar 的 maven 坐标的逗号分隔列表。
--py-files PY_FILES逗号分隔的 .zip、.egg 或 .py 文件列表,提交的python文件和入口文件在同一目录下,这里面包括Python应用主程序,这些文件将被交付给每一个执行器来使用。
--files FILES逗号分隔的文件列表,放置在每个执行器的工作目录中。这些文件在执行器中的文件路径可以通过 SparkFiles.get(fileName) 访问。
--archives ARCHIVES 要提取到每个执行程序的工作目录中的以逗号分隔的档案列表。
--conf, -c PROP=VALUE 任意 Spark 配置属性
--properties-file FILE加载额外属性的文件路径。如果未指定,默认查找 conf/spark-defaults.conf
--driver-memory MEMdriver程序内存(例如 1000M,2G)(默认值:1024M)。
--executor-memory MEM每个executor的内存(例如 1000M,2G)(默认值:1G)
仅集群部署模式:
--driver-cores NUMdrive使用的核心数(默认值:1)。
--executor-cores每个executor可以使用的cpu核心(Yarn和K8S默认1,standalone默认worker可利用的全部核心)
Spark 仅适用于 YARN 和 Kubernetes:
--num-executors NUM要启动的执行程序数量(默认值:2)。 如果启用了动态分配,则执行器的初始数量将至少为 NUM。
--principal PRINCIPAL用于登录 KDC 的主体。
--keytab KEYTAB包含以上指定主体的密钥表的文件的完整路径。
Spark 仅适用于 YARN:
--queue QUEUE_NAME 要提交到的 YARN 队列(默认值:“default”)。

4、client 和 cluster 运行模式注意点

起初,在配置spark运行环境时,指定spark python环境配置如下:

 vim spark-defaults.conf
spark.yarn.dist.archives=hdfs://***/***/***/env/python_env.zip#python_env
spark.pyspark.driver.python=./python_env/bin/python # pyspark程序内部自定义函数或类执行环境
spark.pyspark.python=./python_env/bin/python 

Spark-submit在进行client模式提交时,提示 "Cannot run program “./python_env/bin/python “: error=2, No such file or dictor "错误,而进行cluster提交时正常运行。

原因:

 --archives  # code运行依赖文档。
 --py-files  # code依赖python。

 如上面依赖,也可能出现 “file not found” 问题,原理应该一样。

driver服务启动后,会自动上传依赖文件到executor中,解压到当前文件夹下(通过查看yarn运行日志,可以看到上传解压过程)。

  当使用client模式时,driver运行在本地spark-submit进程中,未进行archives的上传解压,所以报错找不到python文件。

  当使用cluster模式提交时,会优先在yarn的机器中,开启一个特殊的executor运行driver,在开启executor过程中,伴随着进行archives的上传解压。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1921269.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记VMware网络适配器里的自定义特定虚拟网络一直加载问题解决办法

1、问题描述 VMware网络适配器里的自定义特定虚拟网络一直加载问题: 在自定义:特定虚拟网络选择的时候 没有上图所示的三个选择,而是正在加载虚拟网络.... 如下图所示: 2、解决办法 2.1、原因分析: 是安装时候出现…

【Java】Idea运行JDK1.8,Build时中文内容GBK UTF-8编码报错一堆方块码

问题描述 在Windows系统本地运行一个JDK1.8的项目时,包管理用的Gradle,一就编码报错(所有的中文内容,包括中文注释、中文的String字面量),但程序还是正常运行。具体如下: 解决 1. Idea更改编…

springboot+vue系统开发

链接: https://pan.baidu.com/s/1P1YpHAx9QOBPxjFZ9SAbig 提取码: u6f1

精选力扣,牛客链表面试题

💎 欢迎各位大佬互三:我的主页 1. 反转链表 206.反转链表 思考:如果不开辟额外的空间,只在原来的链表上进行修改的话,该用什么方法呢 只需要从第二个元素开始,依次进行头插就可以了 接着修改一下引用就可…

ROS2 + 科大讯飞 初步实现机器人语音控制

环境配置: 电脑端: ubuntu22.04实体机作为上位机 ROS版本:ros2-humble 实体机器人: STM32 思岚A1激光雷达 科大讯飞语音SDK 讯飞开放平台-以语音交互为核心的人工智能开放平台 实现步骤: 1. 下载和处理科大讯飞语音模…

Linux /etc/profile 详解

概述 Linux是一个多用户的操作系统。每个用户登录系统后,都会有一个专用的运行环境。通常每个用户默认的环境都是相同的,这个默认环境实际上就是一组环境变量的定义。用户可以对自己的运行环境进行定制,其方法就是修改相应的系统环境变量&…

python如何查看类的函数

Python非常方便,它不需要用户查询文档,只需掌握如下两个帮助函数,即可查看Python中的所有函数(方法)以及它们的用法和功能: dir():列出指定类或模块包含的全部内容(包括函数、方法、…

浅谈串口UART通信原理

文章目录 引言并行和串行波特率UART帧格式 引言 UART(Universal Asynchronous Receiver/Transmitter,通用异步收发器)是一种用于串行通信的硬件设备。它允许两个设备之间进行异步数据传输。是一种通用的串行、异步通信总线。该总线有两条数据…

SpringBoot整合XXL_JOB示例

XXL-JOB 是一个分布式任务调度平台,主要用于管理和执行定时任务。它适用于各种场景,例如定时任务、批处理任务、分布式任务等。XXL-JOB 提供了丰富的功能,使得任务调度变得简单、高效和可靠。以下是 XXL-JOB 的一些主要功能和特点&#xff1a…

Centos系统内磁盘分区

Centos系统内磁盘分区 建议如果有重要数据提前做好备份 以根目录扩容50G为例: 1、卸载/home目录 umount /home 2、删除逻辑卷 y确认即可 lvremove /dev/mapper/centos-home 3、df -h查询一下,/home目录已经不见了 4、向根目录分区追加50G容量 lv…

数据销毁境外间谍情报机关逼迫、威胁贷款学生为其窃取我国家秘密

近年来,随着国际形势的复杂多变,境外间谍情报机关的活动也日益猖獗。他们利用各种手段,包括通过校园贷逼迫、威胁贷款学生为其窃取我国国家秘密,这种行为不仅危害了国家安全,也严重损害了社会的公平正义。那么&#xf…

微信小程序毕业设计-汽车维修项目管理系统项目开发实战(附源码+论文)

大家好!我是程序猿老A,感谢您阅读本文,欢迎一键三连哦。 💞当前专栏:微信小程序毕业设计 精彩专栏推荐👇🏻👇🏻👇🏻 🎀 Python毕业设计…

(一)高并发压力测试调优篇——MYSQL数据库的调优

前言 在实际项目开发中,很多业务场景下都需要考虑接口的性能要求,追求高并发、高吞吐量。那么对于此类问题如何入手呢?关注作者,不迷路。本节内容主要介绍在数据库db方面的优化,以mysql数据库为例。 关于db的优化&am…

python库(11):Box库简化字典和对象之间的转换

1Box库简介 Box是一个Python库,它提供了一种将数据封装在字典和列表中的方式,同时提供了一些额外的功能,比如数据验证、默认值设置等。这使得Box库非常适合用于配置管理、数据传输对象(DTO)的创建,以及任何…

PDF 中图表的解析探究

PDF 中图表的解析探究 0. 引言1. 开源方案探究 0. 引言 一直以来,对文档中的图片和表格处理都非常有挑战性。这篇文章记录一下最近工作上在这块的探究。图表分为图片和表格,这篇文章主要记录了对表格的探究。还有,我个人主要做日本项目&…

[C++]——同步异步日志系统(4)

同步异步日志系统 一、日志等级模块设计二、日志消息类设计 一、日志等级模块设计 定义出日志系统所包含的所有日志等级分别为:(7个等级) UNKNOW0,未知等级的日志DRBUG ,调试等级的日志INFO ,提示等级的日…

前端调试技巧(npm Link,vscode调试,浏览器调试等)

Npm Link 功能: 在本地开发npm模块的时候,我们可以使用npm link命令,将npm 模块链接到对应的运行项目中去,方便地对模块进行调试和测试 断点调试 vscode调试 Debug Vue2 Project 目标:在VSCode中调试项目代码…

docker拉取镜像-配置阿里云镜像加速

1、配置阿里云镜像&#xff08;用于拉取镜像加速&#xff09; sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-EOF {"registry-mirrors": ["https://xxxxxxxx.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo syst…

论文阅读【时间序列】TimeMixer (ICLR2024)

【时间序列】TimeMixer (ICLR2024) 原文链接&#xff1a;TIMEMIXER: DECOMPOSABLE MULTISCALE MIXING FOR TIME SERIES FORECASTING 代码仓库&#xff1a;https://github.com/kwuking/TimeMixer 符号定义 符号含义P用于预测的历史序列长度&#xff08;seq_len&#xff09;F预测…

debian 12 Install

debian 前言 Debian是一个基于Linux内核的自由和开放源代码操作系统&#xff0c;由全球志愿者组成的Debian项目维护和开发。该项目始于1993年&#xff0c;由Ian Murdock发起&#xff0c;旨在创建一个完整的、基于Linux的自由软件操作系统。 debian download debian 百度网盘…