Spark 共享变量:广播变量与累加器解析

news2024/12/23 4:23:15

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

PySpark 数据处理实战:从基础操作到案例分析-CSDN博客

Spark 的容错机制:保障数据处理的稳定性与高效性-CSDN博客

目录

一、需求背景

二、广播变量(Broadcast Variables)

(一)功能

(二)语法 / 用法

(三)示例代码修改

(四)本质与优势

三、累加器(Accumulators)

(一)需求示例

(二)原理与功能

(三)使用方法与示例代码修改

四、总结


        在 Spark 大数据处理框架中,共享变量是一个非常重要的概念。当我们处理一些涉及到不同计算节点(Executor)需要访问相同数据的场景时,共享变量就发挥了关键作用。本文将深入探讨 Spark 中的广播变量和累加器,包括它们的使用场景、原理以及如何在实际代码中应用。

一、需求背景

        假设我们有一份用户数据(user.tsv),其中包含用户的一些基本信息如用户 id、用户名、年龄和城市 id,同时我们还有一个城市字典(city_dict),它存储了城市 id 与城市名称的对应关系。我们的目标是将这两份数据进行处理,得到包含用户完整信息(用户 id、用户名、年龄、城市 id、城市名称)的结果集。

user.tsv数据如下

user001 陆家嘴 18 2
user002 羊毛 20 5
user003 爱丽丝 22 6
user004 蒸饭 24 8
user005 淘米 26 1
user006 小笼包 28 7
user007 凉粉 30 4
user008 泡腾片 25 10
user009 炒米 27 3
user010 颖火虫 29 9

 city中的字典如下

city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}

示例代码如下

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取 conf 对象
	# setMaster  按照什么模式运行,local  bigdata01:7077  yarn
	#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
	#  appName 任务的名字
	conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
	# 假如我想设置压缩
	# conf.set("spark.eventLog.compression.codec","snappy")
	# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
	sc = SparkContext(conf=conf)

	fileRdd = sc.textFile("../datas/user.tsv")
	city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}
	def getLine(line):
		list01 = line.split(" ")
		cityName = city_dict.get(int(list01[3]))
		# print(cityName)
		return line + " " + cityName
	mapRdd = fileRdd.map(getLine)
	mapRdd.foreach(print)

	# 使用完后,记得关闭
	sc.stop()

结果如下

user007 凉粉 30 4 深圳
user008 泡腾片 25 10 成都
.....

        在 Spark 中,user_rdd 的计算处理在 Executor 中进行,而 city_dict 的数据存储在 Driver 的内存中。这就引发了一个问题:计算过程中每个 Task 是如何获取 city_dict 的数据呢?如果 city_dict 的数据量很大(例如 1G),每个 Task 都要从 Driver 中下载一份(假设存在多个 Task 导致总下载量达到 6G),那么网络传输的开销将非常大,性能会变得很差。

二、广播变量(Broadcast Variables)

(一)功能

        广播变量的主要功能就是将一个变量元素广播到每台 Worker 节点的 Executor 中。这样一来,每个 Task 就可以直接从本地读取数据,从而大大减少网络传输的 I/O。

(二)语法 / 用法

在 Spark 中使用广播变量,首先需要创建一个广播变量对象。例如:

broadcastValue = sc.broadcast(city_dict)

        这里的 sc 是 SparkContext 对象,city_dict 是我们想要广播的数据(在这个例子中是城市字典)。创建广播变量后,在需要使用该数据的地方,可以通过 broadcastValue.value 来获取广播的数据。

此链接是官方给的API文档:RDD Programming Guide - Spark 3.5.3 Documentation

(三)示例代码修改

        在我们的用户数据处理示例中,原始代码在处理每个用户数据行时,需要获取对应的城市名称。修改后的代码如下:

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取 conf 对象
	# setMaster  按照什么模式运行,local  bigdata01:7077  yarn
	#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
	#  appName 任务的名字
	conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
	# 假如我想设置压缩
	# conf.set("spark.eventLog.compression.codec","snappy")
	# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
	sc = SparkContext(conf=conf)

	fileRdd = sc.textFile("../datas/user.tsv",2)
	city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}
	# 将一个变量广播出去,广播到executor中,不是task中
	city_dict_broad = sc.broadcast(city_dict)
	def getLine(line):
		list01 = line.split(" ")
		#cityName = city_dict.get(int(list01[3]))
		# 使用广播变量的变量获取数据
		cityName = city_dict_broad.value.get(int(list01[3]))
		# print(cityName)
		return line + " " + cityName
	mapRdd = fileRdd.map(getLine)
	mapRdd.foreach(print)

	# 释放广播变量
	city_dict_broad.unpersist()
	# 使用完后,记得关闭
	sc.stop()

