Spark经典案例分享

news2025/1/11 6:11:20

Spark经典案例

  1. 链接操作案例
  2. 二次排序案例

链接操作案例

案例需求

数据介绍

代码如下:

package base.charpter7

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.Join  
 * @description ${description}  
 * @author pblh123
 * @date 2023/11/28 17:25
 * @version 1.0
 *
 */
    
object Join {
  def main(args: Array[String]): Unit = {

    //    1. 创建一个sc对象
    if (args.length != 4) {
      println("usage is WordCount <rating> <movie> <output>")
      System.exit(5)
    }
    val murl = args(0)
    val ratingfile = args(1)
    val movingfile = args(2)
    val outputfile = args(3)

    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}").master(murl).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    //    2. 代码主体
    //    判断输出路径是否存在,存在则删除
    val conf: Configuration = new Configuration()
    val fs: FileSystem = FileSystem.get(conf)
    if (fs.exists(new Path(outputfile))) {
      println(s"存在目标文件夹$outputfile")
      fs.delete(new Path(outputfile))
      println(s"目标文件夹$outputfile 已删除")
    }
    else println(s"目标文件夹$outputfile 不存在")


    //rating etl
    val ratingrdd: RDD[String] = sc.textFile(ratingfile, 1)
    val rating: RDD[(Int, Double)] = ratingrdd.map(line => {
      val fileds: Array[String] = line.split("::")
      (fileds(1).toInt, fileds(2).toDouble)
    })
    val movieScores: RDD[(Int, Double)] = rating.groupByKey().map(x => {
      val avg = x._2.sum / x._2.size
      (x._1, avg)
    })
    //    move etl
    val movierdd: RDD[String] = sc.textFile(movingfile)
    // movieid,(movieid,title)
    val movieskey: RDD[(Int, (Int, String))] = movierdd.map(line => {
      val fileds: Array[String] = line.split("::")
      (fileds(0).toInt, fileds(1))
    }).keyBy(tup => tup._1)

    // movieid,(movieid,avg_rating)
    val sskey: RDD[(Int, (Int, Double))] = movieScores.keyBy(tup => tup._1)
    // movieid, (movieid,avg_rating),(movieid,title)
    val joinres: RDD[(Int, ((Int, Double), (Int, String)))] = sskey.join(movieskey)
    // movieid,avg_rating,title
    val res: RDD[(Int, Double, String)] = joinres.filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))
//    val res: RDD[(Int, Double, String)] = sskey.join(movieskey)
//      .filter(f => f._2._1._2 > 4.0)
//      .map(f => (f._1, f._2._1._2, f._2._2._2))

    res.take(5).foreach(println)
    res.saveAsTextFile(outputfile)


    //  3. 关闭sc,spark对象
    sc.stop()
    spark.stop()
  }
}

运行结果

二次排序案例

需求及数据说明:

代码实现

SecondarySortKey.class 方法

package base.charpter7

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.SecondarySortKey  
 * @description ${description}  
 * @author pblh123
  
* @date 2023/11/29 17:01
  
* @version 1.0
  
*/
    
class SecondarySortKey(val first:Int, val second:Int) extends Ordered[SecondarySortKey] with Serializable{

  override def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0){
      this.first - that.first
    } else {
      this.second - that.second
    }
    }
}
SecondarySortApp.scala方法
package base.charpter7

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.SecondarySortApp  
 * @description ${description}  
 * @author pblh123
 * @date 2023/11/29 17:04
 * @version 1.0
 *
 */
    
