大数据 - Spark系列《三》- 加载各种数据源创建RDD

news2025/1/11 4:01:14

Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

目录

3.1🧀加载文件(本地)

1. 加载本地文件路径

🌮使用textFile加载本地txt文件

🌮使用textFile加载本地json文件

🌮使用sequenceFile加载本地二进制文件

🌮HDFS也可以 (hdfs://doe01:8020/data/wds/)

3.2 🧀本地集合(测试)

3.3 🧀加载mysql

1. 🌮环境准备

2. 🌮创建Spark应用程序


3.1🧀加载文件(本地)

1. 加载本地文件路径

  • 🌮使用textFile加载本地txt文件
  • 🌮使用textFile加载本地json文件
  • 🌮使用sequenceFile加载本地二进制文件

二进制文件加载后的RDD中每个元素都是一个键值对,其中键和值的类型由用户指定。

/**
 * 加载文本文件 创建RDD
 * 参数1  文件路径
 * 参数2   最小分区数 默认2
 * RDD =  迭代器+分区信息  一行一行的迭代数据
 */
// 从本地文件系统加载(只适用于开发测试)
val rdd: RDD[String] = sc.textFile("local/path/to/text/file", 2)
val rdd: RDD[String] = sc.textFile("local/path/to/json/file", 2)


//-------------------------------------------------
//   User.class   asInstanceOf
val res = sc.sequenceFile("local/path/to/binary/file", classOf[String], classOf[Int])
// 其中第一个参数是文件路径,第二个参数是键的类型,第三个参数是值的类型。
  • 🌮HDFS也可以 (hdfs://doe01:8020/data/wds/)
// 从HDFS文件系统加载(对应绝大多数生产应用场景)
val data: RDD[String] = sc.textFile("hdfs://hadoop01:8020/data/words/", 2)
data.foreach(println)

 🥙练习1:使用textFile加载本地txt文件 - 统计每个城市下订单总额

//数据:orders.txt
oid01,100,bj
oid02,100,bj
oid03,100,bj
oid04,100,nj
oid05,100,nj
package com.doit.day0130

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @日期: 2024/1/31
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description: Spark应用程序入口,用于计算订单数据中各个城市的订单总金额
 */

object StartGetting {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 加载订单数据
    val rdd1 = sc.textFile("data/orders.txt")

    // 将订单数据转换为键值对(city, amount),其中city为键,amount为值
    val rdd2 = rdd1.map(line => {
      val arr = line.split(",")
      (arr(2), arr(1))
    })

    // 根据城市对订单数据进行分组
    val rdd3 = rdd2.groupBy(_._1)

    // 计算每个城市的订单总金额
    val rdd4 = rdd3.map(tp => {
      val city = tp._1
      val sum = tp._2.map(_._2.toInt).sum
      (city, sum)
    })

    // 将结果保存到输出文件中
    rdd4.saveAsTextFile("data/citysum_output")
    
   // 将结果保存并保存为sequenceFile文件
    rdd4.saveAsTextFile("data/citysum_output_seq")

    // 关闭SparkContext对象,释放资源
    sc.stop()
  }
}

结果:

🥙练习2:使用textFile加载本地json文件 - 去获取每部电影的平均分

Spark-关于Json数据格式的数据的处理与练习

🥙练习3:使用sequenceFile加载本地二进制文件(练习1出来的结果data/citysum_output_seq) - 将seq文件的数据转换为Object对象,并打印出所有的城市

// 城市对象类
case class CityObj(
                    // 城市名称
                    city: String,
                    // 数量
                    num: Int
                  )
package com.doit.day0201

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

/**
 * @日期: 2024/2/1
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test01 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    //sequenceFile 就是序列化文件 K-V K-V K1-V1 【序列化】
    // 加载 Sequence 文件并创建 RDD
    val rdd1 = sc.sequenceFile("data/citysum_output_seq/", classOf[Text], classOf[IntWritable])
    val newrdd = sc.sequenceFile[String, Int]("data/citysum_output_seq/", 2)

    newrdd.foreach(println)
    // 转换为对象并提取城市数据
    val cities = rdd1.map { case (textKey, intValue) =>
      // 将 Hadoop 的 Text 对象和 IntWritable 对象转换为 Scala 字符串和整数
      val city = textKey.toString
      val count = intValue.get()

      // 创建 CityObj 对象
      CityObj(city, count)
    }


    // 提取并打印所有城市
    val uniqueCities = cities.map(_.city).foreach(println)

    sc.stop()
  }
}

结果:

注意点:

  1. 类型匹配sequenceFile 方法需要指定键和值的类型参数,这些类型应该与文件中实际的数据类型匹配。通常情况下,键和值的类型会使用 Hadoop 库中的数据类型,如 TextIntWritable 等。

  2. 类型转换:在处理文件数据时,需要将 Hadoop 的 Text 类型转换为 Scala 的 String 类型,将 IntWritable 类型转换为 Scala 的 Int 类型。

🥙练习4:使用textFile加载hdfs txt文件 - 每个字母代表一个人 , 统计任意一个人和其他人的共同好友 

//数据:f.txt
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
package com.doit.day0201

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

/**
 * @日期: 2024/2/2
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description: 实现统计每个人与其他人的共同好友
 */

object Test02 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 从HDFS读取数据创建RDD
    val rdd1 = sc.textFile("hdfs://hadoop01:8020/spark/data/f.txt", 2)

    // 对每行数据进行处理,生成以每个人为key,其好友为value的RDD
    val rdd2: RDD[(String, String)] = rdd1.flatMap(line => {
      val arr1 = line.split(":")
      val name = arr1(0)
      val arr2 = arr1(1).split(",")
      arr2.map(tp => (name, tp))
    })

    // 将数据按照每个人分组,形成键值对的RDD,键为人名,值为其好友列表
    val rdd3 = rdd2.groupBy(_._1)

    // 转换RDD结构,将Iterable转换为List
    val rdd4 = rdd3.map(tp => {
      val name = tp._1
      val fr: Iterable[String] = tp._2.map(_._2)
      (name, fr)
    })

    // 将RDD转换为List
    val list: List[(String, Iterable[String])] = rdd4.collect().toList

    // 遍历List中的每个元素,计算交集
    for (i <- 0 to list.size; j <- i + 1 to list.size) {
      val tuple: (String, Iterable[String]) = list(i)
      val tuple1 = list(j)

      // 计算两人好友列表的交集
      val v3 = tuple._2.toList.intersect(tuple1._2.toList)
      println(s"${tuple._1}与${tuple1._1}的交集为" + v3)
    }

    // 关闭SparkContext
    sc.stop()
  }
}

