横扫Spark之 - 9个常见的行动算子

news2025/1/19 2:36:14

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. collect()
      • 2. count()
      • 3. first()
      • 4. take()
      • 5. takeOrdered()
      • 6. countByKey()
      • 7. saveAS...()
      • 8. foreach()
      • 9. foreachPartition() ***

1. collect()

  收集RDD每个分区的数据以数组封装之后发给Driver
  如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G 这样设置

  @Test
  def collect(): Unit ={

    val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))

    val arr = rdd1.collect()

    println(arr.toList)

  }

结果:
在这里插入图片描述

2. count()

  返回RDD中元素的个数

@Test
def count(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))

  println(rdd1.count())
}

结果:
在这里插入图片描述

3. first()

  返回RDD中的第一个元素
  他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取

  @Test
  def first(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)
	// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个job
  val i = rdd1.first()

  println(i)

  Thread.sleep(10000000)
}

结果:
在这里插入图片描述

4. take()

  返回RDD中前n个元素组成的数组
  take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取

@Test
def take(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)

  println(rdd1.take(3).toList)

  Thread.sleep(10000000)
}

结果:
在这里插入图片描述

5. takeOrdered()

  这个是取排序之后的前几个元素
  takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了

@Test
def takeOrdered(): Unit ={

val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)

val ints = rdd1.takeOrdered(3)

println(ints.toList)

Thread.sleep(1000000)
}

结果:
在这里插入图片描述

6. countByKey()

  统计每个key出现的次数,返回的结果是(key,次数)

@Test
def countByKey(): Unit ={
  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  val rdd2 = rdd1.countByKey()

  println(rdd2.toList)
}

结果:
在这里插入图片描述

7. saveAS…()

  saveAsTextFile(path)将数据保存成text文件,有几个task就保存几个文件
  saveAsSequenceFile(path)将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
  saveAsObjectFile(path)将数据序列化成对象保存到文件

@Test
def save(): Unit ={

  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.saveAsTextFile("output/text")  // 为啥保存出来8个文件因为有8个task
  rdd1.saveAsObjectFile("output/ObjectFile")
  rdd1.saveAsSequenceFile("output/SequenceFile")
}

结果:
在这里插入图片描述

8. foreach()

  遍历RDD中的每个元素

@Test
def foreach(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.foreach(println)
}

结果:
在这里插入图片描述

9. foreachPartition() ***

  对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
  foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率

@Test
def foreachPartition(): Unit ={

  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.foreachPartition(it=>{

    var connection:Connection = null
    var statement:PreparedStatement = null

    try{
      connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
      statement = connection.prepareStatement("insert into wc values(?,?)")

      //  计数器
      var count = 0

      it.foreach(x=>{
        statement.setString(1,x._1)
        statement.setInt(2,x._2)

        // 添加到批中,一批一批的执行
        statement.addBatch()

        // 满1000条执行一批
        if(count % 1000 == 0){
          statement.executeBatch()
          // todo 执行完批后要记得clearBatch !!!!!
          statement.clearBatch()
        }
        count = count+1
      })
      // 最后不满1000条的也执行一次
      statement.executeBatch()

    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      if (connection != null) {
        connection.close()
      }
      if (statement != null) {
        statement.close()
      }
    }
  })
}

结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1441772.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【FFmpeg】ffplay 命令行参数 ⑤ ( 设置音频滤镜 -af 参数 | 设置统计信息 -stats 参数 | 设置同步时钟类型 -sync 参数 )

文章目录 一、ffplay 命令行参数 - 音频滤镜1、设置音频滤镜 -af 参数2、常用的 音频滤镜 参数3、音频滤镜链 示例 二、ffplay 命令行参数 - 统计信息1、设置统计信息 -stats 参数2、关闭统计信息 -nostats 参数 三、ffplay 命令行参数 - 同步时钟类型1、设置同步时钟类型 -syn…

Rebuild企业管理系统 SSRF漏洞(CVE-2024-1021)

免责声明:文章来源互联网收集整理,请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失,均由使用者本人负责,所产生的一切不良后果与文章作者无关。该…

【FPGA Verilog】各种加法器Verilog

1bit半加器adder设计实例 module adder(cout,sum,a,b); output cout; output sum; input a,b; wire cout,sum; assign {cout,sum}ab; endmodule 解释说明 (1)assign {cout,sum}ab 是连续性赋值 对于线网wire进行赋值,必须以assign或者dea…

报错ValueError: Unknown CUDA arch (8.6) or GPU not supported

文章目录 问题描述解决方案参考文献 问题描述 报错 ValueError: Unknown CUDA arch (8.6) or GPU not supported 本人显卡为 RTX 3060,CUDA 为 10.2,PyTorch 为 1.5 解决方案 修改 C:\Users\Administrator\Envs\test\Lib\site-packages\torch\utils\c…

【前端高频面试题--Vue基础篇】

