大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

news2024/9/24 17:16:38

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • RDD容错机制
  • RDD分区机制
  • RDD分区器
  • RDD自定义分区器

在这里插入图片描述

广播变量

基本介绍

有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。
为了满足这个需求,Spark提供了两种类型的变量。

  • 广播变量(broadcast variable)
  • 累加器(accumulators)
    广播变量、累加器的主要作用是为了优化Spark程序。

广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。

使用广播变量的过程如下:

  • 对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)
  • 通过Value属性访问该对象的值(Executor中)
  • 变量只会被分到各个Executor一次,作为只读值处理

在这里插入图片描述
广播变量的相关参数:

  • spark.broadcast.blockSize(缺省值: 4m)
  • spark.broadcast.checksum(缺省值:true)
  • spark.broadcast.compree(缺省值:true)

变量应用

普通JOIN

在这里插入图片描述

MapSideJoin

在这里插入图片描述

生成数据 test_spark_01.txt

1000;商品1
1001;商品2
1002;商品3
1003;商品4
1004;商品5
1005;商品6
1006;商品7
1007;商品8
1008;商品9

生成数据格式如下:
在这里插入图片描述

生成数据 test_spark_02.txt

10000;订单1;1000
10001;订单2;1001
10002;订单3;1002
10003;订单4;1003
10004;订单5;1004
10005;订单6;1005
10006;订单7;1006
10007;订单8;1007
10008;订单9;1008

生成的数据格式如下:
在这里插入图片描述

编写代码1

我们编写代码进行测试

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object JoinDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JoinDemo")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt", 8)
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = productRDD.join(orderRDD)
    println(resultRDD.count())
    Thread.sleep(100000)
    sc.stop()
  }

}

编译打包1

mvn clean package

并上传到服务器,准备运行
在这里插入图片描述

运行测试1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar

提交任务并执行,注意数据的路径,查看下图:
在这里插入图片描述
运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)
在这里插入图片描述

2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s
200

编写代码2

接下来,我们对比使用 MapSideJoin 的方式

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapSideJoin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MapSideJoin")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val productBC = sc.broadcast(productRDD.collectAsMap())

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt")
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = orderRDD
      .map {
        case (pid, orderInfo) =>
          val productInfo = productBC.value
          (pid, (orderInfo, productInfo.getOrElse(pid, null)))
      }
    println(resultRDD.count())

    sc.stop()
  }

}

编译打包2

mvn clean package

编译后上传到服务器准备执行:
在这里插入图片描述

运行测试2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar

启动我们的程序,并观察结果
在这里插入图片描述
我们可以观察到,这次只用了 0.10078 秒就完成了任务:
在这里插入图片描述

累加器

基本介绍

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。
累加器在Driver端定义、读取,在Executor中完成累加。
累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。

Spark内置了三种类型的累加器,分别是:

  • LongAccumulator 用来累加整数型
  • DoubleAccumulator 用来累加浮点型
  • CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell

spark-shell --master local[*]

启动的主界面如下:
在这里插入图片描述
写入如下的内容进行测试:

val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allwords")

我们进行测试的结果如下图所示:
在这里插入图片描述
继续编写一段进行测试:

val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}
rdd.count
rdd.collect

println(acc1.value)
println(acc2.value)
println(acc3.value)

我们进行测试的结果如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2071000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【代码随想录训练营第42期 Day39打卡 - 打家劫舍问题 - LeetCode 198.打家劫舍 213.打家劫舍II 337.打家劫舍III

目录 一、做题心得 二、题目与题解 题目一:198.打家劫舍 题目链接 题解:动态规划 题目二:213.打家劫舍II 题目链接 题解:动态规划 题目三:337.打家劫舍III 题目链接 题解:动态规划 三、小结 一、…

卸载nomachine

网上的方法:提示找不到命令 我的方法: step1. 终端输入 sudo find / -name nxserver 2>/dev/null确认 NoMachine 的实际安装路径。你可以使用 find 命令在系统中查找 nxserver 脚本的位置。 找到路径后,你可以使用该路径来卸载 NoMachine。 如下图,紫色框中是我的路径…

【ACM出版】第三届公共管理、数字经济与互联网技术国际学术会议(ICPDI 2024,9月06-08)

第三届公共管理、数字经济与互联网技术国际学术会议(ICPDI 2024)定于2024年9月06-08日在中国-济南举行。 会议主要围绕公共管理、数字经济,互联网技术等研究领域展开讨论。会议旨在为从事公共管理、经济、大数据、互联网研究的专家学者提供一…

解决LabVIEW配置文件中文乱码问题

LabVIEW配置文件中的中文字符在程序调用时出现乱码,通常是由于字符编码不匹配引起的。LabVIEW默认使用ANSI编码格式,而配置文件可能使用了不同的编码格式(如UTF-8),导致中文字符在读取时无法正确解析。 解决方法 统一编…

导数的基本法则与常用导数公式的推导

目录 n 次幂函数导数公式的推导导数和的运算法则的证明正弦、余弦函数导数公式的推导代数证明两个重要极限(引理)及证明具体推导 几何直观 导数积的运算法则的证明导数商的法则的证明链式法则的证明有理幂函数求导法则的证明反函数求导法则的证明反正切函…

