spark的使用

news2024/12/26 9:22:05

国内源下载

https://mirrors.cloud.tencent.com/apache/spark/

环境配置(三台机器都要配置)

修改/etc/profile

export JAVA_HOME=/export/server/jdk
export HADOOP_HOME=/export/server/hadoop

export SPARK_HOME=/export/server/spark
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
export HADOOP_CONF_DIR=$HADDOP_HOME/etc/hadoop

修改~/.bashrc

export JAVA_HOME=/export/server/jdk
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python

修改spark-env.sh

JAVA_HOME=/export/server/jdk

## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
#配置worker的python环境,否则他会用系统自带的
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
## 指定spark老大Master的IP和提交任务的通信端口
# 告知Spark的master运行在哪个机器上
export SPARK_MASTER_HOST=node1
# 告知sparkmaster的通讯端口
export SPARK_MASTER_PORT=7077
# 告知spark master的 webui端口
SPARK_MASTER_WEBUI_PORT=8080

# worker cpu可用核数
SPARK_WORKER_CORES=1
# worker可用内存
SPARK_WORKER_MEMORY=1g
# worker的工作通讯地址
SPARK_WORKER_PORT=7078
# worker的 webui地址
SPARK_WORKER_WEBUI_PORT=8081

## 设置历史服务器
# 配置的意思是  将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

启动sparkonyarn

/export/server/spark/bin/pyspark --master yarn --deploy -mode client|culster

在这里插入图片描述

使用spark-submit提交py文件到yarn

#提交到yarn
/export/server/spark/bin/spark-submit --master yarn /sparkproject/00_example/helloword.py
#提交到本地运行
/export/server/spark/bin/spark-submit --master local[*] /sparkproject/00_example/helloword.py

在这里插入图片描述

RDD的五大特性

  • 分区性,rdd是可以增加缩减分区的
  • 通用性,每个rdd方法都会作用于每个分区
  • 血缘性,rdd1,rdd2…每个rdd是链式依赖
  • key,value数据的分区性
  • driver就近构建,driver的构建会尽量贴近数据,从而提高性能.

RDD的创建

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount')
    #2.生成sc对象
    sc=SparkContext(conf=conf)
    #读取一个文件
    word_file = sc.textFile('hdfs://node1:9001/input/words.txt')
    word_add = word_file.flatMap(lambda line:line.split(' '))
    word_with_one_rdd = word_add.map(lambda x:(x,1))
    result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
    print(result_rdd.collect())

wholeTextFiles 处理一个文件夹内包含多个小文件

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount')
    #2.生成sc对象
    sc=SparkContext(conf=conf)
    #读取一个文件夹,直接collect()会返回文件位置:文件内容的元祖形式,可通过map获取.
    rdd = sc.wholeTextFile('hdfs://node1:9001/input')
    print(rdd.map(lambda x:x[1]).collect())

rdd的算子

转换算子:只要返回结果是rdd的就是转换算子,是懒加载,只有执行执行算子的时候才会处理
执行算子: 返回的不是rdd就是执行算子

map算子

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
    #2.生成sc对象
    sc=SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4,5,6])
    def math_10(data):
        return data*10
    print(rdd.map(math_10).collect())
    print(rdd.map(lambda x:x*10).collect())

flatmap用法与map相同,限制性map算子,然后在接触数据嵌套
[(1,2,3),(4,5,6),(7,8,9)] ===>> [1,2,3,4,5,6,7,8,9]

reduceByKey

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.reduceByKey(lambda a,b:a+b).collect())
#[('a', 5), ('b', 3)]

mapValues

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.mapValues(lambda values:values*10).collect())
#[('a', 10), ('b', 10), ('b', 20), ('a', 20), ('a', 10), ('a', 10)]

groupBy

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.groupBy(lambda t:t[0]).collect())
#[('a', <pyspark.resultiterable.ResultIterable object at 0x7faa2b811370>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7faa2b8113a0>)]

   result = rdd.groupBy(lambda x:x[0])
    print(result.map(lambda t:(t[0],list(t[1]))).collect())
    # [('a', [('a', 1), ('a', 2), ('a', 1), ('a', 1)]), ('b', [('b', 1), ('b', 2)])]

filter ====>rdd.filter(func) 传入参数返回值时bool类型,为true的留下,为false的过滤

    rdd = sc.parallelize([1,2,3,4,5,6,7,8])
    # print(rdd.groupBy(lambda t:t[0]).collect())
    print(rdd.filter(lambda x:x%2==1).collect())
# [1, 3, 5, 7]

