【Spark分布式内存计算框架——Spark SQL】11. External DataSource(中)parquet 数据

news2025/1/16 18:08:41

6.3 parquet 数据

SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。

范例演示代码:直接load加载parquet数据和指定parquet格式加载数据

import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* SparkSQL读取Parquet列式存储数据
*/
object SparkSQLParquet {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// TODO: 从LocalFS上读取parquet格式数据
val usersDF: DataFrame = spark.read.parquet("datas/resources/users.parquet")
usersDF.printSchema()
usersDF.show(10, truncate = false)
println("==================================================")
// SparkSQL默认读取文件格式为parquet
val df = spark.read.load("datas/resources/users.parquet")
df.printSchema()
df.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}

运行程序结果:
在这里插入图片描述

6.4 text 数据

SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset,前面【入门案例:词频统计WordCount】中已经使用,下面看一下方法声明:
在这里插入图片描述
可以看出textFile方法底层还是调用text方法,先加载数据封装到DataFrame中,再使用as[String]方法将DataFrame转换为Dataset,实际项目中推荐使用textFile方法,从Spark 2.0开始提供。

无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。

范例演示:分别使用text和textFile方法加载数据。

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* SparkSQL加载文本文件数据,方法text和textFile
*/
object SparkSQLText {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate() // 底层实现:单例模式,创建SparkContext对象
import spark.implicits._
// TODO: text方法加载数据,封装至DataFrame中
val dataframe: DataFrame = spark.read.text("datas/resources/people.txt")
dataframe.printSchema()
dataframe.show(10, truncate = false)
println("=================================================")
val dataset: Dataset[String] = spark.read.textFile("datas/resources/people.txt")
dataset.printSchema()
dataset.show(10, truncate = false)
spark.stop()// 应用结束,关闭资源
}
}

6.5 json 数据

实际项目中,有时处理数据以JSON格式存储的,尤其后续结构化流式模块:StructuredStreaming,从Kafka Topic消费数据很多时间是JSON个数据,封装到DataFrame中,需要解析提取字段的值。以读取github操作日志JSON数据为例,数据结构如下:
在这里插入图片描述
1)、操作日志数据使用GZ压缩:2015-03-01-11.json.gz,先使用json方法读取。

// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
// 底层实现:单例模式,创建SparkContext对象
.getOrCreate()
import spark.implicits._
// TODO: 从LocalFS上读取json格式数据(压缩)
val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz")
jsonDF.printSchema()
jsonDF.show(10, truncate = true)

2)、使用textFile加载数据,对每条JSON格式字符串数据,使用SparkSQL函数库functions中自带get_json_obejct函数提取字段:id、type、public和created_at的值。

  • 函数:get_json_obejct使用说明
    在这里插入图片描述

  • 核心代码

val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz")
githubDS.printSchema() // value 字段名称,类型就是String
githubDS.show(1)
// TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数
import org.apache.spark.sql.functions._
// 获取如下四个字段的值:id、type、public和created_at
val gitDF: DataFrame = githubDS.select(
get_json_object($"value", "$.id").as("id"),
get_json_object($"value", "$.type").as("type"),
get_json_object($"value", "$.public").as("public"),
get_json_object($"value", "$.created_at").as("created_at")
)
gitDF.printSchema()
gitDF.show(10, truncate = false)

运行结果:
在这里插入图片描述
范例演示完整代码:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* SparkSQL读取JSON格式文本数据
*/
object SparkSQLJson {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
// 底层实现:单例模式,创建SparkContext对象
.getOrCreate()
import spark.implicits._
// TODO: 从LocalFS上读取json格式数据(压缩)
val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz")
jsonDF.printSchema()
jsonDF.show(10, truncate = true)
println("===================================================")
val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz")
githubDS.printSchema() // value 字段名称,类型就是String
githubDS.show(1)
// TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数
import org.apache.spark.sql.functions._
// 获取如下四个字段的值:id、type、public和created_at
val gitDF: DataFrame = githubDS.select(
get_json_object($"value", "$.id").as("id"),
get_json_object($"value", "$.type").as("type"),
get_json_object($"value", "$.public").as("public"),
get_json_object($"value", "$.created_at").as("created_at")
)
gitDF.printSchema()
gitDF.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}

