Spark 之 解析json的复杂和嵌套数据结构

news2024/11/27 4:37:56

本文主要使用以下几种方法:

1,get_json_object():从一个json 字符串中根据指定的json 路径抽取一个json 对象

2,from_json():从一个json 字符串中按照指定的schema格式抽取出来作为DataFrame的列

3,to_json():将获取的数据转化为json格式

4,explode():炸裂成多行

5,selectExpr():将列转化为一个JSON对象的另一种方式


文件名是 mystudent.txt   具体内容如下,只有一条数据

1|{"dept":{"describe":"主要负责教学","name":"学术部"},"email":"zhangsan@edu.cn","id":79,"name":"zhangsan","stus":[{"grade":"三年级","id":12,"name":"xuesheng1","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":3,"name":"xuesheng2","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":1214,"name":"xuesheng3","school":{"address":"南京","leader":"王总","name":"南京大学"}}],"tel":"1585050XXXX"}

 大概是这样的结构:

  

 第一步:导入文件并分割成二元组转换成两列

val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
optionRDD.foreach(println)

//分割,注意  |  用的是单引号
val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
option1.foreach(println)

//转化成两列
val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

 第二步:按照几个大类先拆分

 val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

 第三步:把dept这个部分再分

val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

 第四步:把stus这部分合并成数组

val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

 第五步:explode炸裂stus 部分,分成三部分;并新增列,删除原数组数据

//炸裂
val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

//新增列,删除原数据
val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

 第六步:分开school部分,并合并全表

val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

 总结,全文代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonMyStu {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("jsonstu3opdemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optionRDD.foreach(println)


//按照 | 分割成两列
    val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
    option1.foreach(println)

    val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

    val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

    val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

    val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
    val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

    val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

    val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

    val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

  }
}

拓展:

