Spark累加器

news2025/1/11 0:02:06

1. 累加器

累加器:分布式共享只写变量

考虑如下计算RDD中数据的和:

val rdd = sc.makeRDD(List(1, 2, 3, 4))

var sum = 0
rdd.foreach(
    num => {
        sum += num
    }
)

println("sum = " + sum)

预期结果10,但其实不是

foreach里面的函数是在Executor节点分布式执行的,所以多个分布式节点来同时修改sum,但是这个sum是Driver传给Executor的,各个Executor执行完并不会将sum返回给Driver,所以Driver端执行打印sum,sum依然为0。如果需要将最终的计算结果返回给Driver,就要使用到累加器变量。

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的累加器变量,在Executor端会得到这个变量的一个新的副本,执行任务更新完这个副本之后,传回Driver端进行聚合。

使用累加器:

val rdd = sc.makeRDD(List(1, 2, 3, 4))

val sum = sc.longAccumulator("sum")

rdd.foreach(num => {
    sum.add(num)
})

println("sum = " + sum.value)

如果转换算子中使用了累加器,最终没有调用行动算子,那么实际也不会执行计算,例如下面代码sum依然为0 

val rdd = sc.makeRDD(List(1, 2, 3, 4))

val sum = sc.longAccumulator("sum")

rdd.map(num => {
    sum.add(num)
    num
})

println("sum = " + sum.value)

注意,使用累加器最终执行行动算子不止一次,可能最终结果不符合预期

val rdd = sc.makeRDD(List(1, 2, 3, 4))

val sum = sc.longAccumulator("sum")

val mapRDD = rdd.map(num => {
    sum.add(num)
    num
})

mapRDD.collect()
mapRDD.collect()
println("sum = " + sum.value)

所以一般累加器需要放在行动算子里进行操作。

使用累加器实现WordCount,以避免Shuffle操作。首先定义一个自定义累加器,以实现map的累加(因为Spark中只有List集合的累加)

// AccumulatorV2泛型:IN:输入类型; OUT:输出类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private var wcMap = mutable.Map[String, Long]

    // 判断是否是初始状态
    override def isZero : Boolean = {
        wcMap.isEmpty
    }

    override def copy() : AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator()
    }

    override def reset() : Unit = {
        wcMap.clear()
    }

    // 累加函数,每新来一个输入,如何累加
    override def add(word : String) : Unit = {
        val newCnt = wcMap.getOrElse(word, 0L) + 1
        wcMap.update(word, newCnt);
    }

    // 合并函数,由Driver执行,这里本质是两个map的合并
    override def merge(other : AccumulatorV2[String, mutable.Map[String, Long]]) : Unit = {
    val map1 = wcMap
    val map2 = other

    map2.foreach{
        case(word, count) => {
            val newCnt = map1.getOrElse(word, 0L) + count
            map1.update(word, newCnt)
        }
    }
}

    override def value : mutable.Map[String, Long] = {
        wcMap
    }
}
val rdd = sc.makeRDD(List("hello", "spark", "hello"))

val myAcc = sc.newMyAccumulator()
sc.register(myAcc, "wordCountAcc")

rdd.foreach(word => {
    myAcc.add(word)
})

println(myAcc.value)

2. 广播变量

累加器:分布式共享只读变量

如果想实现两个rdd的相同key的value连接起来,一般会想到join算子,但是join是笛卡尔乘积,且存在shuffle,严重影响性能。可采用如下代码实现类似功能:

val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))

val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

rdd1.map {
    case(w, c) => {
        val l : Int = map.getOrElse(w, 0)
        (w, (c, l))
    }
}.collect.foreach(println)

上述代码存在一个问题,map算子内的函数是在Executor内执行的(具体来说是Executor里的task执行的),里面用到了map这份数据,如果map很大,那么每个Executor的每个task都会包含这份大数据,非常占用内存,影响性能。于是引入了广播变量的概念,将这种数据存放在每一个Executor的内存中,该Executor中的各个task共享这个只读变量:

val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))

val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

val bc : Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)