(四)本质与优势

广播变量本质上是一种优化手段。它的优势主要体现在两个方面:

  1. 减少数据传输量:通过广播一个 Driver 中较大的数据,可以减少每次从 Driver 复制的数据量,降低网络 I/O 损耗,从而提高整体性能。
  2. 优化表连接:在两张表进行 Join 操作时,如果一张表较小,可以将小表进行广播,然后与大表的每个部分进行 Join,这样就可以避免 Shuffle Join(Reduce Join),进一步提升性能。

需要注意的是,广播变量是只读变量,不能被修改

三、累加器(Accumulators)

(一)需求示例

        假设我们有搜狗日志的数据,现在需要统计 10 点搜索的数据一共有多少条。如果按照常规的方式编写代码,可能会出现问题。例如:

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    _sum = 0
    def sumTotalLine(tuple1):
        global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            _sum += 1

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(_sum) # 结果是0


    # 使用完后,记得关闭

        在 Spark 中,上述代码最终结果会是 0。因为 sum = 0 是在 Driver 端的内存中的,Executor 中程序对其进行累加操作并不能改变 Driver 端的结果。

(二)原理与功能

        累加器的功能是实现分布式的计算。它在每个 Task 内部构建一个副本进行累加,并且在最后返回每个 Task 的结果并进行合并。

官方API截图

(三)使用方法与示例代码修改

在 Spark 中使用累加器,首先需要创建一个累加器对象:

accumulator = sc.accumulator(0)

然后在需要进行计数累加的地方使用 accumulator.add(1)。例如:

def getLines(line, accumulator):
    accumulator.add(1)

# 对用户数据 RDD 进行处理并统计数据量
fileRdd.foreach(lambda line: getLines(line, accumulator))

最后可以通过 accumulator.value 获取累加的结果。完整代码如下:

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    accCounter = sc.accumulator(0)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    #_sum = 0
    def sumTotalLine(tuple1):
        #global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            accCounter.add(1)

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(accCounter.value) # 104694

    # 假如我不知道累加器这个操作,这个题目怎么做?
    print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())


    # 使用完后,记得关闭
    sc.stop()

四、总结

        Spark 中的广播变量和累加器是处理分布式计算中共享数据问题的有效工具。广播变量主要用于在多个 Task 之间共享只读数据,减少网络传输开销;累加器则用于实现分布式环境下的计数或累加操作,确保在不同 Task 中的计算结果能够正确地合并到 Driver 端。在实际的 Spark 大数据处理项目中,合理地运用广播变量和累加器能够显著提高程序的性能和计算的准确性。

        希望通过本文的介绍,读者能够对 Spark 中的共享变量有更深入的理解,并能够在自己的项目中熟练运用广播变量和累加器来优化数据处理流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前海华海金融创新中心的工地餐点探寻

​前海的工地餐大部分都是13元一份的哈。我在前海华海金融创新中心的工地餐点吃过一份猪杂饭,现做13元一份。我一般打包后回公司吃或直接桂湾公园找个环境优美的地方吃饭。 ​我点的这份猪杂汤粉主要是瘦肉、猪肝、肉饼片、豆芽和生菜,老板依旧贴心问需要…

借助Excel实现Word表格快速排序

