掌握RDD算子2

news2025/1/21 2:57:23

文章目录

      • 扁平映射算子案例
        • 任务1、统计不规则二维列表元素个数
          • 方法一、利用Scala来实现
          • 方法二、利用Spark RDD来实现
      • 按键归约算子案例
        • 任务1、在Spark Shell里计算学生总分
        • 任务2、在IDEA里计算学生总分
          • 第一种方式:读取二元组成绩列表
          • 第二种方式:读取四元组成绩列表
          • 第三种情况:读取HDFS上的成绩文件

扁平映射算子案例

任务1、统计不规则二维列表元素个数

方法一、利用Scala来实现
  • 在net.xxr.rdd.day01包里创建Example02单例对象
package net.xxr.rdd.day01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 功能:利用Scala统计不规则二维列表元素个数
 */
object Example02 {
  def main(args: Array[String]): Unit = {
    // 创建不规则二维列表
    val mat = List(
      List(7, 8, 1, 5),
      List(10, 4, 9),
      List(7, 2, 8, 1, 4),
      List(21, 4, 7, -4)
    )
    // 输出二维列表
    println(mat)
    // 将二维列表扁平化为一维列表
    val arr = mat.flatten
    // 输出一维列表
    println(arr)
    // 输出元素个数
    println("元素个数:" + arr.size)
  }
}


在这里插入图片描述

方法二、利用Spark RDD来实现
  • 在net.xxr.rdd.day01包里创建Example03单例对象
package net.xxr.rdd.day01


import org.apache.spark.{SparkConf, SparkContext}

/**
 * 功能:利用RDD统计不规则二维列表元素个数
 */
object Example03 {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("PrintDiamond") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 创建不规则二维列表
    val mat = List(
      List(7, 8, 1, 5),
      List(10, 4, 9),
      List(7, 2, 8, 1, 4),
      List(21, 4, 7, -4)
    )
    // 基于二维列表创建rdd1
    val rdd1 = sc.makeRDD(mat)
    // 输出rdd1
    rdd1.collect.foreach(x => print(x + " "))
    println()
    // 进行扁平化映射
    val rdd2 = rdd1.flatMap(x => x.toString.substring(5, x.toString.length - 1).split(", "))
    // 输出rdd2
    rdd2.collect.foreach(x => print(x + " "))
    println()
    // 输出元素个数
    println("元素个数:" + rdd2.count)
  }
}


在这里插入图片描述

  • 扁平化映射可以简化
    在这里插入图片描述
    在这里插入图片描述

按键归约算子案例

任务1、在Spark Shell里计算学生总分

  • 创建成绩列表scores,基于成绩列表创建rdd1,对rdd1按键归约得到rdd2,然后查看rdd2内容

val scores = List((“张钦林”, 78), (“张钦林”, 90), (“张钦林”, 76),
(“陈燕文”, 95), (“陈燕文”, 88), (“陈燕文”, 98),
(“卢志刚”, 78), (“卢志刚”, 80), (“卢志刚”, 60))
val rdd1 = sc.makeRDD(scores)
val rdd2 = rdd1.reduceByKey((agg, cur) => agg + cur)
rdd2.collect.foreach(println)

在这里插入图片描述

  • 可以采用神奇的占位符
    在这里插入图片描述

任务2、在IDEA里计算学生总分

第一种方式:读取二元组成绩列表
  • 在net.xxr.rdd.day02包里创建CalculateScoreSum01单例对象
package net.xxr.rdd.day02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 功能:计算总分
 */
object CalculateScoreSum01 {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("PrintDiamond") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    val scores = List(
      ("张钦林", 78), ("张钦林", 90), ("张钦林", 76),
      ("陈燕文", 95), ("陈燕文", 88), ("陈燕文", 98),
      ("卢志刚", 78), ("卢志刚", 80), ("卢志刚", 60))
    // 基于二元组成绩列表创建RDD
    val rdd1 = sc.makeRDD(scores)
    // 对成绩RDD进行按键归约处理
    val rdd2 = rdd1.reduceByKey(_ + _)
    // 输出归约处理结果
    rdd2.collect.foreach(println)
  }
}


在这里插入图片描述

第二种方式:读取四元组成绩列表
  • 在net.xxr.rdd.day02包里创建CalculateScoreSum02单例对象
package net.xxr.rdd.day02

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * 功能:计算总分
 */
object CalculateScoreSum02 {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("PrintDiamond") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 创建四元组成绩列表
    val scores = List(
      ("张钦林", 78, 90, 76),
      ("陈燕文", 95, 88, 98),
      ("卢志刚", 78, 80, 60)
    )
    // 将四元组成绩列表转化成二元组成绩列表
    val newScores = new ListBuffer[(String, Int)]()
    // 通过遍历算子遍历四元组成绩列表
    scores.foreach(score => {
      newScores.append(Tuple2(score._1, score._2))
      newScores.append(Tuple2(score._1, score._3))
      newScores.append(Tuple2(score._1, score._4))}
    )
    // 基于二元组成绩列表创建RDD
    val rdd1 = sc.makeRDD(newScores)
    // 对成绩RDD进行按键归约处理
    val rdd2 = rdd1.reduceByKey(_ + _)
    // 输出归约处理结果
    rdd2.collect.foreach(println)
  }
}