rdd1.map {
    case(w, c) => {
        val l : Int = bc.value.getOrElse(w, 0)
        (w, (c, l))
    }
}.collect.foreach(println)

3. 案例需求

1)已经从原始日志文件中读出了商品的点击数量rdd、下单数量rdd、支付数量rdd,都是(商品id, cnt)的形式,需要将这三种rdd组合成(商品id, (点击数量, 下单数量, 支付数量))的rdd,并且依次按照点击数量、下单数量、 支付数量排序取前十。

很自然地想到组合rdd的算子join,但是join只能组合相同的key,如果一个商品只有点击没有下单,那么使用join是不会出现在最终结果的,同理leftOuterJoin和rightOuterJoin也是类似的,不能实现相应的功能。因此只有cogroup算子满足要求,cogroup = connect + group。(join和cogroup算子的功能示例参见:RDD算子介绍(三))

val cogroupRDD : RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))]= clickRDD.cogroup(orderRDD, payRDD)

val resultRDD = cogroupRDD.mapValues{
    case(clickIter, orderIter, payIter) => {
        var clickCnt = 0
        val iter1 = clickIter.iterator
        if (iter1.hasNext) {
            clickCnt = iter1.next()
        }

        var orderCnt = 0
        val iter2 = orderIter.iterator
        if (iter2.hasNext) {
            orderCnt = iter2.next()
        }

        var payCnt = 0
        val iter3 = payIter.iterator
        if (iter3.hasNext) {
            payCnt = iter1.next()
        }

        (clickCnt, orderCnt, payCnt)
    }
}

resultRDD.sortBy(_._2, false).take(10).foreach(println)

 上述实现方式使用了cogroup算子,该算子可能存在shuffle,且该算子不常用,可以采用另外一种方式实现。首先将商品的点击数量rdd、下单数量rdd、支付数量rdd转换为(商品id, (clickCnt, orderCnt, payCnt))的形式,然后进行union,最后进行聚合。

val rdd1 = clickRDD.map {
    case (id, cnt) => {
        (id, (cnt, 0, 0)
    }
}

val rdd2 = orderRDD.map {
    case (id, cnt) => {
        (id, (0, cnt, 0)
    }
}

val rdd3 = payRDD.map {
    case (id, cnt) => {
        (id, (0, 0, cnt)
    }
}

val rdd : RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)

val resultRDD = rdd.reduceByKey((t1, t2) => {
    (t1._1 + t2._1, t1._2 + t2._2, t3._3 + t3._3)
})

resultRDD.sortBy(_._2, false).take(10).foreach(println)

上述代码使用了reduceByKey,还是会有shuffle操作,要完全避免shuffle操作,可以使用foreach算子,如果要使用foreach算子,就必须使用累加器了。这个累加器的作用就是遍历原始数据,按照商品id进行分组,对商品的用户行为类型(点击、下单、支付)对进行数量统计。可以将商品的用户行为(点击、下单、支付)数量封装成一个样例类HotCatagory,然后这个累加器的输入就是商品id+行为类型(点击、下单、支付),输出就是这个样例类的集合。具体实现过程参考https://www.bilibili.com/video/BV11A411L7CK?p=116&spm_id_from=pageDriver&vd_source=23ddeeb1fb342c5293413f7b87367160​​​​​​​

2)统计页面的跳转率。所谓某个页面A到某个页面B的跳转率,就是页面A到页面B的次数/页面A的点击次数。首先统计各个页面的点击次数,数据结构为map,作为分母。对于分子,需要按照sessionId进行分组,然后按照时间排序,这样才能得到各个用户浏览页面的顺序,然后转换数据结构,统计各个页面到其他页面的跳转次数。

actionDataRDD.cache()

// 分母
val pageIdToCountMap : Map[Long, Long] = actionDataRDD.map(action => {
    (action.page_id, 1L)
}).reduceByKey(_+_).collect.toMap

val sessionRDD : RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)