groupByKey和reduceByKey的区别?
1.groupByKey只进行了分组后可以自定义聚合函数,reduceByKey内置聚合分组聚合.
2.是reduceByKey会在分组前在每个分区先进行聚合,被shuffle的数据可以极大地减少,然后在执行分组操作,然后在执行聚合.相较于groupByKey来讲:大量节省了磁盘的io操作,在数据量较大的情况下,优先使用reduceByKey.

mappartitions和foreachpartitions的区别?
1.相同点:他们两个都是对一整个分区的数据进行处理的
2.不同点,mappartitions是转换算子返回的是rdd,foreachpartitions是执行算子,由executor执行,返回值为none.

coalesce修改分区数量
两个参数第一个参数是要修改的数量值,第二个参数shuffle=true. 建议只减少分区,不增加分区,增加分区会产生shuffle.

rdd数据是过程数据:即每生成一个新的rdd,老的rdd就会被清理
如果数据再生成rdd3时还想使用rdd1,这时候就可以使用rdd的缓存机制,缓存机制是分散存储.
1.rdd.cache()
2.rdd.persist()
3.rdd.unpersist() 清除缓存
在这里插入图片描述

rdd缓存和CheckPoint的区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/623155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

苹果MR Vision Pro将会带动哪些零部件出货?

苹果如何重新定义AR? 在如今以智能手机为主的消费电子市场下行阶段&#xff0c;市场急需开辟一个新的领域带来新的增长点&#xff0c;以往被寄予厚望的VR/AR等头显设备在经历了数年发展后&#xff0c;依旧难堪大任&#xff0c;业界都把希望寄托在苹果身上。 简单来说&#xf…

学习Java一年的程序员的Python学习记录(转行了,校招Java根本找不到工作)

文章目录 一 基础语法二 集合三 函数四 IO五 项目结构六 面向对象 一 基础语法 Python如果是部署在Linux上&#xff0c;是需要通过源码去编译安装的&#xff0c;在编译的过程中&#xff0c;会以来一些第三方的软件。所以这些软件需要提前安装一下。 yum install wget zlib-deve…

VS报错集锦 --- 出现:error LNK2005: _DllMain@12 已经在 *****.obj 中定义 错误

出现的问题&#xff1a; 1>mfcs140d.lib(dllmodul.obj) : error LNK2005: DllMain 已经在 DllMain.obj 中定义 解决方法&#xff1a; 项目 -- 属性 -- c/c -- 预处理器 -- 将预处理定义中的_USRDLL 删除即可

“大四在读生”都四面成功拿到字节跳动Offer了,你还有什么理由去摸鱼?

博主大四在读&#xff0c;投的是字节 Data 的软件测试岗位实习生&#xff0c;base 杭州。 时间线&#xff1a; 4.12 投递4.13 安排简历筛选4.14 安排面试4.19 16:00 一面4.22 16:00 二面 4.23 8:00 三面4.23 16:00 HR 面4.23 16:30 Offer 一面 你对字节跳动的了解和认知有哪…

Allegro16.6详细教程(四)

(2) PIN的定義 如果用第一種方式產生Netlist的話,就要對於一些Power pin加以定義。 1.滑鼠點選想定義的零件。 2.點選選單中Edit>Part。 3.用滑鼠點選想定義的Pin腳。 4.點選功能表中Edit>Properties,透過這些步驟就看到了下面的這個對話方塊了。 在這裏主要是把T…

Apache Flink 1.17

Apache Flink 1.17 1. Flink 1.17 Overview2. Flink 1.17 Overall Story3. Flink 1.17 Key Features4. Summary5. Q&A 1. Flink 1.17 Overview Flink 1.17 版本完成了 7 个 FLIP&#xff0c;累计贡献者 170&#xff0c;解决 600Issue 以及 1100Commits&#xff0c;整体来看…

ppt如何录屏?电脑怎么录制ppt文稿?

案例&#xff1a;在电脑上怎么录制PPT文稿&#xff0c;具体怎么操作&#xff1f; 【我工作的时候经常需要用到ppt文稿&#xff0c;有时还需要对PPT文稿进行录制&#xff0c;但我不知道如何操作。有小伙伴知道在电脑上如何录制ppt文稿吗&#xff1f;需要用到什么工具&#xff1…

热水器语音芯片,带有温度检测算法、数码管显示语音ic,WTV380

在现代科技不断进步的时代&#xff0c;智能家居产品成为越来越多消费者的选择&#xff0c;在热水器行业为了提供更智能、便捷的用户交互体验&#xff0c;一款带有数码管显示语音提示二合一&#xff0c;多功能语音芯片方案 —— WTV380 WTV380能够实现语音播报热水器的各种信息…

学PCB设计要精通模电吗?