🚀 作者 :“码上有前” 🚀 文章简介 :前端高频面试题 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬前端高频面试题--Vue基础篇 Vue基本原理双向绑定与MVVM模型Vue的优点计算属性与监听属性计算属性监…

多线程基础详解(看到就是赚到)

🎥 个人主页:Dikz12📕格言:那些在暗处执拗生长的花,终有一日会馥郁传香欢迎大家👍点赞✍评论⭐收藏 目录 创建线程 1.创建类继承Thread,重写run() 2.实现Runnable,重写run() 3.继承Thread,使用匿名内部类 …

核心篇-OSPF技术之序(上)

文章目录 一. 实验专题1.1. 实验1:配置单区域OSPF1.1.1. 实验目的1.1.2. 实验拓扑1.1.3. 实验步骤(1)配置地址(2)运行OSPF 1.1.4. 实验调试(1)查看接口信息(2)查看邻居状…

飞天使-linux操作的一些技巧与知识点9-zabbix6.0 容器之纸飞机告警设置

文章目录 zabbix 告警纸飞机方式webhook 方式 zabbix 告警纸飞机方式 第一种方式参考 https://blog.csdn.net/yetugeng/article/details/99682432bash-4.4$ cat telegram.sh #!/bin/bashMSG$1TOKEN"61231432278:AAsdfsdfsdfsdHUxBwPSINc2kfOGhVik" CHAT_ID-41dsdde…

Redis——集群环境部署

一般情况下的Redis,我们都是在一台服务器上进行操作的,也就是说读、写以及备份操作都是在一台Redis服务器上进行的。随着项目访问量的增加,对Redis服务器的操作也更加频繁,虽然Redis读写速度都很快,但是一定程度上也会…

基于BatchNorm的模型剪枝【详解+代码】

文章目录 1、BatchNorm(BN)2、L1与L2正则化2.1 L1与L2的导数及其应用2.2 论文核心点 3、模型剪枝的流程 ICCV经典论文,通俗易懂!论文题目:Learning Efficient Convolutional Networks through Network Slimming卷积后能…

GC调优工具

1、jstat 2、VisualVM GC tool插件 插件下载地址:https://blog.csdn.net/jushisi/article/details/109655175 3、Prometheus和Grafana监控

Xshell

更改背景颜色 多个会话同时执行命令 查看 -> 撰写 -> 撰写窗格

备战蓝桥杯---动态规划(基础3)

本专题主要介绍在求序列的经典问题上dp的应用。 我们上次用前缀和来解决,这次让我们用dp解决把 我们参考不下降子序列的思路,可以令f[i]为以i结尾的最大字段和,易得: f[i]max(a[i],a[i]f[i-1]); 下面是AC代码: #in…

Go 语言 for 的用法

For statements 本文简单翻译了 Go 语言中 for 的三种用法,可快速学习 Go 语言 for 的使用方法,希望本文能为你解开一些关于 for 的疑惑。详细内容可见文档 For statements。 For statements with single condition 在最简单的形式中,只要…

Redis -- 渐进式遍历

家,是心的方向。不论走多远,总有一盏灯为你留着。桌上的碗筷多了几双,笑声也多了几分温暖。家人团聚,是最美的风景线。时间:2024年 2月 8日 12:51:20 目录 前言 语法 示例 前言 试想一个场景,那就是在key非常多的…

【数据结构与算法-初学者指南】【附带力扣原题】队列

🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏《数据结构与算法:初学者入门指南》📘&am…

【项目问题解决】java. net.SocketException: Connection reset

目录 【项目问题解决】java. net.SocketException: Connection reset 1.问题描述2.问题原因3.解决思路4.解决方案5.总结6.参考 文章所属专区 项目问题解决 1.问题描述 通过JMeter 压测接口,无并发,无间歇时间跑接口10000次报错,后续改成建个…

云卷云舒:论超级数据库、算网数据库、智算数据库

笔者大胆提出一种“超级数据库”的概念设想。 一、超级能力 就像当初提出“超级计算机”一样,我们是否同样可以提出“超级数据库”的概念呢?当然不是不可以。 二、超级计算机 我们回忆一下“超级计算机”的发展之路,大致经过了如下几个环…

Swift Combine 管道 从入门到精通三

Combine 系列 Swift Combine 从入门到精通一Swift Combine 发布者订阅者操作者 从入门到精通二 1. 用弹珠图描述管道 函数响应式编程的管道可能难以理解。 发布者生成和发送数据,操作符对该数据做出响应并有可能更改它,订阅者请求并接收这些数据。 这…

FPGA实现ISP用于无人车、无人机配送的方案调研

查到一个always 奥唯思公司做的用FPGA实现ISP的方案,采用易灵思钛金16nm的FPGA Ti60F225,通过MIPI CSI RX采集图像传感器的数据,在FPGA内部经过一系列复杂的ISP运算后,再通过MIPI CSI TX将图像数据发送给后端。 一套完整的ISP&a…