结果:

3.2 🧀本地集合(测试)

在Spark中,makeRDD方法用于将本地集合或序列转换为RDD。它接受一个Seq类型的集合作为参数,并可选地接受一个表示分区数量的整数参数。

  • 默认分区 环境的所有可用核数

  • 创建的时候可以通过参数设置分区

package com.doit.day0201

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

import scala.collection.mutable

/**
 * @日期: 2024/2/4
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description: 示例RDD的创建和并行度设置
 */

// 定义一个城市对象,包含城市名和人口数量
case class CityObj(name: String, population: Int)

object Test04 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 创建一个List集合,包含城市对象
    val city = List(
      CityObj("shanghai", 5000000),
      CityObj("beijing", 9800000),
      CityObj("nanjing", 5500000)
    )

    // 将List集合直接转换为RDD,默认并行度为所有可用核数
    val rdd1 = sc.makeRDD(city)
    // 将List集合转换为RDD,并指定并行度为2
    val rdd2 = sc.makeRDD(city, 2)

    // 打印RDD的分区数
    println(rdd1.getNumPartitions) // 16
    println(rdd2.getNumPartitions) // 2

    // 创建一个可变的HashMap,包含姓名和年龄
    val map = mutable.HashMap[String, Int](("zss", 23), "lss" -> 33)

    // HashMap不可以直接传入makeRDD,需要先转换为List再传入
    val rdd3 = sc.makeRDD(map.toList)
    // 打印RDD的分区数
    println(rdd3.getNumPartitions) // 16

    // 关闭SparkContext
    sc.stop()
  }
}

