3.8 Spark RDD典型案例

news2024/11/20 20:19:43

一、利用RDD计算总分与平均分

(一)准备工作

1、启动HDFS服务
在这里插入图片描述
2、启动Spark服务
在这里插入图片描述
3、在本地创建成绩文件
在这里插入图片描述
4、将成绩文件上传到HDFS
在这里插入图片描述
(二)完成任务
1、在Spark Shell里完成任务
(1)读取成绩文件,生成RDD
在这里插入图片描述
(2)定义二元组成绩列表
在这里插入图片描述
(3)利用RDD填充二元组成绩列表
在这里插入图片描述
(4)基于二元组成绩列表创建RDD
在这里插入图片描述
(5)对rdd按键归约得到rdd1,计算总分
在这里插入图片描述
(6)将rdd1映射成rdd2,计算总分与平均分
在这里插入图片描述
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建计算总分平均分对象
在这里插入图片描述

package net.cxf.rdd.day07

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

/**
 * 功能:统计总分与平均分
 * 作者:cxf
 * 日期:2023年05月11日
 */
object CalculateSumAvg {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CalculateSumAvg ") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 读取成绩文件,生成RDD
    val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
    // 定义二元组成绩列表
    val scores = new ListBuffer[(String, Int)]()
    // 利用RDD填充二元组成绩列表
    lines.collect.foreach(line => {
      val fields = line.split(" ")
      scores.append((fields(0), fields(1).toInt))
      scores.append((fields(0), fields(2).toInt))
      scores.append((fields(0), fields(3).toInt))
    })
    // 基于二元组成绩列表创建RDD
    val rdd = sc.makeRDD(scores);
    // 对rdd按键归约得到rdd1,计算总分
    val rdd1 = rdd.reduceByKey(_ + _)
    // 将rdd1映射成rdd2,计算总分与平均分
    val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
    // 在控制台输出rdd2的内容
    rdd2.collect.foreach(println)
    // 将rdd2内容保存到HDFS指定位置
    rdd2.saveAsTextFile("hdfs://master:9000/scoresumavg/output")
    // 关闭Spark容器
    sc.stop()
  }
}

运行程序,查看结果
在这里插入图片描述
查看HDFS的结果文件
在这里插入图片描述

二、利用RDD统计每日新增用户

(一)准备工作
1、在本地创建用户文件
在这里插入图片描述
2、将用户文件上传到HDFS指定位置
在这里插入图片描述

(二)完成任务
1、在Spark Shell里完成任务
(1)读取文件,得到RDD
在这里插入图片描述
(2)倒排,互换RDD中元组的元素顺序
在这里插入图片描述
(3)倒排后的RDD按键分组
在这里插入图片描述
(4)取分组后的日期集合最小值,计数为1
在这里插入图片描述
(5)按键计数,得到每日新增用户数
在这里插入图片描述
在这里插入图片描述
(6)让输出结果按日期升序
在这里插入图片描述
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建统计新增用户对象
在这里插入图片描述

package net.cxf.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 功能:统计新增用户
 * 作者:cxf
 * 日期:2023年05月24日
 */
object CountNewUsers {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CountNewUsers") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 读取文件,得到RDD
    val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
    // 倒排,互换RDD中元组的元素顺序
    val rdd2 = rdd1.map(
      line => {
        val fields = line.split(",")
        (fields(1), fields(0))
      }
    )
    // 倒排后的RDD按键分组
    val rdd3 = rdd2.groupByKey()
    // 取分组后的日期集合最小值,计数为1
    val rdd4 = rdd3.map(line => (line._2.min, 1))
    // 按键计数,得到每日新增用户数
    val result = rdd4.countByKey()
    // 让统计结果按日期升序
    val keys = result.keys.toList.sorted
    keys.foreach(key => println(key + "新增用户:" + result(key)))
    // 停止Spark容器
    sc.stop()
  }
}

运行程序,查看结果
在这里插入图片描述

三、利用RDD实现分组排行榜