在这里插入图片描述

第三种情况:读取HDFS上的成绩文件
  • 将成绩文件上传到HDFS的/input目录

hdfs dfs -mkdir /input
hdfs dfs -put scores.txt /input
hdfs dfs -cat /input/scores.txt

在这里插入图片描述

  • 在net.xxr.rdd.day02包里创建CalculateScoreSum03单例对象
package net.xxr.rdd.day02

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * 功能:计算总分
 */
object CalculateScoreSum03 {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CalculateScoreSum")
      .setMaster("local[*]")
    // 基于配置创建Spark上下文
    val sc = new SparkContext(conf)

    // 读取成绩文件,生成RDD
    val lines = sc.textFile("hdfs://master:9000/input/scores.txt")
    // 定义二元组成绩列表
    val scores = new ListBuffer[(String, Int)]()
    // 遍历lines,填充二元组成绩列表
    lines.collect.foreach(line => {
      val fields = line.split(" ")
      scores += Tuple2(fields(0), fields(1).toInt)
      scores += Tuple2(fields(0), fields(2).toInt)
      scores += Tuple2(fields(0), fields(3).toInt)
    })
    // 基于二元组成绩列表创建RDD
    val rdd1 = sc.makeRDD(scores)
    // 对成绩RDD进行按键归约处理
    val rdd2 = rdd1.reduceByKey((x, y) => x + y)
    // 输出归约处理结果
    rdd2.collect.foreach(println)
  }
}


在这里插入图片描述

  • 在Spark Shell里完成同样的任务
import scala.collection.mutable.ListBuffer
val lines = sc.textFile("hdfs://master:9000/input/scores.txt")
val scores = new ListBuffer[(String, Int)]()
lines.collect.foreach(line => {
val fields = line.split(" ")
scores.append(Tuple2(fields(0), fields(1).toInt))
scores.append(Tuple2(fields(0), fields(2).toInt))
scores.append(Tuple2(fields(0), fields(3).toInt))
})
val rdd1 = sc.makeRDD(scores)
val rdd2 = rdd1.reduceByKey(_ + _)
rdd2.collect.foreach(println)

在这里插入图片描述

  • 修改程序,将计算结果写入HDFS文件
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/554041.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

控制系统典型应用车型 —— 潜入顶升式AMR

车型介绍: “潜入顶升AMR”是由驱动装置车身装置升降装置等结构组成的高性能移动机器人。通过复杂的智能技术来合理的路径规划,以适应环境并在其中导航,结合近距离激光雷达、碰撞传感器等技术,可以在高速运转的同时,潜伏至货物固…

【Python】布尔类型 ( 布尔类型变量 | 比较运算符 )

文章目录 一、布尔类型变量二、比较运算符三、代码示例 一、布尔类型变量 Python 中的 布尔类型 ( bool ) 用于 逻辑判断 , 布尔类型 是 数字类型 ( Number ) 的一种 , 其有两种 字面量 取值 : 真 : True , 其本质是数字 1 ;假 : False , 其本质是数字 0 ; 代码示例 : # 布尔…

电脑清灰记录(惠普暗影精灵5)

如果选择自己清灰,请确保自己处于精神平静,耐心可控,无粗暴动作倾向,无孩童打扰的状态。 新手全程大概需要2个半小时,老手比较顺利的话半个小时搞定。以下电脑,首次尝试。参考链接暗影精灵四笔记本电脑拆机…

AI绘画新秀-免费使用-Leonardo(Midjourney对手)注册教程

本教程收集于:AIGC从入门到精通教程 AI绘画新秀-免费使用-Leonardo(Midjourney对手) 保姆级注册教程 目录 一、写在前面的话。 二、纯文字教程 2.1 Leonardo注册教程:

一个Python的轻量级搜索工具--Whose

本文将简单介绍 Python 中的一个轻量级搜索工具Whoosh,并给出相应的使用示例代码。 # Whoosh 简介 Whoosh 由 Matt Chaput 创建,它一开始是一个为 Houdini 3D 动画软件包的在线文档提供简单、快速的搜索服务工具,之后便慢慢成为一个成熟的搜…

数字人的新革命,BAT的“冲高”战场

配图来自Canva可画 ChatGPT横空出世,让人们看到了数字人的另一种可能,将ChatGPT与虚拟数字人融合,研发出更加智能化、拟人化的虚拟数字人成为数字人厂商的新命题、新方向。 2月份,岭南股份、风语筑、开普云等10多家公司&#xf…

java设备台账管理系统myeclipse定制开发mysql数据库网页模式java编程jdbc

一、源码特点 java设备台账管理系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助 mysql数据库,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。 java设备台账管理系统myeclipse定制开发mysql 二、功能介绍 此次系统…

3D控件Aspose.3D入门教程(11):通过简单的步骤生成条形码

