Spark 广播/累加

news2025/1/11 23:48:45

Spark 广播/累加

  • 广播变量
    • 普通变量广播
    • 分布式数据集广播
    • 克制 Shuffle
    • 强制广播
      • 配置项
      • Join Hints
      • broadcast
  • 累加器

Spark 提供了两类共享变量:广播变量(Broadcast variables)/累加器(Accumulators)

广播变量

创建广播变量的方式:

  • 从普通变量创建广播变量 : 由 Driver 分发各 Executors
  • 从分布式数据集创建广播变量 : Driver 拉取各 Executors 分区数并合并, 再分发各Executors

普通变量广播

普通变量分发 :

  • 普通变量在 Driver 端创建 (非分布式数据集),要把普通变量分发给每个 Task
  • 以 Task 粒度分发,当有 n 个 Task,变量就要分发 n 次
  • 在同个 Executor 内部,多个不同的 Task 多次重复缓存同样的内容 , 对内存资源浪费

在这里插入图片描述

广播变量分发:

  • 以 Executors 粒度分发,同个 Executor 的 各 Tasks 互相拷贝。即:变量分发数 = Executors 数

普通变量广播:

val list: List[String] = List("Apache", "Spark")

val bc = sc.broadcast(list)

在这里插入图片描述

分布式数据集广播

创建分布式数据集广播:

val userFile: String = "hdfs://ip:port/rootDir/userData"
val df: DataFrame = spark.read.parquet(userFile)

val bc_df: Broadcast[DataFrame] = spark.sparkContext.broadcast(df)

分布式数据集广播过程 :

  • Driver 从所有的 Executors 拉取这些数据分区,再在本地构建全量数据
  • Driver 把合并的全量数据分发给各个 Executors
  • Executors 收到数据后,缓存到存储系统的 BlockManager

在这里插入图片描述

克制 Shuffle

无优化时,默认用 Shuffle Join

val transactionsDF: DataFrame = _
val userDF: DataFrame = _

transactionsDF.join(userDF, Seq("userID"), "inner")

Shuffle Join 的过程 :

  1. 对关联俩表分别进行 Shuffle
  2. Shuffle 的分区规则:先对 Join keys 计算哈希值,再对哈希值进行分区数取模
  3. Shuffle 后,同 key 的数据会在同个 Executors
  4. Reduce Task 对 同 key 的数据进行关联

在这里插入图片描述

优化代码:

import org.apache.spark.sql.functions.broadcast

val transactionsDF: DataFrame = _
val userDF: DataFrame = _

val bcUserDF = broadcast(userDF)
transactionsDF.join(bcUserDF, Seq("userID"), "inner")

广播过程:

  1. Driver 从所有 Executors 收集 userDF 的所有数据分片,再在本地汇总数据
  2. 给每个 Executors 都发送一份全量数据,各自在本地关联
  3. 利用广播变量 ,就能避免 Shuffle

在这里插入图片描述

强制广播

广播注意点:

  • 创建广播变量越大,网络开销和 Driver 内存也就越大。当广播变量大小 > 8GB,就会直接报错
  • Broadcast Joins 不支持全连接(Full Outer Joins)
  • 左连接(Left Outer Join)时,只能广播右表
  • 右连接(Right Outer Join)时,只能广播左表

配置项

两张 Join 表,只要其中一张表的尺寸 < 10MB,就会采用 Broadcast Joins 做数据关联

# 采用 Broadcast Join 实现的最低阈值
spark.sql.autoBroadcastJoinThreshold 10m

数据在存储/内存大小差异的原因:

  • 为了存储/访问效率,数据采用 Parquet/ORC 格式进行落盘
  • JVM 一般需要比数据原始更大的内存空间来存储对象

准确预估表在内存的大小:

  1. 把表缓存到内存,如: DataFrame/Dataset.cache
  2. 读取执行计划的统计数据
val df: DataFrame = _
df.cache.count

val plan = df.queryExecution.logical
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes

Join Hints

Join Hints :在开发中用特殊的语法,告知 Spark SQL 运行时采用这种 Join

val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)

table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")

val query: String = "select /*+ broadcast(t2) */ * from t1 
	inner join t2 on t1.id = t2.id"

val queryResutls: DataFrame = spark.sql(query)

DataFrame 的 DSL 语法中使用 Join Hints :