(一)准备工作
1、在本地创建成绩文件
在这里插入图片描述
2、将成绩文件上传到HDFS上指定目录
在这里插入图片描述
(二)完成任务
(1)读取成绩文件得到RDD
在这里插入图片描述
(2)利用映射算子生成二元组构成的RDD
在这里插入图片描述
(3)按键分组得到新的二元组构成的RDD
在这里插入图片描述
(4)按值排序,取前三
在这里插入图片描述
(5)按指定格式输出结果
在这里插入图片描述
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建分组排行榜单例对象
在这里插入图片描述

package net.cxf.rdd.day07

import org.apache.spark.{SparkConf, SparkContext}
/**
 * 功能:成绩分组排行榜
 * 作者:cxf
 * 日期:2023年05月24日
 */
object GradeTopN {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("GradeTopN") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 实现分组排行榜
    val top3 = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
      .map(line => {
        val fields = line.split(" ")
        (fields(0), fields(1))
      }) // 将每行成绩映射成二元组(name, grade)
      .groupByKey() // 按键分组
      .map(item => {
        val name = item._1
        val top3 = item._2.toList.sortWith(_ > _).take(3)
        (name, top3)
      }) // 值排序,取前三
    // 输出分组排行榜结果
    top3.collect.foreach(line => {
      val name = line._1
      val scores = line._2.mkString(" ")
      println(name + ": " + scores)
    })
    // 停止Spark容器,结束任务
    sc.stop()
  }
}

运行程序,查看结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/562277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【搭建轻量级图床】本地搭建LightPicture开源图床管理系统,并公网远程访问

文章目录 1.前言2. Lightpicture网站搭建2.1. Lightpicture下载和安装2.2. Lightpicture网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1.前言 现在的手机越来越先进,功能也越来越多,而手机…

二十三种设计模式第九篇--代理模式

在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。在代理模式中,我们创建具有现有对象的对象,以便向外界提供功能接口。 意图:为其他对象提供一种代理以控制对这…

示范性微电子院校“抢人”,芯片赛道黄不了!

经常看到有同学问,“国内高校微电子专业最好的是哪所高校?”“想搞数字ic设计去哪所大学好呢?” 其实国内28所示范性微电子学院都是非常不错的选择。 2015年,九所示范性微电子院校名单公布,包括了清华大学、北京大学、复旦大学…

8、Linux C/C++ 实现MySQL的图片插入以及图片的读取

本文结合了Linux C/C 实现MySQL的图片插入以及图片的读取,特别是数据库读写的具体流程 一、文件读取相关函数 fseek() 可以将文件指针移动到文件中的任意位置。其基本形式如下: int fseek(FILE *stream, long offset, int whence);其中,str…

kafka 设置用户密码和通过SpringBoot测试

叙述 当前Kafka认证方式采用动态增加用户协议。 自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此文主要介绍SASL方式。 1)SASL验证: 验证方式Kaf…

【JavaSE】Java基础语法(六):方法详解

文章目录 1. 方法概述1.1 方法的概念 2. 方法的定义和调用2.1 方法的定义2.2 方法的调用过程 3. 带参数方法的定义和调用3.1 带参数方法定义和调用3.2 形参和实参 4. 带返回值方法的定义和调用4.1 带返回值方法定义和调用4.2 带返回值方法的练习-求两个数的最大值(应用) 5. 方法…

【链接】深入理解PLT表和GOT表

系列综述: 💞目的:本系列是个人整理为了秋招面试的,整理期间苛求每个知识点,平衡理解简易度与深入程度。 🥰来源:材料主要源于多处理器编程的艺术进行的,每个知识点的修正和深入主要…

nest日志包pino、winston配置-懒人的折腾