//如果分割符是  ,  则用以下方法,indexOf返回第一个此元素的下标值
    /*val optinRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optinRDD.foreach(println)
    val frame: RDD[(String, String)] = optinRDD.map(
      x => {
        //返回第一个,所在的位置
        val i: Int = x.indexOf(",")//1
        
        //开始截取
        //(0,i)--->(0,1)
        //(i+1) 2 从下标元素开始到末尾
        val tuple: (String, String) = (x.substring(0, i), x.substring(i + 1))
        tuple
      }
    )*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/429043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【洋桃一号板】STM32F103CBT6标准库函数驱动TM1640点亮数码管

一、今天介绍如何使用STM32F103CBT6驱动TM1640点亮数码管,硬件用的洋桃开发板,点亮后效果如下,六个数码管依次显示0.1.2.3.4.5.6.7 硬件原理图如下,只用到了单片机的两个IO口即可实现上图的效果,该开发板上用的是PA11…

chapter-3 -数据库数据模型

以下内容来源于MOOC学习—原课程请见:数据库原理与应用 考研复习 概述 关系及关系模式 笛卡尔积 定义在一组域上的有序对的集合, 域是一组具有相同类型的集合,比如自然数,长度小于n的字符串结合等【比如int age】 从n个域的每…

Linux工具make与makefile

Linux项目自动化构建工具-make/Makefile 目录Linux项目自动化构建工具-make/Makefile引言1、make && makefile2、make执行步骤2.1 依赖关系2.2 依赖方法3、项目清理4、伪目标 .PHONY5、文件的三个时间6、make的工作原理7、Linux下的第一个小程序认识缓冲区进度条①函数…

T5模型简单介绍

目录 一、概要 二、深入扩展 2.1 两个要素 2.2 预训练方法 一、概要 谷歌公司的研究人员提出的 T5(Text-to-Text Transfer Transformer,有5个T开头的单词,所以叫做T5)模型采用了一种与前述模型截然不同的策略:将不…

RUAS论文阅读笔记

这是CVPR2021的一篇暗光增强的论文 Retinex增强和去噪部分 第一部分的核心公式是一种retinex公式(用于暗图增强的retinex公式有几种类型,虽然本质一样但是对于各个分量的定义不一样):yx⊗tyx\otimes tyx⊗t,其中x是正…

Trie|并查集|堆|

目录 初始化 插入 查询 合并集合 连通块中点的数量 堆排序 模拟堆 Trie树是用来快速存储和查找字符串集合的数据结构 #include<iostream> using namespace std; const int N 100010; int son[N][26];//本题为小写因为字母&#xff0c;每个节点最多有26个子节点…

JUC编程之——synchronized的底层实现与分析

1 synchronized关键字 synchronized 是 Java 中的关键字&#xff0c;是一种同步锁(也是一种悲观锁)。它修饰的对象有以下几种&#xff1a; 作用于实例方法&#xff0c;当前实例加锁&#xff0c;进入同步代码前要获得当前实例的锁——对象锁&#xff1b;作用于代码块&#xff…

水文水利数据对接详解

数据对接 水雨情监测及视频监控系统需要与什么平台进行对接&#xff1f; 答&#xff1a;水雨情监测及视频监控系统由省统一接收的方式&#xff0c;数据接收中心设在***水利云。 2.水雨情数据接收中心有哪些组成部分&#xff1f; 答&#xff1a;水雨情数据接收中心主要由硬件…

pdf如何压缩变小,pdf压缩教程四招快速学

PDF是我们日常工作中经常使用的文件格式之一。这种文件格式方便易用&#xff0c;能够确保文件在传输和接收过程中不会出现错版等问题。为了方便发送&#xff0c;我们通常会将编辑好的内容转换为PDF格式。但是有时候文件过大&#xff0c;无法通过传输渠道发送怎么办&#xff1f;…

字节5年测试工程师对“测试开发”的理解

写在前面&#xff1a; 写这篇文章的目的是为了能够更好的帮助刚入职的新人了解这个岗位和自己的工作&#xff0c;也想谈谈自己工作一年来对这个领域的了解程度&#xff0c;做一个小小总结吧&#xff5e; 一、我理解的测试开发 测试开发与开发、测试的关系 以前在没有接触测试…

樱花树盛开的季节,我用简单的C代码绘制了一棵樱花树向她表白~『C/C++图形库EasyX』

文章目录&#x1f490;专栏导读&#x1f490;文章导读绘制一根线条绘制一个简易的树干优化树干&#xff0c;使其更加细致绘制樱花树增加随机树形与渐变色效果如何设置随机数进阶——通过鼠标点击来控制生成樱花树进阶——生成樱花树并展示生长过程&#x1f490;专栏导读 &#…

通过阿里云函数计算解决ChatGPT API的调用问题

ChatGPT系列文章 与其被ChatGPT取代&#xff0c;不如征服ChatGPT&#xff0c;做它的主人&#xff01; 文章目录ChatGPT系列文章前言命令行部署准备工作两行命令实现部署应用中心部署使用代理访问API总结前言 自2022年11月30日 OpenAI 发布 ChatGPT 以来&#xff0c;虽然时有唱…

最新版本VSCode配置Python、PyQt5、QtDesigner环境并创建一个ui界面测试

参考链接&#xff1a;最新版本VSCode配置Python、PyQt5、QtDesigner环境并创建一个ui界面测试 一、安装Python3 PyQt5所支持的python版本是从3.5开始的&#xff0c;因此安装的Python3版本必须大于3.5。 我安装的位置是C:\Python\Python38。 参见真小白入门Pyhton的安装 二、安…

图-文多模态,大模型,预训练

参考老师的无敌课程 多模态任务是指需要同时处理两种或多种不同类型的数据&#xff08;如图像、文本、音频等&#xff09;的任务。例如&#xff0c;图像描述&#xff08;image captioning&#xff09;就是一种典型的多模态任务&#xff0c;它需要根据给定的图像生成相应的文本描…

XO08R2 1SBP260109R1001接地系统能够为dcs提供屏蔽层,消除电子噪声干扰

​ XO08R2 1SBP260109R1001接地系统能够为dcs提供屏蔽层&#xff0c;消除电子噪声干扰 dcs合理、可靠的系统接地&#xff0c;是dcs系统非常重要的内容。为了保证dcs系统的监测控制精度和安全、可靠运行&#xff0c;必须对系统接地方式、接地要求、信号屏蔽、接地线截面选择、接…

【C++学习】map和set的封装

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《C学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; map和set的封装&#x1f349;map和set中的红黑树&#x1f34c;set中的键值和map中的键值&#x1f349…

CTF杂项提纲

CTF的杂项是涉及编码&#xff0c;图片隐写&#xff0c;音频隐写&#xff0c;压缩包分析的方向&#xff0c;本文对MISC的知识点做了一个简单列举 常见编码 ASCII 0-9,48-57 A-Z 65-90 a-z 97-122 URL url编码又叫百分号编码&#xff0c;是统一资源定位的编码方式 base16/…

ModStartCMS v6.2.0 VIP权益配置功能,界面UI优化升级

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;基于 Apache 2.0 开源协议&#xff0c;免费且不限制商业使用。 功能特性 丰富的模块市…

【初识数据库】数据库简介及MySQL安装

目录 数据库基本概念&#xff1a; 主流的关系型数据库&#xff1a; 下载并安装数据库 我们选择的是&#xff1a;MySQL Community Server 8.0.26 卸载老版本MySQL 数据库基本概念&#xff1a; 数据库&#xff1a;是数据的仓库&#xff0c;存储数据的地方 数据库管理系统&am…

RK3568平台开发系列讲解(调试篇)debugfs 分析手段

🚀返回专栏总目录 文章目录 一、enable debugfs二、debugfs API三、使用示例沉淀、分享、成长,让自己和他人都能有所收获!😄 📢Linux 上有一些典型的问题分析手段,从这些基本的分析方法入手,你可以一步步判断出问题根因。这些分析手段,可以简单地归纳为下图: 从这…