SSH 远程登录报错:kex_exchange_identification: Connection closed.....

一 问题起因 在公司,使用ssh登录远程服务器。有一天,mac终端提示:`kex_exchange_identification: Connection closed by remote host Connection closed by UNKNOWN port 65535`。 不知道为啥会出现这样的情形,最近这段时间登录都是正常的,不知道哪里抽风了,就提示这个。…

C++初学(15)

前面学习了循环的工作原理,接下来来看看循环完成的一项最常见的任务:逐字符地读取来自文本或键盘的文本。 15.1、使用cin进行输入 如果需要程序使用循环来读取来自键盘的文本输入,则必须有办法直到何时停止读取。一种方式是选择某个特殊字符…

发布分班查询,老师都在用哪个小程序?

新学期伊始,校园里又迎来了一批朝气蓬勃的新生。老师们的日程表上,除了日常的教学准备,还多了一项重要的任务——分班。这项工作不仅需要老师们精心策划,以确保每个班级的平衡,还要在分班完成后,及时将结果…

系统架构不是设计出来的

今天给大家分享一个 X/ Twitter 早期系统架构演变的故事,内容来自《数据密集型应用系统设计》这本书,具体数据来自 X/ Twitter 在 2012 年 11 月发布的分享。 推特的两个主要业务是: 发布推文(Tweets)。用户可以向其粉…

零基础入门~汇编语言(第四版王爽)~第3章寄存器(内存访问)

文章目录 前言3.1 内存中字的存储3.2 DS 和[address]3.3 字的传送3.4 mov、add、sub指令3.5 数据段检测点3.13.6 栈3.7 CPU提供的栈机制3.8 栈顶超界的问题3.9 push、pop指令3.10 栈 段检测点3.2实验2 用机器指令和汇编指令编程 前言 第2章中,我们主要从CPU 如何执…

2月公开赛Web-ssrfme

考点&#xff1a; redis未授权访问 源码&#xff1a; <?php highlight_file(__file__); function curl($url){ $ch curl_init();curl_setopt($ch, CURLOPT_URL, $url);curl_setopt($ch, CURLOPT_HEADER, 0);echo curl_exec($ch);curl_close($ch); }if(isset($_GET[url…

回归预测 | Matlab实现WOA-ESN鲸鱼算法优化回声状态网络多输入单输出回归预测

回归预测 | Matlab实现WOA-ESN鲸鱼算法优化回声状态网络多输入单输出回归预测 目录 回归预测 | Matlab实现WOA-ESN鲸鱼算法优化回声状态网络多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现WOA-ESN鲸鱼算法优化回声状态网络多输入单输出…

vue3 生命周期钩子

在 Vue 3 中&#xff0c;可以在组件不同阶段的生命周期执行特定逻辑。 生命周期整体分为四个阶段&#xff1a;创建、挂载、更新、卸载。 创建阶段 组合式APIsetup() 这是组合式 API 的入口点&#xff0c;在组件实例创建之前被调用。在此阶段&#xff0c;可以初始化响应式数据…

一键批量查询邮政快递,物流状态尽在掌握

邮政快递批量查询&#xff0c;轻松掌握物流动态 在电商行业蓬勃发展的今天&#xff0c;邮政快递作为连接商家与消费者的桥梁&#xff0c;其物流信息的及时性和准确性对于提升客户体验至关重要。然而&#xff0c;面对海量的快递单号&#xff0c;如何高效地进行批量查询&#xf…

【最长上升子序列】

题目 代码 #include <bits/stdc.h> using namespace std; const int N 1010; int a[N], f[N]; int main() {int n;cin >> n;for(int i 1; i < n; i) cin >> a[i];int res 0;for(int i 1; i < n; i){f[i] 1;for(int j 1; j < i; j){if(a[j] &…

(贪心) LeetCode 135. 分发糖果

原题链接 一. 题目描述 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。 相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xf…

UE5 蓝图 计算当前时间段

思路&#xff1a; 那当前hour与阈值hour对比 。小于返回&#xff0c;大于就继续循环对比。 临时变量 折叠图表↓

快排之自省排序

introsort是introspective sort采⽤了缩写&#xff0c;他的名字其实表达了他的实现思路&#xff0c;他的思路就是进行⾃我侦测和反省&#xff0c;快排递归深度太深&#xff08;sgi stl中使⽤的是深度为2倍排序元素数量的对数值&#xff09;那就说明在这种数据序列下&#xff0c…

数据结构-栈与队列-数组和链表的推广运用-第六天

hello算法 1.数组和队列作为最基础的两种数据结构&#xff0c;区别主要在于&#xff1a; 1.数组是连续存储&#xff0c;因此可以利用一个开始节点的地址直接确定其他的节点地址。 2.链表未绑定的存储顺序&#xff0c;具有更灵活快捷的增删改查。 3.为了解决存储的问题&#xf…

【Simulink】使用简化机械臂系统动力学的抓取和放置任务及轨迹调度

abbSavedConfigs.mat 文件中的配置 文件中保存了多个关节角度配置&#xff0c;每个配置对应不同的机器人操作步骤。这些配置通常用于控制机器人在执行任务时的各个关键姿态和动作。 各个配置的功能解释&#xff1a; configSequence (18x7 double): 功能: 包含了机器人执行任…