nest日志 三种node服务端日志选型 winstonpinolog4js 2023年5月23日 看star数:winston > pino > log4js 使用体验: pino 格式简洁,速度快,支持输入日志到任意数据库,日志暂无自动清理(可能是我…

AI是怎么帮我写代码,写SQL的?(本文不卖课)

近期,ChatGPT风起云涌,“再不入局,就要被时代淘汰”的言论甚嚣尘上,借着这一波创业的朋友都不止3-4个,如果没记错,前几次抛出该言论的风口似乎是区块链,元宇宙,WEB3.0。 画外音&…

动态规划问题实验:数塔问题

目录 前言实验内容实验流程实验过程实验分析伪代码代码实现分析算法复杂度用例测试 总结 前言 动态规划是一种解决复杂问题的方法,它将一个问题分解为若干个子问题,然后从最简单的子问题开始求解,逐步推导出更复杂的子问题的解,最…

绝世内功秘籍《调试技巧》

本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。 内容专栏:这里是《C知识系统分享》专栏,笔者用重金(时间和精力)打造,基础知识一网打尽,希望可以帮到读者们哦。 内…

CloudQuery v2.0.0 发布 新增数据保护、数据变更、连接管理等功能

哈喽社区的小伙伴们,经过一个月的努力,CloudQuery 社区版发布了全新 v2.0.0系列! 对比 v1.5.0,v2.0.0 在整体 UI 界面上就做了很大调整,功能排布我们做了重新梳理,可以说,社区版 v2.0.0 带领 C…

Linux——makefile自动化构建工具

一. 前言 一个工程中的源文件不计数,其按类型、功能、模块分别放在若干个目录中,makefile定义了一系列的 规则来指定,哪些文件需要先编译,哪些文件需要后编译,哪些文件需要重新编译,甚至于进行更复杂 的功能…

数据结构的定义

主要的定义 数据 描述客观事物的数和字符的集合,比如文字,数字和特殊符号 基本单元:数据元素 一个数据单元由若干个数据项构成 数据项:具有独立含义的数据最小单元,也称字段或域 数据元素&…

Spring Boot 中的 Starter 是什么?如何创建自定义 Starter?

Spring Boot 中的 Starter 是什么?如何创建自定义 Starter? Spring Boot 是一个快速构建应用程序的框架,它提供了一种简单的方式来快速启动和配置 Spring 应用程序。Spring Boot Starter 是 Spring Boot 的一个重要概念,它可以帮…

计算机网络详细笔记(四)网际控制报文协议ICMP

文章目录 4.网际控制报文协议ICMP4.1.ICMP报文的种类4.2.ICMP应用举例 4.网际控制报文协议ICMP 网际控制报文协议概述:: 作用:更有效地转发IP数据报和提高交付成功的机会。原理:允许主机或路由器报告差错情况和提供有关异常情况…

maven_SSM项目如何实现验证码功能

验证码的作用 防止恶意注册,自动化程序批量注册。防止暴力破解。 1、这里我们使用goole的验证码生成器 由于直接在maven中引入依赖,没有找到。所以只能直接去下载jar包了。 链接:https://pan.baidu.com/s/1KANhJKI4sQCfkiroTVr0WA?pwd29iv …

Oracle数据库环境变量配置修改数据库密码

1.设置环境变量: 必须设置环境变量才可以用CMD命令访问Oracle数据库 1.1.首先找到你Oracle安装位置路径 C:\app\Administrator\product\11.2.0\dbhome_1 1.2.设置环境变量 1.2.1 设置Adimistrator变量 变量名: ORACLE_HOME 变量值:C:\app…

嵌入式学习之Linux驱动(第九期_设备模型_教程更新了)_基于RK3568

驱动视频全新升级,并持续更新~更全,思路更科学,入门更简单。 迅为基于iTOP-RK3568开发板进行讲解,本次更新内容为第九期,主要讲解设备模型,共计29讲。视频选集 0.课程规划 06:35 1.抛砖引玉-设备模型…

K8s in Action 阅读笔记——【3】Pods: running containers in Kubernetes

K8s in Action 阅读笔记——【3】Pods: running containers in Kubernetes 3.1 Introducing pods 在Kubernetes中,Pod是基本构建块之一,由容器集合组成。与独立部署容器不同,你总是要部署和操作一个Pod。Pod并不总是包含多个容器&#xff0…