Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

news2025/1/22 21:06:54

从 RDD 转换得到 DataFrame

Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame:

  1. 利用反射机制推断 RDD 模式
  2. 使用编程方式定义 RDD 模式

下面使用到的数据 people.txt :

Tom, 21
Mike, 25
Andy, 18

1、利用反射机制推断 RDD 模式

        在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。

object Tese{
  
    // 反射机制推断必须使用 case 类,case class 必须放到main方法之外
    case class Person(name: String,age: Long)  //定义一个case类

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 1")
      .getOrCreate()


    import spark.implicits._ //这里的spark不是org.apache.spark这个包 而是我们创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象


    val rdd: RDD[Person] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(line => line.split(","))
      .map(t => Person(t(0), t(1).trim.toInt))

    // 将RDD对象转为DataFrame对象
    val df: DataFrame = rdd.toDF()

    df.createOrReplaceTempView("people")

    spark.sql("SELECT * FROM people WHERE age > 20").show()

    spark.stop()
  }
}

注意事项1:

case 类必须放到伴生对象下,main方法之外,因为在隐式转换的时候它会自动通过 伴生对象名.case类名 来调用case类,如果放到main下面就找不到了。

注意事项2:

import spark.implicits._
这里的spark不是org.apache.spark这个包 而是我们上面创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象

2、使用编程方式定义 RDD 模式

        反射机制推断时需要定义 case class,但当无法定义 case 类时,就需要采用编程式来定义 RDD 模式了。这种方法看起来比较繁琐,但是很好用,不容易报错。

        我们现在同样加载 people.txt 中的数据,生成 RDD 对象,再把RDD对象转为DataFrame对象,进行SparkSQL 查询。主要包括三个步骤:

  1. 制作表头 schema: StructType
  2. 制作表中记录 rowRDD: RDD[Row]
  3. 合并表头和记录 df:DataFramw
def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 2")
      .getOrCreate()

    //1.制作表头-也就是定义表的模式
    val schema: StructType = StructType(Array(StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    //2.加载表中的记录-也就是读取文件生成RDD
    val rowRdd: RDD[Row] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Row(attr(0), attr(1).trim.toInt))
    //3.把表头和记录拼接在一起
    val peopleDF: DataFrame = spark.createDataFrame(rowRdd, schema)

    peopleDF.createOrReplaceTempView("people")

    spark.sql("SELECT * FROM people WHERE age > 20").show()

    spark.stop()
  }

运行结果:

+----+---+
|name|age|
+----+---+
| Tom| 21|
|Mike| 25|
+----+---+

Spark SQL读取数据库

导入依赖

根据自己本地的MySQL版本导入对应的驱动。

注意:mysql8.0版本在JDBC中的url是:" com.mysql.cj.jdbc.Driver "

<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>

读取 MySQL 中的数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()

    val mysql: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "Yan1029.")
      .load()

    mysql.show()

    spark.stop()
  }

运行结果:

默认显示整张表

+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1| Tom| 21| 男|
|  2|Andy| 20| 女|
+---+----+---+---+

向 MySQL 写入数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()

    //导入两条student信息
    val rdd: RDD[Array[String]] = spark.sparkContext
      .parallelize(Array("3 Mike 22 男", "4 Cindy 23 女"))
      .map(_.split(" "))

    //设置模式信息-创建表头
    val schema: StructType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true)))

    //创建Row对象 每个 Row对象都是表中的一行-创建记录
    val rowRDD = rdd.map(stu => Row(stu(0).toInt, stu(1), stu(2).toInt, stu(3)))

    //创建DataFrame对象 拼接表头和记录
    val df = spark.createDataFrame(rowRDD, schema)

    //创建一个 prop 变量 用来保存 JDBC 连接参数
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","Yan1029.")
    prop.put("driver","com.mysql.cj.jdbc.Driver")

    //写入数据 采用 append 模式追加
    df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)

    spark.stop()
  }

运行结果:


总结

        今天上午就学到这里,本想着今天专门看看StructType、StructField和Row这三个类的,没想到就在这节课。这一篇主要学了RDD对象向DataFrame对象的转换以及Spark SQL如何读取数据库、写入数据库。

        下午学完这一章最后的DataSet。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/995697.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

知乎热议!大学开学了,女儿去了浙江上大学,两千生活费够不够?

大家好&#xff0c;我是菜哥&#xff01; 又到了周末闲聊时间&#xff0c;这几天微博上一个热搜引起了广大网友的关注&#xff0c;那就是“大学开学了&#xff0c;女儿在浙江上大学&#xff0c;给两千生活费够不够&#xff1f;”。 这个话题引起了广泛讨论&#xff0c;很多人都…

C++中的红黑树

红黑树 搜索二叉树搜索二叉树的模拟实现平衡搜索二叉树(AVL Tree)平衡搜索二叉树的模拟实现红黑树(Red Black Tree)红黑树的模拟实现 红黑树的应用(Map 和 Set)Map和Set的封装 搜索二叉树 搜索二叉树的概念&#xff1a;二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&…

计算机网络与技术——概述

&#x1f60a;计算机网络与技术——概述 &#x1f47b;前言&#x1f94f;信息时代下计算机网络的发展&#x1f30f;互联网概述&#x1f4e1;计算机网络基本概念&#x1f4e1;互联网发展三阶段&#x1f4e1;互联网的标准化 &#x1f30f;互联网的组成&#x1f4e1;互联网的边缘部…

企业ERP和泛微OA集成场景分析

轻易云数据集成平台&#xff08;qeasy.cloud&#xff09;为企业ERP和泛微OA系统提供了强大的互通解决方案&#xff0c;特别在销售、采购和库存领域的单据审批场景中表现出色。这些场景涉及到多个业务单据的创建和审批&#xff0c;以下是一些具体的应用场景描述&#xff1a; 采购…

C++ STL教程

C 标准模板库的核心包括&#xff1a;&#xff08;1&#xff09;容器&#xff08;Containers&#xff09;&#xff1b;&#xff08;2&#xff09;算法&#xff08;Algorithms&#xff09;&#xff1b;&#xff08;3&#xff09;迭代器&#xff08;iterators&#xff09; &#…

第一章 计算机系统概述 一、操作系统的基本概念

一、操作系统的层次结构 二、定义 操作系统是计算机系统中最基本、最重要的软件之一&#xff0c;它是计算机硬件与用户之间的桥梁。它的主要功能是管理计算机系统的资源&#xff0c;包括处理器、内存、外部设备以及数据等。 操作系统的基本概念包括&#xff1a; 资源管理&…

在Postman的脚本中使用pm对象获取接口的请求参数

在Postman的脚本中使用pm对象获取接口的请求参数 1、获取在Query Params中输入的参数全局变量的引用&#xff08;以在header中引用为例&#xff09;2、获取在Body中输入的参数3、pm对象常用用法 1、获取在Query Params中输入的参数 query params页面 在tests中写脚本做后置处…

探索 AI+开源的未来:Open Source Congress@日内瓦

注&#xff1a;本文翻译源自 Linux 基金会发布的 Open Source Congress 会议官网内容&#xff0c;蓝色斜字体的段落则为作者参与会议的记录与心得。 Note: This article was translated from the official website of the Linux Foundations Open Source Congress, and the par…

【MySQL系列】MySQL的事务管理的学习(二)_ 再次理解隔离性

「前言」文章内容大致是MySQL事务管理&#xff0c;续上一篇。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 七、再次理解隔离性7.1 数据库并发的场景有7.2 多版本并发控制&#xff08;MVCC&#xff09;7.3 三个隐藏字段列7.4 undo日志7.5 模拟MVCC7.6 R…

[Latex]公式编辑,编号、对齐【持】

导言区 \documentclass{article} \usepackage{amsmath,amssymb,amsfonts,}%math-数学公式&#xff1b;symb-数学符号&#xff1b;fonts-字号&#xff1b;环境是否进入数学模式是否接受可选参数是否占满整行是否产生编号备注align是否是是align* 不产生编号&#xff0c;其他与 …

