RDD优化:缓存和checkpoint机制、数据共享(广播变量、累加器)、RDD的依赖关系、shuffle过程、并行度说明

news2024/11/28 12:54:42

文章目录

  • 1. 缓存和checkpoint机制
    • 1.1 缓存使用
    • 1.2 checkpoint
    • 1.3 缓存和checkpoint的区别
  • 2. 数据共享
    • 2.1 广播变量
    • 2.2 累加器
  • 3. RDD依赖关系
  • 4.shuffle过程
    • 4.1 shuffle介绍
    • 4.2 spark计算要尽量避免shuffle
  • 5. 并行度

1. 缓存和checkpoint机制

缓存和checkpoint也叫作rdd的持久化将rdd的数据存储在指定位置
作用:

  • 计算容错
  • 提升计算速度

1.1 缓存使用

在这里插入图片描述
缓存是将数据存储在内存或者磁盘上,缓存的特点计算结束时,缓存自动清空

  • 缓存级别
    • 指定缓存的数据位置
    • 默认是缓存到内存上
  • 使用
    • persist使用该方法
    • cache内部调用persist
    • 手动释放 unpersist
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

sc = SparkContext()
rdd = sc.parallelize(['a', 'b','a','c'])


# rdd数据进行转化
rdd_kv  = rdd.map(lambda x: (x,1))
#rdd_kv数据进行缓存
rdd_kv.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
#使用action算子触发
rdd_kv.collect()


# 分组处理
rdd_group = rdd_kv.groupByKey()

#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x,y:x+y)
# 查看计算结果
res = rdd_reduce.collect()
print(res)

1.2 checkpoint

在这里插入图片描述
checkpoint也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以永久保存,程序结束不会释放
如果需要删除就在HDFS上删除对应的目录文件。

#checkpoint使用
from pyspark import SparkContext
sc = SparkContext()

#使用sc对象指定checkpoint存储位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint')

rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])

#rdd数据进行转化
rdd_kv = rdd.map(lambda x: (x, 1))

#rdd 数据进行checkpoint
rdd_kv.checkpoint()
#需要使用action算子触发checkpoint
print(rdd_kv.glom().collect())

#分组处理
rdd_group = rdd_kv.groupByKey()

#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x, y: x + y)

#查看计算结果
res = rdd_kv.collect()
print(res)

res1 = rdd_group.collect()
print(res1)

res2 = rdd_reduce.collect()
print(res2)

1.3 缓存和checkpoint的区别

  • 生命周期
    • 缓存数据,程序计算结束后自动删除
    • checkpoint 程序结束,数据依然保留在HDFS
  • 存储位置
    • 缓存 优先存储在内存上,也可以选存储在本地磁盘,是在计算任务所在的内存和磁盘上。
    • checkpoint存储在HDFS上
  • 依赖关系
    • 缓存数据后,会保留rdd之间依赖关系,缓存临时存储,数据可能会丢失,需要保留依赖,当缓存丢失后可以按照依赖重新计算。
    • checkpoint,数据存储后会断开依赖,数据保存在HDFS,HDFS三副本机制可以保证数据不丢失,所以没有比较保留依赖关系。
      注意:缓存和checkpoint可以作为rdd优化的方案,提升计算速度,一般对经常要使用的rdd进行缓存和checkpoint,对计算比较复杂的rdd进行缓存或checkpoint。

2. 数据共享

2.1 广播变量

在这里插入图片描述
在这里插入图片描述
如果要在分布式计算里面分发大的变量数据,这个都会由driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task都会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor都会拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

减少task线程对应变量的定义,节省内存空间

#广播变量
from pyspark import SparkContext

sc= SparkContext()

a_obj = sc.broadcast(10)
#生成rdd
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])

#转化计算
rdd2 = rdd.map(lambda x:x+a_obj.value)

#查看数据
res = rdd2.collect()
print(res)

2.2 累加器

避免资源抢占造成错误

#累加器的使用
from pyspark import SparkContext

sc = SparkContext()
#将变量值添加到累加器中
acc_obj = sc.accumulator(0)

#rdd操作
rdd = sc.parallelize([1,2,3])

