Spark 广播变量累加器

news2025/1/10 21:53:03

广播变量

场景描述:一份数据存在Driver中,但是每个Executor都需要一份。
常规模式下,Driver会给每个分区都发送一份数据。如果在Executor中存在多个分区的情况,那么一个Executor会获得多份数据。
Executor是进程,task是线程。分区位于线程中,那么在同一个Executor进程中,里面的线程是共享数据的。
所以理想情况下,我只给一个Executor发送数据即可,这样可以节约内存和网络IO。
在这里插入图片描述

解决方法: 广播变量
在设置广播变量之后,分区1获取数据,Driver会留下记录,Executor1获取了哪个广播变量。
当分区2找Driver获取数据时,Driver会先检查这个广播变量是否被这个Executor获取过,如果获取过就不会给分区2,并告知分区2这个广播变量已经被获取过,那么分区2就会回到Executor中找其他分区索要数据。

在这里插入图片描述

使用方式:

# 1. 将本地list标记称广播变量即可
broadcast = sc.broadcast(stu_info_list)

# 2. 使用广播变量,从broadcast对象取出本地list对象即可
value = broad.value

# 也就是 先放进去broadcast内部,然后从broadcast内部再取出来,中间传输的是broadcast这个对象了
# 只要中间传输的是broadcast对象,spark就会留意,只会给每个Executor发一份,而不是给每个分区发一份

代码示例:

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11)]
    # 1. 将本地Python List对象标记为广播变量
    broadcast = sc.broadcast(stu_info_list)

    score_info_rdd = sc.parallelize([
        (1, '语文', 99),
        (2, '数学', 99),
        (3, '英语', 99),
        (4, '编程', 99),
        (1, '语文', 99),
        (2, '编程', 99),
        (3, '语文', 99),
        (4, '英语', 99),
        (1, '语文', 99),
        (3, '英语', 99),
        (2, '编程', 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        # 匹配本地list和分布式rdd中的学生ID  匹配成功后 即可获得当前学生的姓名
        # 2. 在使用到本地集合对象的地方, 从广播变量中取出来用即可
        for stu_info in broadcast.value:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]

        return (name, data[1], data[2])


    print(score_info_rdd.map(map_func).collect())

"""
场景: 本地集合对象 和 分布式集合对象(RDD) 进行关联的时候
需要将本地集合对象 封装为广播变量
可以节省:
1. 网络IO的次数
2. Executor的内存占用
"""

如果两个都是分布式集合对象,那么就不会造成内存浪费。但是两个RDD要结合使用,需要使用JOIN算子。
JOIN必然产生shuffle,可能造成性能降低。
分布式产生很多shuffle,降低性能:使用JOIN时,只会讲分量数据(需要的部分数据)传输。
而广播的本地list,是全量数据传输。
所以再本机数据不大的情况下,几千或者几万条数据,可能大小只有几MB,这样性能还比两个RDD做JOIN操作性能更好。
如果本地数据太大,例如几个GB那种,还是将本地数据存放到分布式集合对象中,使用JOIN算子关联
在这里插入图片描述

累加器

需求:想要对map算子计算中的数据,进行计数累加
得到全部数据计算完后的累加结果

分布式计算中的累加问题:
在这里插入图片描述
打印结果:
在这里插入图片描述

首先要明确非RDD代码由Driver执行
只是把count的值发送到分区,并不是指针,而且就算是指针也访问不到,不属于同一个服务器。连内存都不是同一块,所以指针没有任何用处。
在这里插入图片描述

解决方法:累加器
累加器再各个分区生效。都是访问的同一份数据。

代码演示:

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # Spark提供的累加器变量, 参数是初始值
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        # print(acmlt)

    rdd2 = rdd.map(map_func)
    rdd2.cache()  # acmlt = 10,添加缓存
    rdd2.collect() # action算子,执行之后rdd2就会销毁

    rdd3 = rdd2.map(lambda x:x) 
     # 如果不使用缓存,再次使用 rdd2,就会溯源,就又会走一次map算子,输出结果为20
    rdd3.collect()
    print(acmlt) # 添加缓存,输出为10 。 如果不加缓存,输出为20


综合案例

需求:
在这里插入图片描述

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import re

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 读取数据文件
    file_rdd = sc.textFile("../data/input/accumulator_broadcast_data.txt")

    # 特殊字符的list定义
    abnormal_char = [",", ".", "!", "#", "$", "%"]

    # 2. 将特殊字符list 包装成广播变量
    broadcast = sc.broadcast(abnormal_char)

    # 3. 对特殊字符出现次数做累加, 累加使用累加器最好
    acmlt = sc.accumulator(0)

    # 4. 数据处理, 先处理数据的空行, 在Python中有内容并且去除头尾空格,也就是返回True,  
    # 没有内容返回None,也就是返回False
    lines_rdd = file_rdd.filter(lambda line: line.strip())

    # 5. 去除前后的空格
    data_rdd = lines_rdd.map(lambda line: line.strip())

    # 6. 对数据进行切分, 按照正则表达式切分, 因为空格分隔符某些单词之间是两个或多个空格
    # 正则表达式 \s+ 表示 不确定多少个空格, 最少一个空格
    words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))

    # 7. 当前words_rdd中有正常单词 也有特殊符号.
    # 现在需要过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数
    def filter_func(data):
        """过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数"""
        global acmlt
        # 取出广播变量中存储的特殊符号list
        abnormal_chars = broadcast.value
        if data in abnormal_chars:
            # 表示这个是 特殊字符
            acmlt += 1
            return False
        else:
            return True

    normal_words_rdd = words_rdd.filter(filter_func)
    # 8. 正常单词的单词计数逻辑
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).\
        reduceByKey(lambda a, b: a + b)

    print("正常单词计数结果: ", result_rdd.collect())
    print("特殊字符数量: ", acmlt)

