Spark累加器(Accumulator)

news2025/1/11 22:38:46

1.累加器类型:

  • 数值累加器:用于计算总和、计数等。
  • 布尔累加器:用于计算满足特定条件的次数。
  • 自定义累加器:允许定义复杂的聚合逻辑和数据结构。
  • 集合累加器:用于计算唯一元素的数量,处理去重操作。

    在 Spark 中,累加器(Accumulators)是一种可以用来在任务执行过程中进行累积的变量。它们主要用于计算全局的汇总值,如计数或求和。累加器是只加的变量(即只进行累加操作),并且是分布式的,适合于在多节点环境中进行汇总。

2.示例:

2.1(数值累加器):假设我们有一个包含整数的 RDD,我们希望计算这些整数的总和,并使用累加器来进行累积。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     1.测试累加器
   date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext

# 初始化 SparkContext
sc = SparkContext("local[*]", "测试累加器")
# 创建累加器
accumulator = sc.accumulator(0)


# 定义一个函数来增加累加器的值
def add_to_accumulator(x):
    global accumulator
    accumulator.add(x)


# 创建一个 RDD
rdd = sc.parallelize([1, 2, 3, 4])

# 使用 map 来应用函数,并累加值
rdd.foreach(lambda x: add_to_accumulator(x))

# 由于累加器的值在行动操作之后才会被更新,所以需要使用行动操作触发计算
rdd.count()  # 触发计算

# 打印累加器的值
print("Accumulated value:", accumulator.value)

2.2(自定义累加器):自定义累加器允许你定义自己的累加逻辑和数据结构。这些累加器可以包含复杂的聚合操作和自定义数据结构。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     4.自定义累加器测试
   date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam

sc = SparkContext("local[*]", "自定义累加器测试")


# 自定义累加器类
class ListAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return []

    def addInPlace(self, acc1, acc2):
        return acc1 + acc2


list_accumulator = sc.accumulator([], ListAccumulatorParam())


def add_to_list_accumulator(x):
    global list_accumulator
    list_accumulator.add([x])
    return x


rdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: add_to_list_accumulator(x))

rdd.count()  # 触发计算

print("Accumulated list:", list_accumulator.value)

解释
  • 自定义累加器类ListAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空列表,addInPlace 方法合并两个列表。
  • 创建自定义累加器list_accumulator = sc.accumulator([], ListAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_list_accumulator(x) 函数将每个元素作为列表加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_list_accumulator(x))add_to_list_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果list_accumulator.value 获取累加器的最终值,即累加的列表。
  • RDD 中的每个元素 [1, 2, 3, 4] 被转换为单元素列表 [1], [2], [3], [4],并分别添加到累加器中。
  • 累加器的 addInPlace 方法将这些列表合并成一个完整的列表。
2.3(集合累加器):集合累加器用于跟踪独特的元素集合,例如计算唯一元素的数量。它可以用于去重操作。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     3.集合累加器测试
   date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam

sc = SparkContext("local[*]", "集合累加器测试")


# 自定义集合累加器类
class SetAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return set()

    def addInPlace(self, acc1, acc2):
        return acc1.union(acc2)


set_accumulator = sc.accumulator(set(), SetAccumulatorParam())


def add_to_set_accumulator(x):
    global set_accumulator
    set_accumulator.add(set([x]))
    return x


rdd = sc.parallelize([1, 2, 2, 3, 4, 4])
rdd.foreach(lambda x: add_to_set_accumulator(x))

rdd.count()  # 触发计算

print("Unique elements:", len(set_accumulator.value))
解释
  • 自定义累加器类SetAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空集合,addInPlace 方法合并两个集合。
  • 创建自定义累加器set_accumulator = sc.accumulator(set(), SetAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_set_accumulator(x) 函数将每个元素作为集合添加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_set_accumulator(x))add_to_set_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果len(set_accumulator.value) 获取累加器的最终值,即唯一元素的数量。
  • RDD 中的元素 [1, 2, 2, 3, 4, 4] 被转换为集合形式,分别是 {1}, {2}, {2}, {3}, {4}, {4}
  • 每个元素的集合被添加到累加器中。由于累加器的合并逻辑是集合的并集,最终的累加器会包含所有唯一的元素,所以最后的计算结果是4。

3.累加器的特点:

  • 只加操作:累加器只能执行加操作,不能进行减操作或其他类型的操作。
  • 分布式支持:累加器在多节点环境下是分布式的,每个 Executor 都会在其本地更新累加器的值。最后,这些本地值会在 Driver 节点上进行合并。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1959679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt Designer,仿作一个ui界面的练习(四):编写代码

一、新建项目,目录结构如图: PYS下存放脚本,SRC下存放资源文件,UIS下存放组态画面文件。 在每个子目录下都有__init__.py文件,系统会自动将其识别为软件包。 其中一个UIS.__init__.py文件的内容: # impo…

手撕数据结构02--二分搜索(附源码)

一、理论基础 二分搜索,也称折半搜索、对数搜索,是一种在有序数组中查找某一特定元素的搜索算法。 二分搜索是一种高效的查找算法,适用于在已排序的数组中查找特定元素。它的基本思想是通过不断将搜索区间对半分割,从而快速缩小…

ROOM数据快速入门

ROOM数据库快速入门 文章目录 ROOM数据库快速入门第一章 准备工作第01节 引入库第02节 布局文件第03节 activity类第04节 效果图 第二章 数据类第01节 实体类(表)第02节 数据访问类(DAO)第03节 数据Service层第04节 RoomDataBase …

达梦数据库DPI 实现两个数据库数据互通