#使用累加器进行数据累加
rdd2 = rdd.map(lambda x:acc_obj.add(x))

#查看结果
res = rdd.collect()
print(res)

res1 = rdd2.collect()
print(res1)

#查看累加器的结果
print(acc_obj.value)

查看结果:
在这里插入图片描述

3. RDD依赖关系

  • 窄依赖
    • 每个父RDD的一个partition最多被子RDD的一个partition所使用。
      • map
      • flatMap
      • filter
  • 宽依赖
    • 一个父RDD的partition会被多个子RDD的partition所使用
      • groupByKey
      • reduceByKey
      • sortByKey
    • 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle
      • 只要是宽依赖必然发生shuffle
      • 在宽依赖进行数据交换时,只有等待所有分区交换完成后,才能进行后续的计算,非常影响计算速度。

那么如何判断是宽依赖还是窄依赖呢?

#判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()

rdd = sc.parallelize([1,2,3,4])
rdd_kv = sc.parallelize([('a',1), ('b',2), ('c',3)])

#算子演示
rdd2 = rdd.map(lambda x:x+1)
rdd3 = rdd_kv.groupByKey()

#查看结果
# res = rdd2.collect()
# print(res)

res = rdd3.collect()
print(res)

DAG 管理维护rdd之间依赖关系,保证代码的执行顺序,


DAG会根据依赖关系划分stage,每个stage都是一个独立的计算步骤,当发生宽依赖时,会单独拆分一个计算步骤(stage),进行相关数据计算,可以保证每个单独的stage可以并行执行

在发生宽依赖进行shuffle时,会独立的方法执行shuffle计算


拆分计算步骤的本质是为了保证数据计算的并行执行

查看spark的计算过程,通过DAG判断算子是宽依赖还是窄依赖

拆分了计算stage是宽依赖,没有拆分是窄依赖

启动spark的历史日志

start-history-server.sh

在这里插入图片描述
在这里插入图片描述

4.shuffle过程

mapreduce的shuffle作用: 将map计算后的数据传递给redue使用。
mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采用hash),排序,合并(规约)。
将map计算的数据传递给reduce


spark中也有shuffle
当执行宽依赖的算子就会进行shuffle
将rdd的数据传递给下一个rdd,进行数据交换


无论是spark还是mr,shuffle的本质是传递交换数据。

在这里插入图片描述

4.1 shuffle介绍

  • spark的shuffle的两个部分
    • shuffle write 写
    • shuffle read 读
    • 会进行文件的读写,影响spark的计算速度
  • spark的shuffle方法类
    • 是spark封装好的处理shuffle的方法
    • hashshuffle类
      • 进行的是hash计算
      • spark1.2版本之前主要使用,之后引入了sortshuffle
      • spark2.0之后,删除了hashshuffle,从2.0版本开始使用sortshuffle类
      • 优化的hashshuffle和未优化的ashshuffle
    • sortshuffle类
      • 排序方式将相同key值数据放在一起
      • sortshuffle类使用时,有两个方法实现shuffle
        • bypass模式版本和普通模式版本
        • bypass模式版本不会排序,会进行hash操作
        • 普通模式版本会排序进行shuffle
      • 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于等于200 采用bypass,task数量超过200个则使用普通模式的方法进行shuffle
      • 一个分区对应一个task,所以task数量由分区数决定

普通模式和bypass模式的主要区别在于如何将相同key值的数据放在一起

  • 排序 普通模式采用的策略
  • 哈希取余 bypass模式采用的策略

4.2 spark计算要尽量避免shuffle

# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()

rdd = sc.parallelize([('男',20),('男',23),('女',20),('女',22)])
#求不同性别的年龄和
#reduceByKey  是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)

# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):
    if x[0]=='男':
        boy.add(x[1])
    else :
        girl.add(x[1])

    return None
rdd3 = rdd.map(func)

rdd3.collect()
print(boy.value)
print(girl.value)

