Spark-Scala语言实战(13)

news2025/1/21 13:04:05

在之前的文章中,我们学习了如何在spark中使用键值对中的keys和values,reduceByKey,groupByKey三种方法。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(12)-CSDN博客文章浏览阅读722次,点赞19次,收藏15次。今天开始的文章,我会带给大家如何在spark的中使用我们的键值对方法,今天学习键值对方法中的keys和values,reduceByKey,groupByKey三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137385224今天的文章开始,我会继续带着大家如何在spark的中使用我们的键值对里的方法。今天学习键值对方法中的fullOuterJoin,zip,combineByKey三种方法。

目录

一、知识回顾

二、键值对方法

1.fullOuterJoin

2.zip

3.combineByKey

拓展-方法参数设置


一、知识回顾

 上一篇文章中我们学习了键值对的三种方法,分别是keys和values,reduceByKey,groupByKey。

keys和values分别对应了我们的键与值。

我们可以用它们来创建我们的RDD

 reduceByKey可以进行统计,将有相同键的值进行相加,统一输出。

而 groupByKey方法就是对我们的键值对RDD进行分组了

它可以将我们的相同的键,不同的值组合成一个组。

那么,开始今天的学习吧~ 

二、键值对方法

1.fullOuterJoin

  •  fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD(弹性分布式数据集)
    val p1 = sc.parallelize(Seq(("a1", "1"), ("a2", "2"), ("a3", "3")))
    val p2 = sc.parallelize(Seq(("a2", "A"), ("a3", "B"), ("a4", "C")))
    // 将RDD转换为键值对
    val pp1 = p1.map { case (key, value) => (key, value) }
    val pp2 = p2.map { case (key, value) => (key, value) }
    // 执行fullOuterJoin操作
    val ppp = pp1.fullOuterJoin(pp2)
    // 收集结果并打印
    ppp.collect().foreach(println)
    }
}

我们的代码创建了两个键值对RDD,那么使用 fullOuterJoin方法全外连接那么两个键值对都会连接。

可以看到两个键值对里的键与值都连接上了,互相没有的值即显示None值。 

2.zip

  • zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
  • 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDDpartition数量以及元素数量都相同,否则会抛出异常
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD
    val p1 = sc.parallelize(Seq(1, 2, 3))
    val p2 = sc.parallelize(Seq("a", "b", "c"))
    // 使用zip方法将两个RDD组合在一起
    val pp1 = p1.zip(p2)
    val pp2 = p2.zip(p1)
    // 收集结果并打印
    pp1.collect().foreach(println)
    pp2.collect().foreach(println)
    }
}

 代码创建了两个不同的RDD键值对,分别使用p1zip方法p2与p2zip方法p1,那么它们输出的结果会是一样的吗?

可以看到是不一样的,谁在前面谁就是键,反之是值。 

3.combineByKey

  • combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
  • combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
  • combineByKey()方法的使用方式如下。
    • combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))
    val p2 = p1.combineByKey(
      // createCombiner: 将第一个值转换为累加器
      (v: Int) => v,
      // mergeValue: 将新的值加到累加器上
      (c: Int, v: Int) => c + v,
      // mergeCombiners: 合并两个累加器
      (c1: Int, c2: Int) => c1 + c2
    )
    p2.collect().foreach { case (key, value) =>
      println(s"Key: $key, Value: $value")
    }
  }
}

我的代码中: 

createCombiner: 这个函数定义了如何将每个键的第一个值转换为初始的累加器值。 

代表着每个键,第一个出现的值将作为累加器的初始值。

mergeValue: 这个函数定义了如何将新值与当前的累加器值合并。在我的代码中,我将新值与累加器相加。

代表着每个键的后续值,它们都会被加到当前的累加器值上。

mergeCombiners: 这个函数定义了当两个累加器(对应于同一个键但可能来自不同的分区)需要合并时应该执行的操作。在我的代码中,也是将两个累加器值相加

这确保了无论数据如何在分区之间分布,最终每个键都会得到正确的累加结果。

看看输出效果

可以看到我们的键值对成功累加。

快去试试吧~ 

拓展-方法参数设置