HashMap不可直接使用makeRDD方法

对于HashMap类型的集合,由于其不是Seq的子类,因此无法直接使用makeRDD方法进行转换。通常情况下,可以先将HashMap转换为List,再使用makeRDD方法,示例如下:

val map = mutable.HashMap[String, Int](("zss", 23), "lss" -> 33) 
// HashMap不可以直接传入makeRDD,需要先转换为List再传入 
val rdd3 = sc.makeRDD(map.toList)

 

3.3 🧀加载mysql

1. 🌮环境准备

在开始之前,需要确保以下环境已经准备好:

  • Spark环境:确保已经安装和配置了Spark,并且可以正常运行Spark应用程序。

  • MySQL数据库:确保MySQL数据库已经安装并且可以访问。需要提供数据库连接地址、用户名和密码。

//创建表和插入数据
CREATE TABLE `salary` (
  `empid` int NOT NULL,
  `basesalary` double DEFAULT NULL,
  `titlesalary` double DEFAULT NULL,
  `deduction` double DEFAULT NULL,
  PRIMARY KEY (`empid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO `salary` (`empid`, `basesalary`, `titlesalary`, `deduction`) VALUES
(1001, 2200, 1100, 200),
(1002, 1200, 200, NULL),
(1003, 2900, 700, 200),
(1004, 1950, 700, 150);
  • 在pom.xml里面添加mysql依赖

<!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.2.0</version>
</dependency>

2. 🌮创建Spark应用程序

使用JdbcRDD从MySQL数据库读取数据,需要注意以下几个关键参数:

1)SparkContext 对象 (sc): 这是 Spark 应用程序的主要入口点,需要传递给 JdbcRDD 构造函数。

2)数据库连接函数 (conn): 这是一个无参数的函数,用于获取数据库连接。在函数体内,应该使用 DriverManager.getConnection 方法来获取数据库连接,并指定数据库的连接地址、用户名和密码。

3)查询 SQL 语句 (sql): 这是用于执行数据库查询的 SQL 语句。你可以在 SQL 语句中使用占位符(?)来表示查询参数,后续会通过 JdbcRDD 的其他参数来提供具体的查询范围。

4)查询参数范围: 通过指定起始和结束的查询参数值来定义查询范围。这些参数值会传递给 SQL 语句中的占位符,以便在查询时动态指定查询条件。

5)并行度 (numPartitions): 这指定了创建的 RDD 的分区数,也就是并行度。它决定了查询在 Spark 集群中并行执行的程度。通常情况下,可以根据数据量和集群资源情况来设置并行度,以提高查询性能。

6)结果集处理函数 (resultSetHandler): 这是一个函数,用于处理从数据库返回的查询结果。你需要实现这个函数来定义对查询结果的处理逻辑,例如提取需要的字段、转换数据类型等。

package com.doit.day0201

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}

/**
 * @日期: 2024/2/4
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description: 使用JdbcRDD从MySQL数据库读取数据的示例
 */

object Tes05 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    /**
     * 参数一 sc
     * 参数二 函数  获取连接对象
     * 参数三 查询sql  要求  必须指定查询范围
     * 参数4 5 数据范围
     * 参数6 并行个数
     * 参数7 处理返回结果的函数
     */

    // 定义一个函数来获取数据库连接
    val conn = () => {
      DriverManager.getConnection("jdbc:mysql://localhost:3306/day02_test02_company", "root", "123456")
    };

    // 定义查询SQL语句
    val sql = "select empid,basesalary,titlesalary from salary where empid >= ? and empid <= ?"

    // 定义结果集处理函数
    val f2 = (rs: ResultSet) => {
      // 每条结果数据的处理逻辑
      val id = rs.getInt(1)
      val basesalary = rs.getDouble(2)
      val titlesalary = rs.getDouble(3)
      (id, basesalary, titlesalary)
    }

    // 创建JdbcRDD并执行查询
    val rdd1 = new JdbcRDD(sc, conn, sql, 1002, 1003, 1, f2)
    rdd1.foreach(println)


    // 停止SparkContext
    sc.stop()
  }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1433669.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

倍增法+LCA(C/C++)

目录 1 介绍 2 基本模板 1 介绍 倍增法(binary lifting)&#xff0c;是一种每次将情况翻倍从而将线性处理转化为对数级处理&#xff0c;进而极大优化时间复杂度的方法。 2 基本模板 //预处理复杂度同为O(nlogn),查询时间上&#xff0c;ST表为O(1),线段树为O(logn) #inc…

[C++] 如何使用Visual Studio 2022 + QT6创建桌面应用

安装Visual Studio 2022和C环境 [Visual Studio] 基础教程 - Window10下如何安装VS 2022社区版_visual studio 2022 社区版-CSDN博客 安装QT6开源版 下载开源版本QT Try Qt | 开发应用程序和嵌入式系统 | Qt Open Source Development | Open Source License | Qt 下载完成&…

华为交换机配置Qos

QoS在企业网中的应用 在企业网络中&#xff0c;QoS的一系列技术不要求在同一台设备上应用&#xff0c;而应根据业务需要在不同位置应用。 图5 QoS技术在企业网络中的应用 理论上来说&#xff0c;各层次设备的功能如下&#xff1a; l 接入层业务识别 接入交换机LSW1作为边界…

常用工具类-Arrays

常用工具类-Arrays 数组打印创建数组比较数组数组排序和检索数组转ListsetAll 和 parallelSetAll 数组打印 Arrays提供了toString()方法&#xff0c;可以直接将数组的内容打印出来&#xff0c;极为便捷。 String[] strArr new String[] {"1","2","…

16.docker删除redis缓存数据、redis常用基本命令

1.进入redis容器内部 &#xff08;1&#xff09;筛选过滤出redis容器 docker ps | grep "redis"&#xff08;2&#xff09;进入redis容器 #说明&#xff1a;d24为redis容器iddocker exec -it d24 /bin/bash2.登陆redis (1) 进入redis命令行界面 redis-cli说明&a…

go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅 在Fanout模式中&#xff0c;一条消息&#xff0c;会被所有订阅的队列都消费。但是&#xff0c;在某些场景下&#xff0c;我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下&#xff1a; 队列与交换机的绑定&#xff0c;不能…

javaEE - 21( 15000字 Tomcat 和 HTTP 协议入门 -2)

一&#xff1a; HTTP 响应 1.1 认识 “状态码” (status code) 状态码表示访问一个页面的结果. (是访问成功, 还是失败, 还是其他的一些情况…)&#xff0c;以下为常见的状态码. 1.1.1 200 OK 这是一个最常见的状态码, 表示访问成功. 抓包抓到的大部分结果都是 200 HTTP/…

004集—二调数据库标注分子分母模式及统计净面积——arcgis

二调数据库中分子分母标注方法为&#xff1a; 表达式如下&#xff1a; "<und>"& [TBBH] &"</und>" &vbnewline& [DLBM] "<und>"&[DLBM]&"</und>" &vbnewline& [DLMC] &quo…

Anaconda的安装及其配置

一、简介 Anaconda是一个开源的包、环境管理器&#xff0c;主要具有以下功能和特点&#xff1a; 提供conda包管理工具&#xff1a;可以方便地创建、管理和分享Python环境&#xff0c;用户可以根据自己的需要创建不同的环境&#xff0c;每个环境都可以拥有自己的Python版本、库…

【零基础入门TypeScript】Union

目录 语法&#xff1a;Union文字 示例&#xff1a;Union类型变量 示例&#xff1a;Union 类型和函数参数 Union类型和数组 示例&#xff1a;Union类型和数组 TypeScript 1.4 使程序能够组合一种或两种类型。Union类型是表达可以是多种类型之一的值的强大方法。使用管道符号…

【动态规划】【状态压缩】【2次选择】【广度搜索】1494. 并行课程 II

作者推荐 视频算法专题 本文涉及知识点 动态规划汇总 状态压缩 广度优先搜索 LeetCode1494. 并行课程 II 给你一个整数 n 表示某所大学里课程的数目&#xff0c;编号为 1 到 n &#xff0c;数组 relations 中&#xff0c; relations[i] [xi, yi] 表示一个先修课的关系&am…

关于在大模型中遇到的6(colab)

来源&#xff1a;免费的GPU——colab使用教程 - 知乎 作者&#xff1a;焦龙 首先你得注册一个google账号才能用google的这些东西&#xff0c;然后你需要科学上网才能去注册google账号&#xff0c;这些我就不说了。 colab网址&#xff1a;运行代码的地方 https://colab.rese…

Hack The Box-Challenges-Misc-M0rsarchive

解压压缩包&#xff0c;里面是一张图片和一个新的zip文件 图片放大后的图案是----. 考虑到为莫斯密码&#xff0c;将其解密 密码为9&#xff0c;继续解压缩包 又是一张莫斯密码图加压缩包&#xff0c;写一段脚本去解密图片中的莫斯密码&#xff0c;并自动解压缩包 import re i…

cesium-测量高度垂直距离

cesium做垂直测量 完整代码 <template><div id"cesiumContainer" style"height: 100vh;"></div><div id"toolbar" style"position: fixed;top:20px;left:220px;"><el-breadcrumb><el-breadcrumb-i…

情人节送什么礼物合适?情人节礼物最佳排行榜

​马上就要到情人节了&#xff0c;大家是否已经选好了送给爱人的礼物呢&#xff1f;如果没有&#xff0c;或许可以考虑一些优质的数码产品。随着科技的发展&#xff0c;数码产品已经成为我们生活中不可或缺的一部分。接下来&#xff0c;我将为大家推荐几款非常适合作为情人节礼…

LeetCode:2.两数相加

目录 题目&#xff1a;​编辑2. 两数相加 - 力扣&#xff08;LeetCode&#xff09; 分析问题&#xff1a; 官方的优秀代码博主的注释&#xff1a; 博主的辣眼代码&#xff0c;无注释&#xff0c;拉出来拷打自己&#xff1a; 每日表情包&#xff1a; 2. 两数相加 - 力扣&am…

什么是信创业态支持?支持信创的数据库防水坝哪家好?

随着国产化信创化的崛起&#xff0c;出现了很多新名词&#xff0c;例如信创业态支持、国产信创化等等。今天我们就来聊聊什么是信创业态支持&#xff1f;支持信创的数据库防水坝哪家好&#xff1f; 什么是信创业态支持&#xff1f; 大范围而言&#xff0c;信创业态支持可以理解…

【404App】一篇文章搞定SSL证书更换,赶紧收藏吧!

场景:阿里云服务器,nginx 第一步:下载免费证书,登录购买域名的阿里云账户,选择免费证书申请、下载(https://yundun.console.aliyun.com/?spm=5176.12818093_-1363046575.ProductAndResource–ali–widget-product-recent.2.3be916d0qj5Z8O&p=cas#/certExtend/free/…

uniapp中配置开发环境和生产环境

uniapp在开发的时候&#xff0c;可以配置多种环境&#xff0c;用于自动切换IP地址&#xff0c;用HBuilder X直接运行的就是开发环境&#xff0c;用HBuilder X发布出来的&#xff0c;就是生产环境。 1.使用HBuilder X创建原生的uniapp程序 选择vue3 2.什么都不改&#xff0c;就…

ChatGPT辅助编程,一次有益的尝试

如果大家想学习PCIe&#xff0c;搜索网上的信息&#xff0c;大概率会看到chinaaet上Felix的PCIe扫盲系列的博文 Felix-PCIe扫盲 每次看这个系列博文的时候&#xff0c;我都在想有没有什么方法可以把这个系列的博文都保存到一个pdf文件中&#xff0c;这样方便阅读。于是有了下…