5. 并行度

  • 资源并行度

    • task在指定任务能够使用到的cpu核心数量

    • 多任务 多个进程或多个线程执行任务

      • 两种方式
        • 并行 多个任务同时执行
        • 并发 任务交替执行
        • 和cpu的核心数有关
          • 例如
          • cpu核心是4核 有两个线程任务 两个线程任务可以 并行执行
          • cpu核心是4核 有八个线程任务 并发执行
    • spark中cpu核心数据设置

      • –num-executors=2 设置executors数量 和服务器数量保持一致
      • –executor-cores=2 设置每个executors中的cpu核心数 每个服务器中cpu核心数一致
      spark-submit  --master yarn  --num-executors=3   --executor-cores=2
      

      最大支持的task并行数量是 num-executors* executor-cores =6

      需要按照服务器实际的cpu核心数指定 lscpu

  • 数据并行度

    • 就是task数量,task由分区数决定
    • 为了保证task能充分利用cpu资源,实现并行计算,需要设置分区数应该和资源并行度一致
    • 在实际公司中就要根据公司资源并行度进行设置分区数
    • 有的场景下公司会要求数据并行度大于资源并行度

资源并行度,

按照yarn安装的服务器数量指定excutor数量 3

核心数量按照yarn中的nodemanager中的核心数指定 2
数据并行度指定

官方建议 数据并行度的task数量和资源并行度数量一致

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

源代码加密有哪些技巧呢?除了用加密软件还有哪些方法?

导语:源代码加密是保护软件核心资产的主要方式了,可以通过多种技术措施确保源代码不被未授权访问、复制或篡改,防止泄密问题。这篇文章是一些有效的源代码加密技巧,欢迎您的阅读! 源代码加密常用技巧1、访问控制&…

俏生元将传统膳食智慧融入现代生活,自然成分绽放健康光彩

近年来,当代女性健康食品市场正经历快速发展和显著变化。随着女性健康意识的提升,市场对专门针对女性健康的产品需求快速上升。女性消费者对健康的关注不再局限于表面,而是越来越注重内在健康和生活质量的提升。此外,中式养生文化…

Python 基于 Bert 的中文情歌分析,多分类中文情感分析

前言 在自然语言处理(NLP)领域中,情感分析是一项非常常见的任务。它的目标是判断文本的情感倾向,例如在社交媒体上的评论、产品评价、电影评论等数据中,识别文本是正面的、负面的,还是中性的。与传统的二分…

中航资本:科技股有哪几种?科技股都包含哪些板块?

科技股主要有两种,一种是软科技,一种是硬科技。 1、硬科技:需要强健的数学、物理、计算机科学和工程技能才能够完毕,例如:光电芯片、‌人工智能、‌航空航天、‌生物技能、‌信息技能、计算机视觉、集成电路规划、软件…

2024双十一值得入手的好物品牌有哪些?精选五款双十一必入好物推荐

在双十一这个全民狂欢的购物节,各大品牌纷纷推出诱人的优惠活动,让人心动不已。今年,有哪些品牌的好物值得我们关注呢?下面,我将为大家精选五款双十一必入好物推荐,让你的购物车不再空虚。 双十一必入好物…

怎么把人声和背景音乐分离?实用方法:将人声从音乐中剥离

怎么把人声和背景音乐分离?在数字音频处理中,将人声与背景音乐进行分离是一个复杂但富有挑战性的任务。这种分离技术在音乐制作、影视编辑、版权管理以及个人娱乐等多个领域具有广泛的应用价值。 虽然完美地将人声和背景音乐分离开来可能仍然是一个技术难题&#…

⽂件操作详解

本章讲述的是有关文件的相关内容,本章我们会认识到什么是文件,二进制文件与文本文件,文件的打开和关闭,⽂件的顺序读写和随机读写以及⽂件读取结束的判定和⽂件缓冲区。 1.什么是⽂件 像这样在磁盘(硬盘)上…

骑行眼镜的选择攻略:评论与实用建议!

骑行眼镜评论分析 目录 骑行眼镜评论分析 1. 评论的基本统计分析(数据来源:淘宝评论信息接口) 评论长度的集中趋势: 评论长度的离散程度: 2.用户评价分析 词云高频词分析 1. 眼镜 (2***8 次出现): 2. 不错 (1***9 次出现)…

(29)数字信号处理中正弦波的表示