val mvRDD : RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
    iter => {
        val sortList : List[UserVisitAction] = iter.toList.sortBy(_.action_time)
        val flowIds : List[Long] = sortList.map(_.page_id)

        val pageFlowIds : List[(Long, Long)] = flowIds.zip(flowIds.tail)
        pageFlowIds.map(t=> (t, 1))
    }
)

// 分子
val dataRDD = mvRDD.map(_._2).flatMap(list=>list).reduceByKey(_+_)

dataRDD.foreach {
    case((page1, page2), sum) => {
        val cnt : Long = pageIdToCountMap.getOrElse(page1, 0L)
        println(s"页面${page1}跳转页面${page2}的转换率为" + (sum.toDouble / cnt))
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1686803.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

rtemis 包:多种机器学习算法集成!兼顾数据处理与可视化美图

rtemis 是一个集机器学习与可视化于一体的 R 包,用于各种高级机器学习研究和应用。整体而言,该软件有三个目标: 「应用数据科学」:使高级数据分析高效且易于使用 「机器学习研究」:提供一个平台以开发和测试新颖的机器…

添加、修改和删除列表元素

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 添加、修改和删除列表元素也称为更新列表。在实际开发时,经常需要对列表进行更新。下面我们介绍如何实现列表元素的添加、修改和删除。 …

深度学习之使用Matlab乳腺癌分类检测系统

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 乳腺癌作为女性最常见的恶性肿瘤之一,对女性的健康构成了严重威胁。乳腺癌的早期发…

基于51单片机的音乐喷泉

基于51单片机的音乐喷泉 (程序+原理图+PCB+设计报告) 功能介绍 具体功能: 1.检测音乐信号的声音强度,使喷头的水柱能够根据音乐的节奏和音量起伏; 2.系统将声音强度转化为模拟信…

神经网络的工程基础(三)——更优化的最优化算法

相关说明 这篇文章的大部分内容参考自我的新书《解构大语言模型:从线性回归到通用人工智能》,欢迎有兴趣的读者多多支持。 本文将讨论更优化的最优化问题算法。 关于大语言模型的内容,推荐参考这个专栏。 内容大纲 相关说明一、概述二、算…

【你眼中的IT行业现状与未来趋势展望】

随着技术的不断进步,IT行业已成为推动全球经济和社会发展的关键力量。从云计算、大数据、人工智能到物联网、5G通信和区块链,这些技术正在重塑我们的生活和工作方式。你眼中IT行业的现状及未来发展趋势是怎么样的?无论您是行业领袖、技术专家…

K8S中Prometheus+Grafana监控

1.介绍 phometheus:当前一套非常流行的开源监控和报警系统。 运行原理:通过HTTP协议周期性抓取被监控组件的状态。输出被监控组件信息的HTTP接口称为exporter。 常用组件大部分都有exporter可以直接使用,比如haproxy,nginx,Mysql,Linux系统信…

论文精读:UFO: A UI-Focused Agent for Windows OS Interaction

UFO : A UI-Focused Agent for Windows OS Interaction Status: Reading Author: Bo Qiao, Chaoyun Zhang, Dongmei Zhang, Liqun Li, Minghua Ma, Qinglong Zhang, Qingwei Lin, Saravan Rajmohan, Shilin He, Si Qin, Xiangyu Zhang, Yu Kang Institution: 微软(…

Xline 0.7重构性能分析总述

1、重构概述 在Xline 0.7.0中,我们完成了对Xline代码库中进行了一次较大的重构。这次重构在某些性能测试中甚至使得Xline获得了近20倍的性能提升。在本文中我会讲解Xline中重构后命令执行流程的新设计,以及我们是如何优化Xline的性能的。 2、etcd的性能…

Map遍历、反射、GC

map的遍历 用foreach遍历 HashMap<Character,Integer> map new HashMap<>();map.put(A,2);map.put(B,3);map.put(C,3);for (Map.Entry<Character,Integer> entry: map.entrySet()) {char key entry.getKey();int value entry.getValue();System.out.prin…

Nacos 进阶篇---服务发现:服务之间请求调用链路分析(六)

一、引言 前面几个章节把Nacos服务注册从客户端到服务端&#xff0c;整个流程源码都分析了一遍。 本章节我们来分析&#xff0c;order-service、stock-service 完成Nacos注册后&#xff0c;可以通过Feign的方式&#xff0c;来完成服务之间的调用。那它的底层是如何实现的&am…

linux下的docker使用

docker是什么&#xff0c;docker翻译过来的意思就是码头工人&#xff0c;顾名思义&#xff0c;docker本质上就是一个搬运工&#xff0c;只不过从搬运货物改成了搬运程序&#xff0c;使搬运的不同的程序能够独立的运行在码头上的不同容器内&#xff0c;互不干扰&#xff0c;而他…

不使用ScrollRect 和 HorizontalLayoutGroup做的横向循环列表

一、 版本一 1.前情提要 因为需要展示300多个相同的物体&#xff0c;但是如果全部放在场景内&#xff0c;运行起来会很卡&#xff0c;所以想到了用无限循环&#xff0c;然后动态填充不同的数据。 做的这个没有用HorizontalLayoutGroup 和 ScrollRect 。 1.没有使用Horizontal…

Git原理及常用命令小结——实用版(ing......)、Git设置用户名邮箱

Git基本认识 Git把数据看作是对小型文件系统的一组快照&#xff0c;每次提交更新&#xff0c;或在Git中保存项目状态时&#xff0c;Git主要对当时的全部文件制作一个快照并保存这个快照的索引。同时&#xff0c;为了提高效率&#xff0c;如果文件没有被修改&#xff0c;Git不再…

JSON的序列化与反序列化以及VSCode执行Run Code 报错

JSON JSON: JavaScript Object Notation JS对象简谱 , 是一种轻量级的数据交换格式。 JSON格式 { "name":"金苹果", "info":"种苹果" } 一个对象&#xff1a;由一个大括号表示.括号中通过键值对来描述对象的属性 (可以理解为, 大…

操作系统总结(2)

目录 2.1 进程的概念、组成、特征 &#xff08;1&#xff09;知识总览 &#xff08;2&#xff09;进程的概念 &#xff08;3&#xff09;进程的组成—PCB &#xff08;4&#xff09;进程的组成---程序段和数据段 &#xff08;5&#xff09;程序是如何运行的呢&#xff1f…

Android和flutter交互,maven库的形式导入aar包

记录遇到的问题&#xff0c;在网上找了很多资料&#xff0c;都是太泛泛了&#xff0c;使用后&#xff0c;还不能生效&#xff0c;缺少详细的说明&#xff0c;或者关键代码缺失&#xff0c;我遇到的问题用红色的标注了 导入aar包有两种模式 1.比较繁琐的&#xff0c;手动将aar…

Java8-HashMap实现原理

目录 HashMap原理 hashmap的put流程&#xff1a; HashMap扩容机制&#xff1a; HashMap的寻址算法&#xff1a; HashMap原理 HashMap的底层数据结构是由&#xff0c;数组&#xff0c;链表和红黑树组成的。 当我们往HashMap中put元素的时候&#xff0c;利用key的hashCode重…

HC32F103BCB使用SPI获取AS5040编码器数据

1.AS5040介绍 2.硬件电路 硬件上使用SSI通信方式连接。 3.配置硬件SPI 查看手册&#xff0c;AS5040时序 可以看到在空闲阶段不发生数据传输的时候时钟(CLK)和数据(DO)都保持高电位(tCLKFE阶段)&#xff0c;在第一个脉冲的下降沿触发编码器载入发送数据&#xff0c;然后每一个…

【Unity Shader入门精要 第9章】更复杂的光照(四)

1. 透明度测试物体的阴影 对于物体有片元丢弃的情况&#xff0c;比如透明度测试或者后边会讲到的消融效果&#xff0c;使用默认的 ShadowCaster Pass 会产生问题&#xff0c;这是因为该Pass在生成阴影映射纹理时&#xff0c;没有考虑被丢弃的片元&#xff0c;而是使用完整的模…