table1.join(table2.hint("b"roadcast"), Seq("key"), "inner")

broadcast

广播数据表 :

import org.apache.spark.sql.functions.broadcast

table1.join(broadcast(table2), Seq(“key”), “inner”)

广播设置要点:以广播阈值配置为主,以强制广播为辅

累加器

累加器的作用:全局计数(Global counter)
SparkContext 提供了 3 种累加器 :

  • longAccumulator:Long 类型的累加器
  • doubleAccumulator :对 Double 类型的数值做全局计数
  • collectionAccumulator :定义集合类型的累加器

累加器在 Driver 端定义,在 RDD 算子中调用 add 进行累加。最后在 Driver 端调用 value ,就能获取全局计数结果

// 定义 Long 类型的累加器
val ac = sc.longAccumulator("Empty string")

def f(x: String): Boolean = {
  if(x.equals("")) {
    // 当遇到空字符串时,累加器加 1
    ac.add(1)
    return false
  } else {
    return true
  }
} 

//用 f 对 RDD 进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(f)

// 作业执行完毕,通过调用 value 获取累加器结果
ac.value

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快速上手配置firewalld

firewalld使用firewall-cmd命令配置策略。 查看当前firewalld当前服务运行状态 firewall-cmd --state firewalld防火墙状态还用使用如下命令查看状态 systemctl status firewalld 查看所有打开运行的端口 firewall-cmd --zonepublic --list-ports 查看区域信息情况 firewall…

qml学习之qwidget与qml结合使用并调用信号槽交互

学习qml系列之一说明&#xff1a; 学习qml系列之qwiget和qml信号槽的交互使用&#xff0c;并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一&#xff1a;使用QQuickView 这个是Qt5.0中提供的一个类&…

小白量化《穿云箭集群量化》(5)抄底雷达策略

小白量化《穿云箭集群量化》&#xff08;5&#xff09;抄底雷达策略 雷达能够提前发现远处敌我动向。雷达是现代战争不可或缺的装备。 证券市场中分三类人&#xff0c;先知先觉者&#xff0c;后知后觉者&#xff0c;不知不觉者。先知先觉者往往是市场主力&#xff0c;他们拥有信…

Feign踩坑源码分析 -- 请求参数分号变逗号

一.案例 1.1.Post请求&#xff1a; http://localhost:8250/xx/task/test json格式参数&#xff1a; {"string": "a;b;c;d" } 1.2.controller代码&#xff1a; AutowiredDataSourceClientService dataSourceClientService;RequestMapping("/test"…

《计算机原理》——HelloWorld.cpp如何运行的

学校《计算机原理》开课啦&#xff01;特此开辟专栏&#xff0c;将一些知识作为笔记&#xff0c;记录下来。 前言 本篇博客知识点来源于educoder的相关题目 1. 相关知识 1.1 计算机语言 计算机语言是人与计算机之间通讯的语言&#xff0c;计算机语言包括编写计算机程序的字符…

[MatLab]图像绘制

一、绘制二维图像 1.一张图上绘制一条线 绘制代码如下面所示&#xff1a; x 0:0.01:2*pi; y sin(x); figure %建立幕布 plot(x,y) %绘制图像 %设置图像属性 title(ysin(x)) xlabel(x) ylabel(y)xlim([0 2*pi]) %限制x轴的值域 自定义图线的颜色…

GB28181协议--SIP协议介绍

1、SIP协议简介 SIP&#xff08;Session Initiation Protocol&#xff0c;会话初始协议&#xff09;是一个用于建立、更改和终止多媒体会话的应用层控制协议&#xff0c;其中的会话可以是IP电话、多媒体会话或多媒体会议&#xff08;GB28181安防使用的是SIP协议&#xff09;。S…

lab备考第二步:HCIE-Cloud-Compute-第一题:FusionCompute

第一题 FusionCompute 一、题目介绍 1.1. 扩容CAN节点与对接共享存储&#xff08;必选&#xff09; 题目及【考生提醒关键点】 扩容一台CNA节点&#xff0c;配置管理地址设置为&#xff1a;192.168.100.212。密码设置为&#xff1a;Cloud12#$。【输入之前确认自己的大小写是否…

任务类风险漏洞挖掘思路

任务类风险定义&#xff1a; 大部分游戏都离不开任务&#xff0c;游戏往往也会借助任务&#xff0c;来引导玩家上手&#xff0c;了解游戏背景&#xff0c;增加游戏玩法&#xff0c;提升游戏趣味性。任务就像线索&#xff0c;将游戏的各个章节&#xff0c;各种玩法&#xff0c;…

docker上安装nacos

文章目录一、docker安装nacos简单版1.拉取镜像2、挂载目录&#xff0c;用于映射到容器&#xff0c;目录按自己的情况创建3、mysql新建nacos-config的数据库&#xff0c;并执行脚本 sql脚本地址如下&#xff1a;4、修改配置文件custom.properties5、启动容器6、访问二、docker安…

错误:PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。“+文件路径“的解决方案

最近在使用python进行筛选图片的时候&#xff0c;想到用python里面的os库进行图片的删除。 具体筛选方法就是&#xff0c;删除掉图片长度或宽度小于100像素的图片&#xff0c;示例代码如下所示&#xff1a; for file in os.listdir(img_path):if file .split( . )[ - 1 ] j…

深度强化学习DLR

1 强化学习基础知识 强化学习过程&#xff1a;⾸先环境(Env)会给智能体(Agent)⼀个状态(State)&#xff0c;智能体接收到环境给的观测值之后会做出⼀个动作(Action)&#xff0c;环境接收到智能体给的动作之后会做出⼀系列的反应&#xff0c;例如对这个动作给予⼀个奖励(Reward…

射频功率放大器基于纵向导波的杆状构件腐蚀诊断方法的研究

实验名称&#xff1a;基于纵向导波的杆状构件腐蚀诊断方法研究方向&#xff1a;无损探伤测试设备&#xff1a;信号号发生器、安泰ATA-8202功率放大器、数据采集卡、直流电源、超声探头、钢杆、前置放大器。实验过程&#xff1a;图&#xff1a;试验装置试验装置如图3.2所示。监测…

Android Handler机制(四) Message源码分析

一. 简介 接上一篇文章:Android Handler机制(三) Looper源码分析 ,我们来继续分析一下Message源码 这一系列文章都是为了深入理解Handler机制. Message 作为消息传递的载体&#xff0c;源码主要分为以下 几个部分: 1. 操作数据相关&#xff0c;类似 getter()和 setter()这种…

【JAVASE】注解

文章目录1.概述2.JDK内置注解2.1override注解2.2 Deprecated注解3.元注解4.注解中定义属性4.1 属性value4.2 属性是一个数组5. 反射注解6.注解在开发中的作用1.概述 注解&#xff0c;也叫注释&#xff0c;是一种引用数据类型。编译后也同样生成class字节码文件。 语法 [修饰…

QT获取dll库文件详细信息

一、需求背景获取软件下依赖的dll库的版本信息&#xff0c;如下图所示版本为1.0.7.1018二、实现方法2.1步骤windows下实现&#xff0c;基于version.lib(version.dll)提供的函数获取这些信息首先使用GetFileVersionInfoSizeA(W)获取VersionInfo的大小&#xff0c;申请缓冲区&…

团队API管理工具-YAPI

团队API管理工具-YAPI 推荐一款接口管理平台&#xff0c;操作简单、界面友好、功能丰富、支持markdown语法、可使用Postman导入、Swagger同步数据展示、LDAP、权限管理等功能。 YApi是高效、易用、功能强大的api管理平台&#xff0c;旨在为开发、产品、测试人员提供更优雅的接…

stm32智能家居+微信小程序接收控制

这里写目录标题项目介绍mqtt服务器相关知识![在这里插入图片描述](https://img-blog.csdnimg.cn/9ad065fb8fac48b1b975fc3a48b99763.png)下位机代码项目需要的一些开发工具项目介绍 本项目芯片使用STM32F103ZET6,微信小程序开发使用微信开发者工具。 stm32作为下位机&#xff…

【实现点击下载按钮功能 Objective-C语言】

一、实现点击下载按钮功能, 1.接下来,我们再实现另外一个功能,是什么,点击下载按钮吧: 点击下载按钮,是不是要有效果啊, 就是给大家实现这个功能, 首先,我们要实现单击这个效果,是不是要给按钮注册单击事件吧, 请问,这个按钮在哪里啊,是在控制器里面吗,不是,…

Spark性能优化一 概念篇

&#xff08;一&#xff09;宽依赖和窄依赖 窄依赖(Narrow Dependency)&#xff1a;指父RDD的每个分区只被子RDD的一个分区所使用&#xff0c;例如map、filter等 这些算子一个RDD&#xff0c;对它的父RDD只有简单的一对一的关系&#xff0c;也就是说&#xff0c;RDD的每个part…