总结

1、广播变量解决了什么问题
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能

2、累加器解决了什么问题
分布式代码执行中,进行全局累加

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359510.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序阻止页面返回(包滑动、自动返回键)

这个场景还是挺有意思的,比如某多多,只要你点左上角的返回 好家伙,满满又 花不了 的优惠券就来了,让你拥有一种消费最划算的感觉。 如果你的场景比较简单,只是对左上角的返回进行监听,只需要关闭自带的导航…

16_FreeRTOS队列集

目录 队列集 队列集相关API函数介绍 队列集使用流程 实验源码 队列集 一个队列只允许任务间传递的消息为同一种数据类型,如果需要在任务间传递不同数据类型的消息时,那么就可以使用队列集! 作用:用于对多个队列或信号量进行“监听”其中不管哪一个消息到来,都…

JVM学习笔记四:运行时数据区之虚拟机栈

目录 概述 StackOverflowError测试案例 栈运行原理 栈帧的内部结构 改变栈帧大小的StackOverflowError测试案例 局部变量表 局部变量槽 操作数栈 动态链接 静态链接 动态链接 早期绑定 晚期绑定 方法返回地址 概述 与程序计数器一样,Java虚拟机栈也是…

4665: 求前n项和

描述给定序列&#xff1a;求前n项之和。输入输入数据有多组&#xff0c;第一行为数据的组数t&#xff08;1<t<15&#xff09;。每组数据有一行&#xff0c;每行为一个正整数n&#xff08;n<1000000&#xff09;。输出每组输出前n项的和&#xff0c;保留4位小数。样例输…

【编程入门】应用市场(安卓版)

背景 前面已输出多个系列&#xff1a; 《十余种编程语言做个计算器》 《十余种编程语言写2048小游戏》 《17种编程语言10种排序算法》 《十余种编程语言写博客系统》 《十余种编程语言写云笔记》 《N种编程语言做个记事本》 目标 为编程初学者打造入门学习项目&#xff0c;使…

Jmeter常用断言之BeanShell断言详解

BeanShell断言可以使用beanshell脚本来执行断言检查&#xff0c;可以用于更复杂的个性化需求&#xff0c;使用更灵活&#xff0c;功能更强大&#xff0c;但是要能够熟练使用beanshell脚本 在这里除了可以使用beanshell的内置变量外&#xff0c;主要通过 Failure 和 FailureMess…

es 7.8.0 linux 集群

1. 下载es linux版本的数据包 地址: https://www.elastic.co/cn/downloads/past-releases#elasticsearch 解压: 解压 tar -xzvf xxx 2. 我是在一个服务器上测试的,实际上是不同的服务器 所以复制了三份,模拟多节点 进去之后主要是修改elasticsearch.yml 内容如下 节点一…

关于在VM上的windows server 2022系统安装

目录 1、windows serer 2022安装的准备工作 1&#xff09;下载系统 2&#xff09;寻找对应系统密钥 3&#xff09;配置server系统开机配置项&#xff08;可能会出现sconfig配置界面&#xff09; 2、开始安装server系统 1、windows serer 2022安装的准备工作 1&#xff09;…

Dropout

目录一、Dropout出现的原因二、什么是Dropout&#xff1f;三、为什么Dropout解决过拟合?3.1 取平均的作用3.2 减少神经元间复杂的共适应关系四、实现Dropout—— pytorchexample 1example 2example 3设置dropout参数技巧一、Dropout出现的原因 在机器学习的模型中 如果模型的…

处理窗口的常用API函数及窗口处理经验总结(附源码)