PCB设计是电子工程师在电路设计领域中的重要一环&#xff0c;而模拟电路设计&#xff08;简称&#xff1a;模电&#xff09;是其中的核心内容之一&#xff0c;很多小白在初学PCB设计都会困惑&#xff0c;学PCB设计是否要精通模电&#xff1f;这篇文或许能解惑&#xff0c;本文将…

【LeetCode热题100】打卡第16天:组合总和

文章目录 组合总和⛅前言&#x1f512;题目&#x1f511;题解 组合总和 ⛅前言 大家好&#xff0c;我是知识汲取者&#xff0c;欢迎来到我的LeetCode热题100刷题专栏&#xff01; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数…

Vue.js 中的 $router 和 $route

Vue.js 中的 $router 和 $route 在 Vue.js 中&#xff0c;$router 和 $route 是两个常用的对象&#xff0c;用于处理路由相关的操作。在本文中&#xff0c;我们将介绍 $router 和 $route 的区别&#xff0c;并且演示如何使用它们。 $router 和 $route 的区别 在 Vue.js 中&am…

Windows 下挂载使用 CephFS

一、Ceph集群搭建和CephFS创建 参考上期文章 Centos stream 8 使用 cephadm 安装 Ceph (17.2.6 quincy)集群_阿波罗.2012的博客-CSDN博客 二、将CephFS挂载到Windows Server 2019下 1、准备Dokany 下载地址&#xff1a;Release 1.5.1.1000 dokan-dev/dokany GitHub 下载…

【数据分享】1929-2022年全球站点的逐年平均降水量(Shp\Excel\12000个站点)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;说到常用的降水数据&#xff0c;最详细的降水数据是具体到气象监测站点的降水数据&#xff01; 之前我们分享过1929-2022年全球气象站点的逐年平均气温数据、逐年最高…

实用工具篇(三):一款 IntelliJ IDEA 神级插件Bito

目录 1、什么是Bito 2、为什么要使用Bito 3、如何安装Bito插件 4、如何使用Bito插件 1、什么是Bito Bito是一款在IntelliJ IDEA编辑器中的插件&#xff0c;Bito插件是由ChatGPT团队开发的&#xff0c;它是ChatGPT团队为了提高开发效率而开发的一款工具。 Bito插件的强大…

Clion开发STM32之ESP8266系列(一)

系列开篇说明 结合个人的开发经验以及实际情况&#xff0c;分享stm32结合esp8266的一个开发项目。从0开始构建项目程序。为了方便记录项目开发记录&#xff0c;此系列的每个篇章都在git上做一个节点。此系列也是作为一个个人的开发思路和经验本次开发选用的芯片为stm32f103vet…

GIT安装教程(入门)

目录 前言 Git作者 官网 GIT优点 GIT缺点 为什么要使用 Git 下载以及安装步骤 一、官网下载 二、GIT安装步骤 GIT习题 习题一 总结 前言 Git 是一个分布式版本控制及源代码管理工具;Git 可以为你的项目保存若干快照&#xff0c;以此来对整个项目进行版本管理 Git作…

lan区段

LAN区段相当于说模拟出一个交换机或者集线器出来&#xff0c;把不同虚拟机连接起来&#xff0c;与物理机不进行数据交流&#xff0c;与外网也不进行数据交流&#xff0c;构建一个独立的网络。没有DHCP功能&#xff0c;需要手工配置IP或者单独配置DHCP服务器。

AG-Grid JavaScript 29.3.5 企业版 注册版

世界上最好的 JavaScript 网格 开发人员构建企业应用程序的专业选择 开始吧 Javascript 数据网格 Javascript 反应数据网格 反应 角度数据网格 有角的 Vue 数据网格 视图 实体数据网格 坚硬的 有很多基于组件的表库&#xff0c;但我相信 AG Grid 是黄金标准&#xff0c;也是我最…

抖音账号矩阵系统开发源码

技术自研框架开发背景&#xff1a; 抖音账号矩阵系统是一种基于数据分析和管理的全新平台&#xff0c;能够帮助用户更好地管理、扩展和营销抖音账号。 部分源码分享&#xff1a; //计算分页$active_list_all $Video_model->getCount($where);$page_libs new Libs_Pagin…

Mysql5.7.x镜像开启log-bin失效及解决

文章目录 [toc] 1.问题2.mysql5.7.16的部署及开启log-bin2.1 准备挂载目录2.2 启动容器命令2.3 开启log-bin配置文件内容 3.原因及解决办法4.了解binlog和redolog有什么区别&#xff1f;5.总结 1.问题 由于在本地搭建了一个数据同步的环境用到了mysql&#xff0c;所以用Docker的…