object SecondarySortApp {
  def main(args: Array[String]): Unit = {

    //  1. 创建spark,sc对象
    if (args.length != 2) {
      println("您需要输入二个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(musrl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    //  2. 代码主体
    // 读取一个txt文件
    val inputfile: String = args(1)
    val lines: RDD[String] = sc.textFile(inputfile, 1)
    // 进行二次排序
    val pairRDDwithSort: RDD[(SecondarySortKey, String)] = lines.map(line => {
      val strings: Array[String] = line.split(" ")
      (new SecondarySortKey(strings(0).toInt, strings(1).toInt), line)
    })
    val pairRDDwithSort2: RDD[(SecondarySortKey, String)] = pairRDDwithSort.sortByKey(false)
    val sortedRes: RDD[String] = pairRDDwithSort2.map(sortedline => sortedline._2)
    sortedRes.collect().foreach(println)

    //  3. 关闭sc,spark对象
    sc.stop()
    spark.stop()
  }
}

配置参数

运行效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1267348.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

品牌全渠道营销系统如何与不同经销商ERP打通

品牌商在与各经销商ERP系统打通方面面临的挑战。传统的ERP系统往往使得数据收集和合作变得繁琐且低效&#xff0c;导致市场响应迟缓&#xff0c;影响整体的供应链管理和市场决策。我们的解决方案旨在破解这一难题&#xff0c;提供一个全渠道营销系统&#xff0c;它能自动与各类…

啊哒-MISC-bugku-解题步骤

——CTF解题专栏—— 题目信息&#xff1a; 题目&#xff1a;啊哒 作者&#xff1a;第七届山东省大学生网络安全技能大赛 提示&#xff1a;无 解题附件&#xff1a; 解题思路&#xff1a; 图片的话还是老三样斧winwalk、010Editor、Stegsolve。ok直接开搞&#xff01; 解题…

Typora .MD笔记中本地图片批量上传到csdn (.PNG格式)(无需其他任何图床软件)

Typora .MD笔记中本地图片批量上传到csdn &#xff08;.PNG格式&#xff09;(无需其他任何图床软件) 截图软件推荐 qq 截图 快捷键 ctrlshiftA. 步骤一 设置Typora 的图片 点击文件. 点击偏好设置 ->图像 我们可以选择将图片复制到我们的文件夹中。 建议刚写好文件标题就…

element ui 表格合计项合并

如图所示&#xff1a; 代码&#xff1a; <el-table height"400px" :data"tableData " borderstyle"width: 100%"stripe show-summaryref"table"id"table"> </el-table>监听表格 watch: { //监听table这个对象…

OBC、DCDC自动化测试解决方案!

OBC(车载充电机&#xff09;和DCDC&#xff08;直流-直流变换器&#xff09;是电动汽车的核心部件&#xff0c;DCDC和OBC的功能质量对于整车的性能和安全性至关重要。在OBC和DCDC&#xff0c;以及整车开发测试过程中&#xff0c;需要对OBC和DCDC进行功能和性能方面进行全面的测…

银河麒麟高级服务器操作系统V10安装达梦数据库管理系统DM8——单实例

一、介绍 之前介绍过供个人学习在VMware虚拟机上安装银河麒麟高级服务器操作系统V10&#xff0c;有兴趣的可以去看看&#xff08;银河麒麟V10安装&#xff09;&#xff0c;本次主要学习在银河麒麟V10上安装达梦数据库-DM8。DM8是达梦公司在总结DM系列产品研发与应用经验的基础…

明天就删,限时领取。zui全拼多多直播问题答疑文档合集。

直播流程是什么&#xff1f;什么时间要做什么事&#xff1f;直播带货播出什么数据才算好?怎么提高直播间流量指标&#xff1f;付费起号还是自然起号好&#xff1f;大小循环话术和场控话术怎么说?今天为大家分享一份“zui全直播500问”&#xff1a; 以上内容为“zui全直播500问…

消息队列进阶-3.消息队列常见问题解决方案

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理&#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44…

产品软文撰写思路,媒介盒子分享

产品软文的目的是为了将产品卖出去&#xff0c;然而想把产品卖出去&#xff0c;不是靠几句话就能实现的&#xff0c;还需要进行多方面分析&#xff0c;今天媒介盒子就来和大家分享&#xff1a;产品软文撰写思路。 一、 产品体验分享 自己要成为自己产品的深度用户并不是一句空…

洗牙器亚马逊UL1431测试报告检测标准

洗牙器是一种电动口腔清洁工具&#xff0c;用于移除食物残渣和牙菌斑&#xff0c;提高口腔卫生水平。 亚马逊要求商家上架的产品检测报告必须是ISO17025/ILAC ISO 17025标准认可的实验室出具的合格报告。 UL测试报告是根据产品选用相应的UL标准进行测试合格后&#xff0c;出具…

【MySQL源码】使用CLion 远程调试MySQL源码

目录 0 准备工作 1 IDE 2 下载MySQL源码 ​编辑 一 配置CLion 1 添加远程服务器 2 配置远程服务器环境 3 升级gdb版本 4 升级CMake版本 5 修改远程服务器文件上传的目录的对应关系 5 配置cmake 7 初始化MySQL 8 启动MySQL 作为DBA工作多年&#xff0c;如果还是停…

InnoDB存储引擎中的锁

文章目录 概要一、需要解决的问题二、共享锁和独占锁1.1 锁定读1.2 表级别的共享锁、独占锁 三、行锁3.1 数据准备3.2 几种常见的行级锁3.3 行锁升级为表锁 概要 关于MySQL涉及到的锁&#xff0c;大致可以总结如下&#xff1a; MyISAM存储引擎在开发过程中几乎很少使用了&…

虾知(知虾):助力Shopee卖家实现市场分析和选品策略优化的神器

在如今的电商市场竞争激烈的背景下&#xff0c;卖家需要借助数据分析工具来了解市场需求、热销产品和竞争状况&#xff0c;以制定明智的选品策略。而虾知&#xff08;知虾&#xff09;作为一款专为Shopee卖家设计的数据分析工具&#xff0c;为卖家提供全面的市场分析、商品分析…

InstructDiffusion-多种视觉任务统一框架

论文:《InstructDiffusion: A Generalist Modeling Interface for Vision Tasks》 github&#xff1a;https://github.com/cientgu/InstructDiffusion InstructPix2Pix&#xff1a;参考 文章目录 摘要引言算法视觉任务统一引导训练集重构统一框架 实验训练集关键点检测分割图像…

Could NOT find resource [logback-test.xml]

修改 之后就可以正常启动了

wsj0数据集原始文件.wv1.wv2转换成wav文件

文章目录 准备一、获取WSJO数据集二、安装sph2pipe三、转换代码四、结果展示 ​ 最近做语音分离实验需要用到wsj0-2mix数据集&#xff0c;但是从李宏毅语音分离教程里面获取的wsj0-2mix只有一部分。从网上获取到了完整的WSJO数据集后&#xff0c;由于原始的语音文件后缀是wv1或…

Linux安装mongodb数据库(详细)

一、下载安装包 本文使用 tgz 方式,根据服务器类型在官网下载 MongoDB 安装包。官方地址&#xff1a;https://www.mongodb.com/try/download/community 下载方式如图所示&#xff1a; 选择版本 关于 MongoDB 的版本选择&#xff0c;参见如下版本差异&#xff1a; 1、将从官…

推荐几款免费的智能AI伪原创工具

在当今信息快速传播的时代&#xff0c;创作者们常常为了在激烈的竞争中脱颖而出而苦苦挣扎&#xff0c;而其中的一项挑战就是创作出独具创意和独特性的内容。然而&#xff0c;时间有限的现实让很多人望而却步。在这个背景下&#xff0c;免费在线伪原创工具成为了创作者们的得力…

csapp-linklab之第二阶段“输出学号”实验报告

本阶段主题是链接中的“重定位”。两次重定位&#xff0c;一次是绝对地址重定位&#xff0c;一次是PC相对地址重定位。 本题目标依旧是输出学号&#xff0c;反汇编phase2.o&#xff0c;看到学号“0000000000”已经存放在只读数据区了。现在任务就是改do_pheas的指令和重定位表…

示波器高压探头的操作说明及使用注意事项

操作说明&#xff1a; 连接探头衰减端的地线(鳄鱼夹)到好的接地点或可靠的接地测试端。连接BNC头到示波器的BNC输入端口。选择示波器要求的量程范围。 注意&#xff1a;请务必在连接测试前把高压电源关闭。 注意事项&#xff1a; 请勿将测试设备的接地线从地面接线柱上移开。…