实例需求:Word中的表格如下图所示,为了强化记忆,希望能够将表格内容随机排序,表格第一列仍然按照顺序编号,即编号不跟随表格行内容调整。 乱序之后的效果如下图所示(每次运行代码的结果都不一定相同&#x…

【C语言指南】C语言内存管理 深度解析

💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《C语言指南》 期待您的关注 引言 C语言是一种强大而灵活的编程语言,为程序员提供了对内存的直接控制能力。这种对内存…

【Linux网络编程】简单的UDP网络程序

目录 一,socket编程的相关说明 1-1,sockaddr结构体 1-2,Socket API 二,基于Udp协议的简单通信 一,socket编程的相关说明 Socket编程是一种网络通信编程技术,它允许两个或多个程序在网络上相互通信&…

Kafka入门:Java客户端库的使用

在现代的分布式系统中,消息队列扮演着至关重要的角色,而Apache Kafka以其高吞吐量、可扩展性和容错性而广受欢迎。本文将带你了解如何使用Kafka的Java客户端库来实现生产者(Producer)和消费者(Consumer)的基…

STM32设计学生宿舍监测控制系统

目录 前言 一、本设计主要实现哪些很“开门”功能? 二、电路设计原理图 电路图采用Altium Designer进行设计: 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 随着科技的飞速发展和智能化时代的到来,学生宿舍的安全、舒适…

HTML5实现俄罗斯方块小游戏

文章目录 1.设计来源1.1 主界面1.2 皮肤风格1.2 游戏中界面1.3 游戏结束界面 2.效果和源码2.1 动态效果2.2 源代码 源码下载 作者:xcLeigh 文章地址:https://blog.csdn.net/weixin_43151418/article/details/143788449 HTML5实现俄罗斯方块小游戏&#x…

自由学习记录(22)

最后再总结一下吧 虽然过程里很多细节也许我没有去管,毕竟现在就已经存在更好的解决方案了 但大致思想是了解了 A星是一种网格上的遍历方式,为了找到一个目标点和起点之间的要经过的最短节点组 里面更像是动态规划 每一次的遍历,都是当前…

UNIX网络编程-TCP套接字编程(实战)

概述 TCP客户端/服务器程序示例是执行如下步骤的一个回射服务器: 客户端从标准输入读入一行文本,并写给服务器。服务器从网络输入读入这行文本,并回射给客户端。客户端从网络输入读入这行回射文本,并显示在标准输出上。 TCP服务器…

『VUE』27. 透传属性与inheritAttrs(详细图文注释)

目录 什么是透传属性(Forwarding Attributes)使用条件唯一根节点禁用透传属性继承总结 欢迎关注 『VUE』 专栏,持续更新中 欢迎关注 『VUE』 专栏,持续更新中 什么是透传属性(Forwarding Attributes) 在 V…

408模拟卷较难题(无分类)

模拟卷特别是大题还是很有难度的,而且有些题有错,还是先把真题吃透,后面没时间的话就不整理了。 一棵树转化为二叉树,那么这棵二叉树一定为右子树为空的树 计算不同种形态,即计算6个结点的二叉树有几种形态&#xff0c…

【JavaScript】LeetCode:96-100

文章目录 96 单词拆分97 最长递增子序列98 乘积最大子数组99 分割等和子集100 最长有效括号 96 单词拆分 动态规划完全背包:背包-字符串s,物品-wordDict中的单词,可使用多次。问题转换:s能否被wordDict中的单词组成。dp[i]&#x…

安全见闻1-5

涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识,有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言:面向过程,适用于系统…

相亲小程序(源码+文档+部署+讲解)

最近我在挖掘一些优秀的开源项目时,无意间发现了一个相当给力的系统——相亲小程序管理系统。这个系统不仅功能实用,而且代码结构清晰,易于二次开发。作为一名技术爱好者,我觉得有必要把这个好东西推荐给我的读者们。接下来&#…

RabbitMQ介绍和快速上手案例

文章目录 1.引入1.1同步和异步1.2消息队列的作用1.3rabbitMQ介绍 2.安装教程2.1更新软件包2.2安装erlang2.3查看这个erlang版本2.4安装rabbitMQ2.5安装管理页面2.6浏览器测试2.7添加管理员用户 3.rabbitMQ工作流程4.核心概念介绍4.1信道和连接4.2virtual host4.3quene队列 5.We…

aws(学习笔记第十二课) 使用AWS的RDS-MySQL

aws(学习笔记第十二课) 使用AWS的RDS 学习内容: AWS的RDS-MySQL 1. 使用AWS的RDS 什么是RDS RDS就是Relation Database Service的缩写,是AWS提供的托管关系型数据库系统。让用户能够在 AWS Cloud 云中更轻松地设置、操作和扩展关系数据库。 数据库和we…

跳房子(弱化版)

题目描述 跳房子,也叫跳飞机,是一种世界性的儿童游戏,也是中国民间传统的体育游戏之一。 跳房子的游戏规则如下: 在地面上确定一个起点,然后在起点右侧画 n 个格子,这些格子都在同一条直线上。每个格子内…

A029-基于Spring Boot的物流管理系统的设计与实现

🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹 赠送计算机毕业设计600…

Spring系统框架

Spring Framework系统架构 1.Spring核心概念 代码书写现状 耦合度偏高 解决方案 使用对象时,在程序中不要主动使用new产生对象,转换为外部提供对象 IOC(Inversion of Control)控制反转 对象的创建控制权由程序移到外部,这种思想称为控制…

鸿蒙实战:页面跳转

文章目录 1. 实战概述2. 实现步骤2.1 创建项目2.2 准备图片素材2.3 编写首页代码2.4 创建第二个页面 3. 测试效果4. 实战总结 1. 实战概述 实战概述:本实战通过ArkUI框架,在鸿蒙系统上开发了一个简单的两页面应用。首页显示问候语和“下一页”按钮&…