文章目录 前言一、奈奎斯特采样定理二、正弦波的时域表示三、数字信号处理中正弦波的表示方法 前言 本文首先介绍了奈奎斯特采样定理,然后以奈奎斯特采样定理为基础,给出MATLAB代码,说明在数字信号处理中如何表示一个正弦波。 一、奈奎斯特采…

Flutter技术学习

以下内容更适用于 不拘泥于教程学习,而是从简单项目入手的初学者。 在开始第一个项目之前,我们先要了解 两个概念。 Widget 和 属性 Widget 是用户界面的基本构建块,可以是任何 UI 元素。属性 是 widget 类中定义的变量,用于配…

Python脚本分类和代码举例

Python是一种强大且灵活的编程语言,被广泛应用于数据分析、Web开发、自动化、人工智能等领域。在不同的应用场景下,Python脚本可以被分类为多种类型。本文将深入分析Python脚本的分类,同时提供相关代码示例,帮助读者理解和应用这些…

Spring AI 介绍与入门使用 -- 一个Java版Langchain

Langchain 是什么? Langchain 是一个Python 的AI开发框架,它集成了模型输入输出、检索、链式调用、内存记忆(Memory)、Agents以及回调函数等功能模块。通过这些模块的协同工作,它能够支持复杂的对话场景和任务执行流程…

【大数据】数据采集工具sqoop介绍

文章目录 什么是sqoop?一、Sqoop的起源与发展二、Sqoop的主要功能三、Sqoop的工作原理四、Sqoop的使用场景五、Sqoop的优势六、Sqoop的安装与配置 sqoop命令行一、Sqoop简介与架构二、Sqoop特点三、Sqoop常用命令及参数四、使用示例五、注意事项 什么是sqoop? Sqoop是一款开…

BlackMarket_ 1靶机渗透

项目地址 plain https://download.vulnhub.com/blackmarket/BlackMarket.zip 实验过程 开启靶机虚拟机 ![](https://img-blog.csdnimg.cn/img_convert/169d964d61ea9660c1104e723f71449e.png) 使用nmap进行主机发现,获取靶机IP地址 plain nmap 192.168.47.1-254…

D34【python 接口自动化学习】- python基础之输入输出与文件操作

day34 文件关闭 学习日期:20241011 学习目标:输入输出与文件操作﹣-46 常见常新:文件的关闭 学习笔记: 文件关闭的内部工作过程 close()函数 with语句 常用的打开关闭文件 # 文件关闭 # 方式…

第十八篇——有什么比无穷大更大,比无穷小更小?

目录 一、背景介绍二、思路&方案三、过程1.思维导图2.文章中经典的句子理解3.学习之后对于投资市场的理解4.通过这篇文章结合我知道的东西我能想到什么? 四、总结五、升华 一、背景介绍 看到这篇文章之后,我才发现,我还可以多么有知&…

IP-Guard与Ping32两大加密软件对比:安全性、功能与性能全面评测

随着网络安全威胁的不断升级,越来越多的企业开始重视文件加密,尤其是那些涉及敏感数据、技术文档和业务机密的文件。市场上涌现出众多加密软件,而IPGuard与Ping32作为其中的佼佼者,分别在不同的企业中获得了广泛应用。本文将对这两…

胤娲科技:破茧成蝶——具身智能工业机器人引领工业新纪元

想象一下,走进一家未来的工厂,不再是冰冷的机械臂在单调地重复着同样的动作,而是灵活多变的智能机器人, 它们能够“看一遍、学一遍、做一遍”,然后高效地投入到生产中。这样的场景,是否已经让你对未来充满了…

MySQL基本语法、高级语法知识总结以及常用语法案例

MySQL基本语法总结 MySQL是一种广泛使用的关系型数据库管理系统,其基本语法涵盖了数据库和数据表的创建、查询、修改和删除等操作。 一、数据库操作 创建数据库(CREATE DATABASE) 语法:CREATE DATABASE [IF NOT EXISTS] databa…

工业4G路由R10提升物流仓储效率

在当今全球化的商业环境中,物流仓储行业面临着越来越大的压力,需要不断提高效率、降低成本并确保货物的安全与准时交付。面对这些挑战,技术革新成为了推动行业发展的关键力量。工业4G路由R10作为一款集成了边缘计算、数据采集、协议转换、远程…