阿里云oss上传视频测试,出现了413错误

阿里云oss上传视频测试&#xff0c;出现了413错误 &#xff08;1&#xff09;nginx抛出问题&#xff0c;请求体过大 &#xff08;2&#xff09;修改nginx配置&#xff0c;重新加载生效 client_max_body_size 1024m;在cmd下运行命令&#xff1a;nginx.exe -s reload

基于canvas实现图片文字水印生成器

目录 介绍 1.静态页面结构 2.给生成水印按钮绑定点击事件 3.生成水印的函数 总结 介绍 在前端开发中时常会遇到需要给图片加上水印的功能&#xff0c;就像在创作csdn的文章时上传的图片都会打上传作者的水印&#xff0c;我们来探讨一下这个水印是如何生成的。 首先生成的文…

【大数据】CDC 技术:变化数据捕获

CDC 技术&#xff1a;变化数据捕获 1.什么是 CDC &#xff1f;2.批处理 vs CDC3.四种 CDC 的实现方法3.1 表元信息 Table metadata3.2 表求差 Table differences3.3 数据库触发器 Trigger-based CDC3.4 数据库事务日志 Log-based CDC 4.Oracle CDC 详解4.1 Oracle CDC 机制4.1.…

leetcode 52. N 皇后 II

2023.9.10 本题是皇后问题的变式&#xff0c;让求出不同解决方案的数量&#xff0c;和之前做过的 N皇后 基本一样&#xff0c;最终返回ans里棋盘的数量即可。 当复习一下皇后问题了&#xff0c;代码如下&#xff1a; class Solution { private:vector<vector<string&g…

无涯教程-JavaScript - AMORDEGRC函数

描述 AMORDEGRC函数返回每个会计期间的折旧。此功能是为法国会计系统提供的。如果在会计期间的中间购买资产,则会考虑按比Example折旧。 该功能类似于AMORLINC,不同之处在于,根据资产的寿命在计算中使用了折旧系数。 语法 AMORDEGRC (cost, date_purchased, first_period, …

SAP MM学习笔记29 - 供给元(供货源)的Block(拉黑)

前面学习了 供给元 的知识。 可以参考如下的URL SAP MM学习笔记28- 供给元&#xff08;供货源&#xff09;决定_东京老树根的博客-CSDN博客 有时候还有什么业务需求呢&#xff1f;就是比如突发要拉黑某个供应商 或 拉黑某个供应商的某个产品&#xff0c; 那又该如何做呢&…

202331读书笔记|《我笨拙地爱着这个世界(“外卖诗人”王计兵自选集)》——脚在泥泞,心有繁花

202331读书笔记|《我笨拙地爱着这个世界&#xff08;“外卖诗人”王计兵自选集&#xff09;》——脚在泥泞&#xff0c;心有繁花 《我笨拙地爱着这个世界&#xff08;“外卖诗人”王计兵自选集&#xff09;》作者王计兵。这是读的他的第二本书&#xff0c;比较有烟火气&#xf…

Spring Messaging远程命令执行漏洞复现(CVE-2018-1270)

一、漏洞说明 Spring Messaging为Spring框架提供消息支持&#xff0c;用户使用受影响版本的Spring Framework时&#xff0c;允许应用程序通过Spring Messaging模块内存中STOMP代理创建WebSocket。由于selector用SpEL表达式编写&#xff0c;并使用StandardEvaluationContext解析…

springBoot对接Apache POI 实现excel下载和上传

搭建springboot项目 此处可以参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客 配置Apache POI 依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>5.2.2</version> </…

Flink JobManager的高可用配置

背景 在flink执行中&#xff0c;jobManager是一个负责执行流式应用执行和检查点生成的组件&#xff0c;一旦发生故障&#xff0c;那么其负责的所有应用都会被取消&#xff0c;所以我们需要对JobManager配置高可用的模式 JobManager高可用配置 配置JobManager的高可用需要使用…