目录 1、检测窗口状态 2、将窗口前置显示 2.1、将窗口拉到最前面显示 2.2、将窗口置顶显示 2.3、将窗口设置到指定窗口的上面 3、将不显示的窗口强行显示出来 4、获取窗口的信息 5、通过窗口信息去查找窗口 5.1、调用GetClassName接口去比对窗口的类名 5.2、调用Find…

清理bib文件(删除重复项,仅保留tex中引用的条目)

在写latex文件的过程中&#xff0c;经常会遇到添加了一堆文献的bibtex到bib文件中&#xff0c;有时候文章一长同一篇文献用不同的cite-key引用了多次&#xff0c;同时也会有一些文献最后并没被正文引用&#xff0c;这就需要对bib文件进行清理。 删除重复项 可以用JabRef 在J…

45岁当打之年再创业,剑指中国版ChatGPT,这位美团联合创始人能否圆梦?

文 BFT机器人 “即便只有一个人&#xff0c;我也要出发。” 这是45岁的前美团联合创始人王慧文再次冲上创业沙场的“征战”宣言&#xff0c;这一次他的梦想是“组队拥抱新时代&#xff0c;打造中国OpenAI”。 01 当打之年&#xff0c; AI新梦再起航 “我的人工智能宣言&…

视频投票和图文投票之间的差异投票链接制作平台微擎投票

“我的舞台我的梦”网络评选投票_线上小程序的投票方式_视频投票的功能_在线投票程序用户在使用微信投票的时候&#xff0c;需要功能齐全&#xff0c;又快捷方便的投票小程序。而“活动星投票”这款软件使用非常的方便&#xff0c;用户可以随时使用手机微信小程序获得线上投票服…

Spring中的拦截器

这里写目录标题基本概念HandlerInterceptor拦截器HandlerInterceptor讲解MethodInterceptor拦截器二者的区别基本概念 在web开发中&#xff0c;拦截器是经常用到的功能。它可以帮我们预先设置数据以及统计方法的执行效率等等。 Spring中拦截器主要分两种&#xff0c;一个是Han…

【学习总结】激光雷达与相机外参标定:代码(cam_lidar_calibration)

前段时间尝试了一款激光雷达和相机标定的代码&#xff0c;总结了博客&#xff1a; 【学习总结】激光雷达与相机外参标定&#xff1a;原理与代码 但总觉得那个代码太差劲&#xff0c;而且精度不行&#xff0c;于是又找了些新的代码&#xff0c;体验比之前的好很多&#xff0c;在…

【自然语言处理】主题建模:Top2Vec(理论篇)

主题建模&#xff1a;Top2Vec&#xff08;理论篇&#xff09;Top2Vec 是一种用于 主题建模 和 语义搜索 的算法。它自动检测文本中出现的主题&#xff0c;并生成联合嵌入的主题、文档和词向量。 算法基于的假设&#xff1a;许多语义相似的文档都可以由一个潜在的主题表示。首先…

90后,转行软件测试3年,从月入7000+到月入过万,整理出的这一万字经验分享。

周一发工资了&#xff0c;到手12857.65&#xff0c;美滋滋 今年是我毕业参加工作的第3年&#xff0c;工资终于来到5位数了。上一家公司月薪7000&#xff0c;实际拿到手就6450左右&#xff0c;感觉今年真的是元气满满啊&#xff0c;工资翻倍&#xff0c;良好的人生开端。 想起…

Odoo丨Odoo框架源码研读二:ORM框架与日志

Odoo丨Odoo框架源码研读二&#xff1a;ORM框架与日志 而Odoo在实际开发的大多数场景都是基于它的ORM框架进行的&#xff0c;所以本期我们将带来Odoo框架源码的第二期内容——ORM和日志。 *ORM* Odoo是通过Controller控制器&#xff0c;来控制前后台的交互。上一期我们详细的…

算法专题训练营

动归算法专题 1.拆分词句 是不是,在不在都是可以用动归解决的 状态转义方程不一定都是等式,也有可能是条件 2.三角形 动归算法也不是一定要借助新开空间,也是可以用自己原来的空间 3.背包问题 4.分割回文串-ii 5.不同的子序列 贪心算法专题 只管一步的最优结果, 1.分割平衡…

前缀和差分(C/C++)

目录 1. 前缀和的定义 2. 一维前缀和 2.1 计算公式 2.2 用途 2.3 小试牛刀 3. 二维前缀和 3.1 用途 1. 前缀和的定义 对于一个给定的数列A&#xff0c;他的前缀和数中 S 中 S[ i ] 表示从第一个元素到第 i 个元素的总和。 如下图&#xff1a;绿色区域的和就是前缀和数组…