方法参数描述例子
fullOuterJoinotherRDD另一个要与之进行全外连接的RDDrdd1.fullOuterJoin(rdd2)
fullOuterJoinnumPartitions结果RDD的分区数(可选)rdd1.fullOuterJoin(rdd2, numPartitions=10)
zipotherRDD要与之进行zip操作的另一个RDDrdd1.zip(rdd2)
combineByKeycreateCombiner处理第一个出现的每个键的值的函数lambda v: (v, 1)
combineByKeymergeValue合并具有相同键的值的函数lambda acc, v: (acc[0] + v, acc[1] + 1)
combineByKeymergeCombiners合并两个累积器的函数lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
combineByKeynumPartitions结果RDD的分区数(可选)rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=5)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1571534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EdenAI

文章目录 关于 EdenAI功能说明提供的模型Eden AI PlatformIntegrations 和以下平台集成 Python 调用 API异步功能 关于 EdenAI Eden AI 用户界面(UI)专为处理人工智能项目而设计。 通过 Eden AI Portal,您可以使用市场上最好的引擎 执行无代…

深度学习方法;乳腺癌分类

乳腺癌的类型很多,但大多数常见的是浸润性导管癌、导管原位癌和浸润性小叶癌。浸润性导管癌(IDC)是最常见的乳腺癌类型。这些都是恶性肿瘤的亚型。大约80%的乳腺癌是浸润性导管癌(IDC),它起源于乳腺的乳管。 浸润性是指癌症已经“侵袭”或扩散到周围的乳…

#SOP#-如何使用AI辅助论文创作

#SOP#-如何使用AI辅助论文创作 ——2024.4.6 “在使用工具的时候,要做工具的主人” 最终交付物: 一份可执行的AI辅助创作论文的指导手册 交付物质量要求: 不为任何AI大模型付费!不为任何降重网站付费!通过知网检查论…

视频分块上传Vue3+SpringBoot3+Minio

文章目录 一、简化演示分块上传、合并分块断点续传秒传 二、更详细的逻辑和细节问题可能存在的隐患 三、代码示例前端代码后端代码 一、简化演示 分块上传、合并分块 前端将完整的视频文件分割成多份文件块,依次上传到后端,后端将其保存到文件系统。前…

UNIAPP(小程序)每十个文章中间一个广告

三十秒刷新一次广告 ad-intervals"30" <template><view style"margin: 30rpx;"><view class"" v-for"(item,index) in 100"><!-- 广告 --><view style"margin-bottom: 20rpx;" v-if"(inde…

Kafka参数介绍

官网参数介绍:Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/documentation/#configuration

深入浅出 -- 系统架构之分布式常见理论概念

随着计算机科学和互联网的发展&#xff0c;分布式场景变得越来越常见&#xff0c;能否处理好分布式场景下的问题&#xff0c;成为衡量一个工程师是否合格的标准。本文我们介绍下分布式系统相关的理论知识&#xff0c;这些理论是我们理解和处理分布式问题的基础。 CAP理论 CAP…

小林coding图解计算机网络|TCP篇06|如何理解TCP面向字节流协议、为什么UDP是面向报文的协议、如何解决TCP的粘包问题?

小林coding网站通道&#xff1a;入口 本篇文章摘抄应付面试的重点内容&#xff0c;详细内容还请移步&#xff1a;小林coding网站通道 文章目录 如何理解UDP 是面向报文的协议如何理解字节流如何解决粘包固定长度的消息 特殊字符作为边界自定义消息结构 如何理解UDP 是面向报文的…

代码随想录算法训练营第三十一天| 理论基础、LeetCode 455.分发饼干、376. 摆动序列、53. 最大子序和

一、理论基础 文章讲解&#xff1a;https://programmercarl.com/%E8%B4%AA%E5%BF%83%E7%AE%97%E6%B3%95%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A1%80.html 1.贪心的定义 贪心的本质是选择每一阶段的局部最优解&#xff0c;从而达到全局最优解。例如&#xff0c;有一堆钞票&#xff0c…

使用 ChatGPT 创建在线课程:一步一步指南与提示模板