Aspose.3D 是一个功能丰富的游戏软件和计算机辅助设计(CAD)的API,可以在不依赖任何3D建模和渲染软件的情况下操作文档。API支持Discreet3DS, WavefrontOBJ, FBX (ASCII, Binary), STL (ASCII, Binary), Universal3D, Collada, glTF, GLB, PLY…

JetsonNano学习(七)Nginx 搭建 HLS 直播服务器

文章目录 一、使用 Nginx-rtmp-module 编译 Nginx下载 Nginx-rtmp-module安装 Nginx 依赖下载 Nginx编译 Nginx 二、Nginx 配置文件三、启动 Nginx 服务方式一安装nginx初始化脚本、获取nginx初始化脚本 方式二直接调用 四、使用 RTMP 将视频推送到 Nginx安装FFmpeg捕获网络摄像…

行业分析——半导体行业

半导体行业是现代高科技产业和新兴战略产业,是现代信息技术、电子技术、通信技术、信息化等产业的基础之一。我国政府先后制定了《中国集成电路产业发展规划》和《中国人工智能发展规划》,明确提出要支持半导体和人工智能等产业的发展,为半导…

xs _webmsxyw 纯算法还原盗用代码请注明出处搬来搬去真的很下头!

本文以教学为基准、本文提供的可操作性不得用于任何商业用途和违法违规场景。 本人对任何原因在使用本人中提供的代码和策略时可能对用户自己或他人造成的任何形式的损失和伤害不承担责任。 最新版 x-s 没露任何版权请审核员认真对待谢谢。 【2023.05.22】 更新全站接口通用 …

【ARM位段地址分配】STM32 struct 位段内存分配位置问题

因为需要将 7位地址位 和 1位读写标志位 进行组合,想到了用 struct 和 union 的方法。 说明:作为自己测试用,使用硬件STM32F407ZET6 本篇文章仅对位段操作再ARM芯片上存储空间位置分配的探究,供给作为需要确定位段操作分配内存…

我出版了一本关于TikTok电商运营的书

回首2020年初,第一次在手机上下载TikTok的那个下午,我并没有意识到,未来三年多这个词会充满我的工作与生活。 那其实是非常幸福的一段时间,对TikTok的期待没有那么功利,每天刷一刷TikTok中的视频,再随手拍…

【1080. 根到叶路径上的不足节点】

来源:力扣(LeetCode) 描述: 给你二叉树的根节点 root 和一个整数 limit ,请你同时删除树中所有 不足节点 ,并返回最终二叉树的根节点。 假如通过节点 node 的每种可能的 “根-叶” 路径上值的总和全都小…

『手撕 Mybatis 源码』04 - 创建 sqlSession

创建 sqlSession 创建 openSession 对象 当获取 SqlSessionFactory 之后,就可以开始获取 SqlSession 对象 public class MybatisTest {Testpublic void test1() throws IOException {...// 3.问题:openSession()执行逻辑是什么?// 3. (1) …

c++学习——引用

引用 **引用的语法****引用的注意事项****数组的引用****引用的本质****尽量用const来代替define****指针的引用** 1.引用是做什么:和C语言的指针一样的功能,并且使语法更加简洁 2.引用是什么∶引用是给空间取别名 引用的语法 #define _CRT_SECURE_NO_WARNINGS #include <i…

星戈瑞Sulfo-CY7 NHS ester标记带氨基的荧光Cyanine7-NHS

Sulfo-CY7 NHS ester是一种常用的蛋白质标记试剂&#xff0c;是一种高度稳定的荧光染料&#xff0c;能够被用于各种细胞成像实验中。利用这种染料&#xff0c;科学家们可以标记细胞膜、细胞核、细胞器等&#xff0c;从而观察细胞的各种结构和功能。 产品名称&#xff1a;水溶性…

似然(likelihood)、极大似然、对数似然、最大后验等

1 似然 设总体X服从分布P(x&#xff1b;θ)&#xff08;当X是连续型随机变量时为概率密度&#xff0c;当X为离散型随机变量时为概率分布&#xff09;&#xff0c;θ为待估参数&#xff08;或者说系统参数&#xff09;&#xff0c;X1,X2,…Xn是来自于总体X的样本&#xff0c;x1…

互联网医院|互联网医院系统开发|互联网医疗平台搭建

随着互联网的快速普及&#xff0c;互联网医院成为解决医疗资源分布不平衡、基层医疗服务短缺等问题的有效途径。互联网医院系统的优势主要体现在以下几个方面&#xff1a;   1.在线挂号预约   互联网医院系统可以方便快捷地进行在线预约挂号。患者可以通过小程序或网站实现…

BI大数据分析平台,精细化分析的必备工具

在日常的工作中&#xff0c;经常会遇到要做经营决策时&#xff0c;数据分析却掉链子的情况&#xff0c;比如当老板临时提出要进一步分析某类商品的销售情况时&#xff0c;得重新开发报表。BI大数据分析平台能不能随时随地实现精细化数据分析&#xff0c;避免数据分析跟不上运营…