链接字符串是目标访问链接 目标访问用户名 口令实现 31 里访问33库的数据 如果在31上建立视图访问33的某个表 AS SELECT SZZJ.sys_user.id FROM SZZJ.sys_userszzj31_szzj33;

护眼灯哪些牌子好?五款专业护眼灯品牌排行推荐

普通台灯长时间使用下来,眼睛疲劳、酸涩。但当作业或者工作没有做完的时候,还得硬着头皮撑着。大家是不是经常为这种事情发愁?于是,护眼台灯被设计出来了,但市面上出现的护眼台灯种类多,质量也是难以保证&a…

开发进度网站带后台源码

【源码介绍】 后台地址是:admin.php 后台没有账号密码 这个没有数据库 有能力的可以自己改 【搭建教程】 1.源码上传至虚拟机或者服务器 2.绑定域名和目录 3.访问域名安装, 4.安装完成后就行了 注:资源均网络搬运 仅供测试学习使用&#xff…

【数据结构与算法】队列(顺序存储)

队列 一.队列的原理二.队列的结构三.队列初始化四.判断队列是否满或空1.是否为满2.是否为空 五.入队操作六.队列的遍历七.出队操作1.前移2.后指 八.其他小接口1.获取队列首元素2.获取队列长度3.清除队列 酒.总结 一.队列的原理 队列也是一种线性结构,只不过是一种受限制的线性…

微服务面试-分布式 注册中心 远程调用 保护

标红的原理还是不太熟悉 重新看 分布式事务 CAP理论 Consistency(一致性) Availability(可用性) Partition tolerance (分区容错性) BASE 理论 就是做取舍 cap三选二 AT模式脏写 TCC模式 注册中…

25考研数据结构复习·6.4图的应用

最小生成树 Prim算法 从某一顶点开始构建生成树;每次将代价最小的新顶点纳入生成树,知道所有顶点都纳入为止。 时间复杂度O(|V|^2) 适合用于边稠密图 实现思想 从V0开始,总共需要n-1轮处理 每一轮处理:循环遍历所有结点&…

京东商品详情API:多规格商品的返回值处理

处理京东商品详情API中关于多规格商品的返回值,首先需要了解京东API的返回数据结构。通常,对于多规格商品(如不同颜色、尺寸等选项的商品),API会返回一个包含多个规格选项和对应价格、库存等信息的复杂数据结构。 以下…

java中 VO DTO BO PO DAO

VO、DTO、BO、PO、DO、POJO 数据模型的理解和实际使用_vo dto bo-CSDN博客 深入理解Java Web开发中的PO、VO、DTO、DAO和BO概念_java dto dao-CSDN博客

汇凯金业:区块链的介绍和应用场景

区块链, 一个近年来炙手可热的技术名词, 它就像一颗耀眼的明星, 吸引着人们的目光, 引发着人们的思考。 究竟什么是区块链? 它为何能够引发如此巨大的关注? 它又将如何改变我们的未来? 一、 区块链: 去中心化的信任…

中仕公考:什么是事业编?

事业编制内的职员是指那些经过考试选拔,成功进入公共机构服务,同时在人事部门和组织部有正规记录的个体。 入职条件: 要求应聘者参与由事业单位举办的公开招聘考试。 管理方式: 事业编制内职员的人事管理由当地的人事部门或相…

02 RabbitMQ:下载安装

02 RabbitMQ:下载&安装 1. 下载&安装1.1. 官网1.2. Docker方式1.2.1. 下载镜像1.2.2. 启动1.2.3. 登录验证 1. 下载&安装 1.1. 官网 RabbitMQ: One broker to queue them all | RabbitMQ 1.2. Docker方式 1.2.1. 下载镜像 # docker pull 镜像名称[…

Windows API钩子

原文链接:https://www.cnblogs.com/zhaotianff/p/18073138 有前面的文章中,我介绍了一个Windows API的监控工具(API Monitor)的使用。它内部就是使用了Hook机制,能Hook Windows API,能在我们钩选的API函数…

C++必备知识--类和对象

类的定义 格式 class是类的关键字,Stack是类的名字(自己定义的),在{}中定义类的主体。C中类的出现是为了替代C语言中的结构体,所以类的结构与结构体类似,{}之后需要加分号。类体中的内容称为类的成员,类中(声明)的变量…

国内优秀的消防报警设备企业「三江电子」×企企通启动采购数字化项目,共筑消防电子产品数字化新篇章

近日,深圳市高新投三江电子股份有限公司(以下简称“三江电子”)与企企通成功召开SRM采购供应链管理项目启动会。三江电子副总经理黄总、副总经理沈总、企企通高级副总裁徐总,以及双方项目负责人和项目组成员一同出席本次启动会。 …

AlmaLinux9安装中文语言包_zabbix没有中文语言包

更新你的系统包,如果系统最新可以忽略: sudo dnf update安装中文简体语言包 sudo yum install langpacks-zh_CN安装繁体中文包 sudo dnf install kde-l10n-Chinese-traditional安装完成后重启系统,以确保语言设置生效 设置系统为简体中文&…

《昇思25天学习打卡营第25天|第28天》

今天是打卡的第二十八天,实践应用篇中的计算机视觉中Vision Transformer图像分类。 从Vision Transformer(ViT)简介开始了解,模型结构,模型特点,实验的环境准备和数据读取,模型解析&#xff08…

Vue学习---vue 项目结构

这只是一个基础的目录结构,根据项目的复杂性和规模,可能会有所不同。 node_modules 项目依赖文件,其中包括很多基础依赖和自己安装的依赖。 public 存放公共资源和项目的主入口文件inde…