用一个例子告诉你 怎样在spark中创建RDD

news2025/1/12 1:52:50

目录

1. 前言

2. 分发驱动中scala集合中的数据

2.1 parallelize

2.2 makeRDD

2.3 range

3. 分发外部存储系统中的数据

3.1 textFile

3.2 wholeTextFiles


1. 前言

众所周知,spark是一种计算引擎(用来计算数据),但是数据从何而来呢?
       spark获取数据主要有两种方式:
             方式1:
                     分发驱动程序中scala集合中的数据
             方式2:
                     分发外部存储系统中的数据(HDFS、HBase、其他共享文件系统)

spark将读来的数据,分发到了哪里去?
       spark是一个分布式计算引擎,spark会将读取来的数据
                 按照指定的并行度,分发到不同的计算节点上去


2. 分发驱动中scala集合中的数据

spark提供了两个方法,用来将本地集合的数据(客户端JVM)切分成若干份

                                      然后再分发到不同的计算节点中去
    主要有两个参数:
             seq: Seq[T]       本地集合
             numSlices: Int    切片数(可选参数,不指定时使用默认切片数)

2.1 parallelize

  test("parallelize") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val list = List("java", "scala", "c++", "c#")

    // 指定切片数
    val rdd1: RDD[String] = sc.parallelize(list, 2)
    // 使用默认切片
    val rdd2: RDD[String] = sc.parallelize(list)

    sc.stop()
  }

2.2 makeRDD

  test("makeRDD") {
    /*
    * TODO : 源码中 makeRDD 调用的还是 parallelize方法
    *
    * */

    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val list = List("java", "scala", "c++", "c#")

    // 指定切片数
    val rdd1: RDD[String] = sc.makeRDD(list, 2)
    // 使用默认切片
    val rdd2: RDD[String] = sc.makeRDD(list)

    sc.stop()
  }

2.3 range

def range(start: Long,  end: Long,step: Long = 1,numSlices: Int = defaultParallelism): RDD[Long]

功能:
           创建一个Long类型的RDD,元素内容为 start到end,公差为step 的等差数列

参数:
           start: Long, 起始位置
           end: Long, 结束位置,不包括该位置
           step: Long = 1, 数列公差,默认为1
           numSlices: Int = defaultParallelism 切片数,不指定时使用默认切片数

使用场景:
          常用来造数据使用

  test("range") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[Long] = sc.range(0, 10)

    sc.stop()
  }

3. 分发外部存储系统中的数据

spark提供了两个方法,用于将外部文件数据切片后,再分发到不同的计算节点上去
    主要有两个参数:
           path: String  指定文件系统URL
           minPartitions: Int  指定切片数(不指定时,使用默认切片数)


使用限制:
对文件系统的要求:
         读取的文件系统必须是 HDFS、本地文件系统、任何hadoop支持的文件系统
对读取文件的要求:
         文件格式必须是 text格式且UTF-8
对url的要求:
         支持单个文件  /my/directory/1.txt
         支持多个文件  /my/directory/*.txt
         支持目录         /my/directory (目录下的必须都是文件,不能有目录存在)
                 java.io.IOException: Path: /dir/dir2 is a directory, which is not supported by the record 
                 只能读取目录下的文件,不会对子目录进行遍历读取
         支持gz格式的压缩文件 /my/directory/*.txt


3.1 textFile

返回 RDD[String] 格式的rdd,每个元素内容为 读取到text文件的每行rdd的长度为所有文件的行数

  test("textFile") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    // 读取目录下的所有文件
    val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir")
    // 读取gz格式的压缩文件
    //val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/1.txt.gz")
    rdd.foreach(println(_))

    sc.stop()
  }

3.2 wholeTextFiles

返回 RDD[(String, String)] 格式的rdd,每个元素内容为 (文件名称,文件内容),rdd的长度为读取到的文件个数

  test("wholeTextFiles") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    /*
    * TODO data/dir目录下 虽然存在目录不会报错,但是读取时会过滤掉目录,并不会递归读取子目录
    * */
    val rdd: RDD[(String, String)] = sc.wholeTextFiles("src/main/resources/data/dir")
    rdd.foreach(e => println(s"fileName:${e._1}   data:${e._2}"))

    sc.stop()
  }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows注册表的读写操作

目录1 注册表(Registry)介绍1.1 注册表简介1.2 注册表位置1.3 开启/禁用 注册表编辑器1.4 注册表的结构1.5 修改注册表实例2 程序中对注册表的读写操作2.1 打开和关闭注册表2.2 创建和删除指定的注册表键2.3 读取和设置指定注册表中某个键值2.4 增加和删除注册表键中某个键值2.…

华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典

文章目录2023 年用 Python 语言解华为 OD 机试题,一篇博客找全。华为 OD 机试题清单(机试题库还在逐日更新)2023 年用 Python 语言解华为 OD 机试题,一篇博客找全。 在 2023 年,Python 已成为广泛使用的编程语言之一&…

rabbitmq添加用户,虚拟机步,设置rabbitmq配置文件

第一步,登录后台控制页面 http://ip:15672第二步,添加用户和权限 重点:选择Admin和Users 第三步,添加虚拟机 点击侧边的Virtual Hosts 第四步将虚拟机和用户搭配 注意新建好后,在虚拟机列表中,点击虚拟机…

ubuntu安装spinningup