6.6 csv 数据

在机器学习中,常常使用的数据存储在csv/tsv文件格式中,所以SparkSQL中也支持直接读取格式数据,从2.0版本开始内置数据源。关于CSV/TSV格式数据说明:
在这里插入图片描述
SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:
1)、分隔符:sep

  • 默认值为逗号,必须单个字符
    2)、数据文件首行是否是列名称:header
  • 默认值为false,如果数据文件首行是列名称,设置为true
    3)、是否自动推断每个列的数据类型:inferSchema
  • 默认值为false,可以设置为true

官方提供案例:
在这里插入图片描述
当读取CSV/TSV格式数据文件首行是否是列名称,读取数据方式(参数设置)不一样的 。

第一点:首行是列的名称,如下方式读取数据文件

// TODO: 读取TSV格式数据
val ratingsDF: DataFrame = spark.read
// 设置每行数据各个字段之间的分隔符, 默认值为 逗号
.option("sep", "\t")
// 设置数据文件首行为列名称,默认值为 false
.option("header", "true")
// 自动推荐数据类型,默认值为false
.option("inferSchema", "true")
// 指定文件的路径
.csv("datas/ml-100k/u.dat")
ratingsDF.printSchema()
ratingsDF.show(10, truncate = false)

第二点:首行不是列的名称,如下方式读取数据(设置Schema信息)

// 定义Schema信息
val schema = StructType(
StructField("user_id", IntegerType, nullable = true) ::
StructField("movie_id", IntegerType, nullable = true) ::
StructField("rating", DoubleType, nullable = true) ::
StructField("timestamp", StringType, nullable = true) :: Nil
)
// TODO: 读取TSV格式数据
val mlRatingsDF: DataFrame = spark.read
// 设置每行数据各个字段之间的分隔符, 默认值为 逗号
.option("sep", "\t")
// 指定Schema信息
.schema(schema)
// 指定文件的路径
.csv("datas/ml-100k/u.data")
mlRatingsDF.printSchema()
mlRatingsDF.show(5, truncate = false)

将DataFrame数据保存至CSV格式文件,演示代码如下:

/**
* 将电影评分数据保存为CSV格式数据
*/
mlRatingsDF
// 降低分区数,此处设置为1,将所有数据保存到一个文件中
.coalesce(1)
.write
// 设置保存模式,依据实际业务场景选择,此处为覆写
.mode(SaveMode.Overwrite)
.option("sep", ",")
// TODO: 建议设置首行为列名
.option("header", "true")
.csv("datas/ml-csv-" + System.nanoTime())

范例演示完整代码SparkSQLCsv.scala如下:

import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* SparkSQL 读取CSV/TSV格式数据:
* i). 指定Schema信息
* ii). 是否有header设置
*/
object SparkSQLCsv {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(SparkSQLCsv.getClass.getSimpleName)
.master("local[2]")
.getOrCreate()
import spark.implicits._
// 获取SparkContext实例对象
val sc: SparkContext = spark.sparkContext
/**
* 实际企业数据分析中
* csv\tsv格式数据,每个文件的第一行(head, 首行),字段的名称(列名)
*/
// TODO: 读取TSV格式数据
val ratingsDF: DataFrame = spark.read
// 设置每行数据各个字段之间的分隔符, 默认值为 逗号
.option("sep", "\t")
// 设置数据文件首行为列名称,默认值为 false
.option("header", "true")
// 自动推荐数据类型,默认值为false
.option("inferSchema", "true")
// 指定文件的路径
.csv("datas/ml-100k/u.dat")
ratingsDF.printSchema()
ratingsDF.show(10, truncate = false)
println("=======================================================")
// 定义Schema信息
val schema = StructType(
StructField("user_id", IntegerType, nullable = true) ::
StructField("movie_id", IntegerType, nullable = true) ::
StructField("rating", DoubleType, nullable = true) ::
StructField("timestamp", StringType, nullable = true) :: Nil
)
// TODO: 读取TSV格式数据
val mlRatingsDF: DataFrame = spark.read
// 设置每行数据各个字段之间的分隔符, 默认值为 逗号
.option("sep", "\t")
// 指定Schema信息
.schema(schema)
// 指定文件的路径
.csv("datas/ml-100k/u.data")
mlRatingsDF.printSchema()
mlRatingsDF.show(5, truncate = false)
println("=======================================================")
/**
* 将电影评分数据保存为CSV格式数据
*/
mlRatingsDF
// 降低分区数,此处设置为1,将所有数据保存到一个文件中
.coalesce(1)
.write
// 设置保存模式,依据实际业务场景选择,此处为覆写
.mode(SaveMode.Overwrite)
.option("sep", ",")
// TODO: 建议设置首行为列名
.option("header", "true")
.csv("datas/ml-csv-" + System.nanoTime())
// 关闭资源
spark.stop()
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/356282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

事物发展的不同阶段会有不同的状态

之前讨论过一个话题,有人问“股票交易稳定盈利很难么?” 我的回答:不难,难在之前。 这几天我又想到经常看到论坛里有人pk观点,最后甩出一句话:“证明你说得对,你先赚一个亿再说。否则&#xf…

写代码犹如写文章: “大师级程序员把系统当故事来讲,而不是当做程序来写” | 如何架构设计复杂业务系统? 如何写复杂业务代码?...

“大师级程序员把系统当故事来讲,而不是当做程序来写”写代码犹如写文章好的代码应该如好文章一样表达思想,被人读懂。中心思想: 突出明确程序是开发者用编程语言写成的一本书,首先应该是记录开发者对业务需求分析、系统分析,最终…

并发编程底层原理

并发编程 文章目录并发编程线程知识点回顾多线程并行和并发什么是并发编程?并发编程的根本原因?Java内存模型(JMM)并发编程的核心问题-可见性、有序性、原子性可见性有序性原子性并发问题总结volatile关键字volatile的底层原理如何…

K8s学习(二)Kubernetest的资源管理及五大资源介绍

文章目录前言1.kubernetes的资源管理系统资源查看2.资源管理方式3.资源管理实战3.1 Namespace3.2 Pod3.3 Label3.4 Deployment3.5 Service3.5.1创建集群内部可访问的Service3.5.2创建集群外部可访问的Service前言 本文是k8s学习系列文章,前后串起来是一个完整的课程…

一招鉴别真假ChatGPT,并简要介绍ChatGPT、GPT、GPT2和GPT3模型之间的区别和联系

以下内容除红色字体部分之外,其他均来源于ChatGPT自动撰写。 ChatGPT是基于GPT模型的对话生成模型,旨在通过对话模拟实现自然语言交互。它是为了改善人机对话体验而设计的,主要应用于聊天机器人、智能客服等场景。 与GPT模型相比,…

全栈之路-前端篇 | 第一讲.基础前置知识【浏览器内核与网络知识】学习笔记

欢迎关注「全栈工程师修炼指南」公众号 点击 👇 下方卡片 即可关注我哟! 设为「星标⭐」每天带你 基础入门 到 进阶实践 再到 放弃学习! 涉及 企业运维、网络安全、应用开发、物联网、人工智能、大数据 学习知识 “ 花开堪折直须折,莫待无花…

内大-oj练习题(1期)

用于存储内大oj练习题 1. 排序题2. 实数输出3. 字符串比较大小4. 1055 找最小放表头,找最大放表尾5. 通过反转实现数据移动6. 破圈报数7. 通话记录8. 用栈实现进制转换9. 判断升序10. 金额的中文大写11. 生日组成的素数12. 判断是否属于一个子网13 统计字符个数14. 求前n项和1…

LeetCode02.07面试题 链表相交 带有输入和输出的链表相交

题目: 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点,返回 null 。 图示两个链表在节点 c1 开始相交: 题目数据 保证 整个链式结构中不存在环。 注意, 函…

Git的基本操作

文章目录1.git的工作流程2.git的工作环境3.git的基本操作(1)git init(2)git status(3)git add(4)git commit4.版本控制(1)git reflog与git log(2)再增加两个版本(3)git reset --hard 版本号(4)两个指针4.分支管理(1)对分支的理解(2)git branch和git branch -v(3)git checkout 分…

基于matlab的斜视模式下SAR建模

一、前言此示例说明如何使用线性 FM (LFM) 波形对基于聚光灯的合成孔径雷达 (SAR) 系统进行建模。在斜视模式下,SAR平台根据需要从宽侧斜视一定角度向前或向后看。斜视模式有助于对位于当前雷达平台位置前面的区域进行…

mysql EXPLAIN关键字

EXPLAIN 使用EXPLAIN关键字可以模拟优化器执行SQL查询语句,从而知道MySQL是如何处理你的SQL语句的。分析你的查询语句或是表结构的性能瓶颈。通过EXPLAIN,我们可以分析出以下结果: 表的读取顺序数据读取操作的操作类型哪些索引可以使用哪些索…

一文吃透SQL性能优化,阿里47条军规

目录1、先了解MySQL的执行过程2、数据库常见规范3、所有表必须使用Innodb存储引擎4、每个Innodb表必须有个主键5、数据库和表的字符集统一使用UTF86、查询SQL尽量不要使用select *,而是具体字段7、避免在where子句中使用 or 来连接条件8、尽量使用数值替代字符串类型…

【数据挖掘实战】——中医证型的关联规则挖掘

目录 一、背景和挖掘目标 1、问题背景 2、传统方法的缺陷 3、原始数据情况 4、挖掘目标 二、分析方法和过程 1、初步分析 2、总体过程 第1步:数据获取 第2步:数据预处理 第3步:构建模型 三、思考和总结 项目地址:Data…

YOLOv6-目标检测论文解读

文章目录摘要问题算法网络设计BackboneNeckHead标签分配SimOTA(YOLOX提出):TAL(Task alignment learning,TOOD提出)损失函数分类损失框回归损失目标损失行业有用改进自蒸馏图像灰度边界填充量化及部署实验消…

测试1:测试相关概念

1.测试相关概念 1.1.测试概念 1.1.1.需求 符合正式文档规定的条件和权能,包括用户需求和软件需求 它们之间的的转换是:沟通 用户需求和软件需求的区别: 能否指导开发人员开发,测试人员编写测试用例 1.1.2.缺陷Bug 与正确的…

补充前端面试题(三)

图片懒加载<!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, in…

「数据仓库」怎么选择现代数据仓库?

构建自己的数据仓库时要考虑的基本因素我们用过很多数据仓库。当我们的客户问我们&#xff0c;对于他们成长中的公司来说&#xff0c;最好的数据仓库是什么时&#xff0c;我们会根据他们的具体需求来考虑答案。通常&#xff0c;他们需要几乎实时的数据&#xff0c;价格低廉&…

[SSD科普之1] PCIE接口详解及应用模式

PCI-Express(peripheral component interconnect express)是一种高速串行计算机扩展总线标准&#xff0c;它原来的名称为“3GIO”&#xff0c;是由英特尔在2001年提出的&#xff0c;旨在替代旧的PCI&#xff0c;PCI-X和AGP总线标准。一、PCI-E x1/x4/x8/x16插槽模式PCI-E有 x1/…

day20_经典接口(Comparable丶Comparator)

由来 我们知道基本数据类型的数据&#xff08;除boolean类型外&#xff09;需要比较大小的话&#xff0c;之间使用比较运算符即可&#xff0c;但是引用数据类型是不能直接使用比较运算符来比较大小的。那么&#xff0c;如何解决引用类型比较大小的问题&#xff1f; java.lang…

基于Spring、Spring MVC、MyBatis的房屋销售购买系统

文章目录项目介绍主要功能截图&#xff1a;登录前台首页后台首页房屋区域管理出租房屋审核账号管理部分代码展示设计总结项目获取方式&#x1f345; 作者主页&#xff1a;Java韩立 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题…