原文&#xff1a;Creating Online Courses with ChatGPT 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 谢谢 作为对你支持的感谢&#xff0c;随意定制本书中列出的任何提示&#xff0c;并将其作为你自己的重新销售。是的&#xff0c;对你免费。 它们都结构良好且用…

移动开发技术历史演化简介h5,跨平台,原生的各种技术实现方案的简单介绍

移动端的开发技术是指针对移动设备如智能手机和平板电脑等便携终端进行应用程序和服务创建的过程。本文将主要介绍一下移动端的开发技术的历史进化历程。讲述h5&#xff0c;跨平台&#xff0c;原生的各种技术实现方案和他们各自的优势与不足。 移动开发&#xff0c;不仅是编程技…

自动化测试框架Robot Framework入门

什么是RF RF是一个基于 Python 的、可扩展的关键字驱动的自动化 验收测试框架、验收测试驱动开发 &#xff08;ATDD&#xff09;、 行为驱动开发 &#xff08;BDD&#xff09; 和机器人流程自动化 &#xff08;RPA&#xff09;。它 可用于分布式、异构环境&#xff0c;其中自动…

Day82:服务攻防-开发组件安全Solr搜索Shiro身份Log4j日志本地CVE环境复现

目录 J2EE-组件Solr-本地demo&CVE 命令执行&#xff08;CVE-2019-17558&#xff09; 远程命令执行漏洞(CVE-2019-0193) Apache Solr 文件读取&SSRF (CVE-2021-27905) J2EE-组件Shiro-本地demo&CVE CVE_2016_4437 Shiro-550Shiro-721(RCE) CVE-2020-11989(身…

macU盘在电脑上读不出来 u盘mac读不出来怎么办 macu盘不能写入

对于Mac用户来说&#xff0c;使用U盘是很常见的操作&#xff0c;但有时候可能会遇到Mac电脑无法读取U盘的情况&#xff0c;这时候就需要使用一些特定的工具软件来帮助我们解决问题。本文就来告诉大家macU盘在电脑上读不出来是怎么回事&#xff0c;u盘mac读不出来怎么办。 一、m…

Java 中 Spring Boot 框架下的 Email 开发

Email 开发 1. 核心依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId> </dependency><dependency><groupId>org.springframework.boot</groupId><…

ubuntu安装sublime3并设置中文

安装Sublime Text 3 在Ubuntu上安装Sublime Text 3可以通过以下步骤进行&#xff1a; 打开终端。 导入Sublime Text 3的GPG密钥&#xff1a; wget -qO- https://download.sublimetext.com/sublimehq-pub.gpg | sudo apt-key add - 添加Sublime Text 3的存储库&#xff1a; …

Spring Boot 相关知识和工具类

写在前面 此文是对后端开发框架Spring Boot快速入门一文的知识点补充与完善&#xff0c;如果是新手小白建议两篇文章一起食用,上面那篇文章为主&#xff0c;本文为辅&#xff0c;以达到最佳效果&#xff0c;大佬随意。 http 五种与后端的交互方法 Get:主要用于请求数据。当客…

vue2+elementUi的两个el-date-picker日期组件进行联动

vue2elementUi的两个el-date-picker日期组件进行联动 <template><el-form><el-form-item label"起始日期"><el-date-picker v-model"form.startTime" change"startTimeChange" :picker-options"startTimePickerOption…

Map源码解析

基本介绍 其实HashMap底层是个什么东西我们之前也讲过, 就是一个哈希桶(差不多可以看成一个数组), 然后每一个节点又连接着链表/红黑树之类的, 下面让我们看一看具体在源码上是怎样实现的: 常量及其它 -> static final int DEFAULT_INITIAL_CAPACITY 1 << 4; //这个…

解决JavaWeb中IDEA2023新版本无法创建Servlet的问题

出现问题&#xff1a;IDEA右键创建Servlet时&#xff0c;找不到选项 原因分析&#xff1a;IDEA的2023版的已经不支持Servlet了&#xff0c;如果还要使用的话&#xff0c;需要自己创建模板使用 创建模板 右击设置&#xff0c;选择&#xff08;File and Code Templates&#x…