ubuntu安装spinningup 经过这篇博客安装好mujoco和mujoco-py后,下面安装强化学习代码库spinningup 按照spinningup官网的安装步骤走,下面我总结一下安装过程中出现的问题 在安装spinningup的时候,最好重建一个新的虚拟环境,因为…

基于springboot+vue的校园招聘系统

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

Java基础语法的心得总结

一、数据存储的理解可以参考第四大部分https://blog.csdn.net/xiaoxixicc/article/details/124222375个人理解:栈内存中保存的实际上是对象在堆内存中的引用地址。通过这个引用地址可以快速查找到保存中堆内存中的对象。二、static静态变量(共享的作用&a…

[数据结构]:05-循环队列(链表)(C语言实现)

目录 前言 已完成内容 循环队列实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-QueueCommon.cpp 04-QueueFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容,除其中使用到C引用外,全为C语言代码。使用C引用主要是为了…

(三十七)大白话SQL标准中对事务的4个隔离级别,都是如何规定的呢?

之前我们给大家讲了数据库中多个事务并发时可能产生的几种问题,包括了脏写、脏读、不可重复读、幻读,几种问题 那么针对这些多事务并发的问题,实际上SQL标准中就规定了事务的几种隔离级别,用来解决这些问题。 注意一下&#xff…

SSM项目 替换为 SpringBoot

一、运行SSM项目 保证项目改为SpringBoot后运行正常,先保证SSM下运行正常。 项目目录结构 创建数据库,导入sql文件 查看项目中连接数据jar版本,修改对应版本,修改数据库配置信息 配置启动tomcat 运行项目,测试正常…

考虑极端天气线路脆弱性的配电网分布式电源和储能优化配置模型

目录 1 主要内容 1.1 线路脆弱性分析 ​编辑 1.2 配电网线路故障分析 1.3 蒙特卡洛随机抽样的线路脆弱性分析模型伪代码 1.4 配电网储能和光伏优化配置 2 程序效果 3 下载链接 1 主要内容 程序主要参考《考虑极端天气线路脆弱性的配电网分布式电源配置优化模型-马宇帆》…

RL笔记:动态规划(2): 策略迭代

目录 0. 前言 (4.3) 策略迭代 Example 4.2: Jack’s Car Rental Exercise 4.4 Exercise 4.5 Exercise 4.6 Exercise 4.7 0. 前言 Sutton-book第4章(动态规划)学习笔记。本文是关于其中4.2节(策略迭代)。 (4.3) 策略迭代 基…

【JavaWeb】复习重点内容

✅✅作者主页:🔗孙不坚1208的博客 🔥🔥精选专栏:🔗JavaWeb从入门到精通(持续更新中) 📋📋 本文摘要:本篇文章主要分享JavaWeb的学习重点内容。 &a…

C++11多线程编程 二:多线程通信,同步,锁

C11多线程编程 一:多线程概述 C11多线程编程 二:多线程通信,同步,锁 C11多线程编程 三:锁资源管理和条件变量 2.1 多线程的状态及其切换流程分析 线程状态说明: 初始化(Init)&am…

javaEE 初阶 — 网络层中 IP 协议 的报文结构

文章目录IP 协议报文4位版本号4位首部长度8位服务类型16位总长度(字节数)8位生存时间(TTL)与 8位协议16位首部校验和32位源 IP 地址与32位目标 IP 地址动态分配的 IP 地址NAT 网络地址转换IPv6IP 协议报文 4位版本号 这里的 IP 协…

图表示学习+对比学习入门必看:DGI

来源:投稿 作者:kon 编辑:学姐 前言 众所周知,火热的对比学习不仅在CV取得了很多成果,也在NLP、推荐等领域大放异彩。自然的,有人将对比学习引入了图表示学习领域,利用图本身的结构与结点自身的…

14.微服务SpringCloud

一、基本概念 Spring Cloud 被称为构建分布式微服务系统的“全家桶”,它并不是某一门技术,而是一系列微服务解决方案或框架的有序集合。它将市面上成熟的、经过验证的微服务框架整合起来,并通过 Spring Boot 的思想进行再封装,屏蔽…

【Servlet篇】Response对象详细解读

文章目录Response 继承体系Response 设置响应数据设置响应行数据设置响应头数据设置响应体数据Response 重定向Response 响应字符数据Response 响应字节数据Response 继承体系 前面说到,我们使用 Request 对象来获取请求数据,使用 Response 对象来设置响…

Pyinstaller 打包EXE(七) 百篇文章学PyQT

本文章是百篇文章学PyQT6的第七篇,本文讲述如何使用Pyinstaller打包UI界面和代码,将程序打包成EXE来更为方便的进行部署,在写博客和学习的过程中会遇到很多问题,例如:PyQT6在网上很多博客都是PyQT5、或者PyQT4大部分都…

Could not extract response: no suitable HttpMessageConverter

版本:spring-cloud-openfeign-core-2.1.1.RELEASE.jar,spring-webmvc-5.1.14.RELEASE.jar,jetty-server-9.4.41.v20210516.jar,tomcat-embed-core-9.0.48.jar 问题背景 生产服务请求下游服务时偶发抛出下面的异常,下…

git入门

目录 1. git简介 1.1 git是什么 1.2 git与svn的区别 2. github 2.1 创建仓库 2.2 删除仓库 2.3 新建文件及文件夹 3. git的基本操作 3.1 配置账户及邮箱 3.2 git文件状态与工作区域 3.3 常用命令 3.4 克隆(clone) 3.5 查看git仓库的状态 3.…