大数据——Spark SQL

news2025/1/8 5:48:52

1、Spark SQL是什么

Spark SQL是Spark中用于处理结构化数据的一个模块,前身是Shark,但本身继承了前身Hive兼容和内存列存储的一些优点。Spark SQL具有以下四个特点:

  1. 综合性(Integrated):Spark中可以加入SQL查询,也可以使用DataFrame API,其中API提供了多种语言选择,Python、R、Java和Scala都支持。
  2. 连接统一性(Uniform Data Access):使用相同的方式连接不同的数据源(Hive、Json和JDBC等等)。
  3. Hive兼容性:能够在已有数据仓库中执行SQL或者Hive查询
  4. 标准化连接(Standard Connectivity):提供了JDBC或者ODBC的数据接口,可以给其他BI工具使用。

Spark SQL的优点

  1. 代码量少:可以直接写SQL语句或者DataFrame 。
  2. 性能更高:在使用DataFrame API时,DataFrame转成RDD时,会进行代码优化,执行效率更高;Spark SQL代码的RDD还行效率比Python、Java等编写的RDD效率高。

2、DataFrame简介

Spark中DataFrame是⼀个分布式的⾏集合,可以想象为⼀个关系型数据库的表,或者⼀个带有列名的Excel表格。不过它跟RDD有以下共同之处:

  • 不可变(Immuatable):跟RDD一样,一旦创建就不能更改你,只能通过transformation生成新的DataFrame;
  • 懒加载(Lazy Evaluations):只有action才会让transformation执行;
  • 分布式(Distributed):也是分布式的。

DataFrame跟RDD的比较

DataFrameRDD
逻辑框架提供详细结构信息,例如列的名称和类型不知道类的内部结构
数据操作API更丰富、效率更高代码少时,速度更快

DataFrame API常用代码

DataFrame的API也分为transformation和action两类

  • transformation 延迟操作
  • action 立即操作

在这里插入图片描述

  1. 创建SparkSession对象
SparkSession.builder.master("local") \
... appName("Word Count") \
... getOrCreate()
# Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置
# master (master)设置要链接到的spark master节点地址, 传⼊ “local” 代表本地模式, “local[4]”代表本地模式4内核运⾏
# appName (name)为Spark应⽤设置名字
# getOrCreate ()获取⼀个已经存在的 SparkSession 或者如果没有已经存在的, 创建⼀个新的SparkSession
  1. 通过SparkSession创建DataFrame
 sparkSession.createDataFrame
  1. 读取文件生成DataFrame
# json格式
spark.read.json("xxx.json")
spark.read.format('json').load('xxx.json')
# parquet格式
spark.read.parquet("xxx.parquet")
# jdbc格式
spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name")\
.option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
  1. 基于RDD创建DataFrame
# rdd中读取数据
    spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
    sc = spark.sparkContext
    list1 =  [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(list1)
    # 添加数据列名
    people = rdd.map(lambda x:Row(name=x[0], age=int(x[1])))
    # 创建DataFrame
    df_pp = spark.createDataFrame(people)
    print(df_pp.show(2))
  1. 从CSV文件中读取数据
# # rdd中读取数据
spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
df = spark.read.format('csv').option('header','true').load('iris.csv')
df.printSchema()
df.show(5)
print(df.count())
print(df.columns)
  1. 增加列、删除列和提取部分列
# 增加列
df.withColumn('newWidth', df.SepalWidth*2).show()

# 删除列
df.drop('cls').show()

# 提取部分列
df.select('SepalLength','SepalWidth').show()
  1. 统计信息、基本统计功能和分组统计
#统计信息 describe
df.describe().show()
#计算某⼀列的描述信息
df.describe('cls').show() 
# 基本统计信息
df.select('cls').distinct().count()
# 分组统计 
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()
# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
  1. 采集数据、拆分数据集和查看两个数据集的差异
# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样⽐例
#seed:随机种⼦
sdf = df.sample(False,0.2,100)

#设置数据⽐例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()
  1. 自定义函数和交叉表
# 交叉表 crosstab
df.crosstab('cls','SepalLength').show()

# 自定义函数UDF
# 找到数据,做后续处理
traindf, testdf = df.randomSplit([0.7,0.3])
diff_in_train_test = testdf.select('cls').subtract(traindf.select('cls')).distinct().show()
# 找到类,整理到一个列表中
not_exist_cls = traindf.select('cls').subtract(testdf.select('cls')).distinct().rdd.map(lambda x:x[0]).collect()

# 定义一个方法
def shou_remove(x):
    if x in not_exist_cls:
        return -1
    else:
        return x
# 在RDD中可以直接定义函数,交给rdd的transformatioins⽅法进⾏执⾏
# 在DataFrame中需要通过udf将⾃定义函数封装成udf函数再交给DataFrame进⾏调⽤执⾏
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

check = udf(shou_remove, StringType())
resultdf = traindf.withColumn('new_cls',check(traindf['cls'])).filter('new_cls<>-1')

resultdf.show()
  1. 加载json的API

1)以通过反射⾃动推断DataFrame的Schema

# 1) json→RDD→DataFrame
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

jsonString =  [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]
jsonrdd = sc.parallelize(jsonString) # json 2 rdd
jsondf = spark.read.json(jsonrdd) #rdd 2 dataframe
jsondf.printSchema()
jsondf.show()

# 2) 直接从文件中加载
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

jsondf = spark.read.json('zips.json')
jsondf.printSchema()
jsondf.filter(jsondf.pop>40000).show(10)
jsondf.createOrReplaceTempView('temp_table')
resulfdf = spark.sql('select * from temp_table where pop>40000')
resulfdf.show(10)

2)通过StructType对象指定Schema

spark = SparkSession.builder.appName('json_demo').getOrCreate()
    sc = spark.sparkContext
    jsonSchema = StructType([
     StructField("id", StringType(), True),
     StructField("city", StringType(), True),
     StructField("loc" , ArrayType(DoubleType())),
     StructField("pop", LongType(), True),
     StructField("state", StringType(), True)
    ])
    reader = spark.read.schema(jsonSchema)
    jsondf = reader.json('zips.json')
    jsondf.printSchema()
    jsondf.show(2)
    jsondf.filter(jsondf.pop>40000).show(10)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1032978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

啥子是DOM???总听,不晓得啥

远古时代&#xff0c;浏览器加载完HTML页面就没了&#xff0c;也就是加载一个文本。 但是之后JS出现&#xff0c;要修改HTML文本&#xff0c;这么大的文本&#xff0c;你咋晓得要修改那个地方&#xff1f;就算修改完了浏览器还要在重新解析加载一遍&#xff0c;耗时耗性能啊&a…

oracle11g-图形安装(centos7)

目录 一.环境准备1.关闭防火墙2.关闭SELINUX3.配置本地yum源4.安装ORACLE先决条件的软件包5.修改LINUX的内核文件6.添加下列参数到/etc/security/limits.conf7.添加下列条目到/etc/pam.d/login8.环境变量中添加下列语句9.创建文件目录和相应的用户10.配置oracle用户的环境变量1…

Java笔记:垃圾回收

1 判断算法垃圾 主要是2种&#xff1a;引用计数法和根搜索算法 1.1 引用计数法&#xff08; Reference Counting&#xff09; 1. 概念 给对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加1&#xff1a;当引用失效时&#xff0c;计数器值…

【Linux】系统编程生产者消费者模型(C++)

目录 【1】生产消费模型 【1.1】为何要使用生产者消费者模型 【1.2】生产者消费者模型优点 【2】基于阻塞队列的生产消费者模型 【2.1】生产消费模型打印模型 【2.2】生产消费模型计算公式模型 【2.3】生产消费模型计算公式加保存任务模型 【2.3】生产消费模型多生产多…

行行AI人工智能大会 | LTD荣获“AI强应用创新TOP50代表企业”

LTDAI重新定义下一代网站。9月20日&#xff0c;由行行AI和见实科技、梅花创投联合主办&#xff0c;以“强应用多模型——人工智能落地大潮”为主题的“2023年度见实大会”在京顺利召开。 9月20日&#xff0c;由行行AI和见实科技、梅花创投联合主办&#xff0c;以“强应用多模型…

python使用apscheduler每隔一段时间自动化运行程序

apscheduler使用比较简单&#xff0c;每隔一段时间自动化运行的步骤是&#xff1a; 创建调度器scheduler BlockingScheduler()添加任务scheduler.add_job(函数名, interval, minutes30) # 每隔30分钟运行一次直接执行&#xff1a;scheduler.start()示例代码 from datetime i…

一花落,万物生,AIGC为国货复兴注入新活力

在最近的商业新闻中&#xff0c;国货们发“花难财”的热度持续在涨&#xff0c;“花西子”曾经是中国国货的代表之一&#xff0c;以其独特的美妆产品而闻名于世。然而&#xff0c;近期因为李佳琦“79块钱哪里贵了”事件的原因&#xff0c;让不少网友扒出这支79的眉笔算下来一克…

BeanUtils.copyProperties的使用场景

1. 常见场景 我们如果有两个具有很多相同属性名的JavaBean对象a和b&#xff0c;想把a中的属性赋值到b&#xff0c;例如 接口中将接收到的前端请求参数XxxReqVo,我们想把这个入参转化为XxxQuery对象作为数据库的查询条件对象 传统做法是手动set&#xff0c;即 XxxBean xxxBea…

算法笔记——循环链表

带环链表 算法题中&#xff0c;会有一种题目让我们去判断链表里的是否有循环。 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 这里就需要我们要用快慢指针来进行搜索&#xff0c;直接提供代码 class Solution { public:bool hasCycle(ListNode *…

zabbix自定义监控、钉钉、邮箱报警

目录 一、实验准备 二、安装 三、添加监控对象 四、添加自定义监控项 五、监控mariadb 1、添加模版查看要求 2、安装mariadb、创建用户 3、创建用户文件 4、修改监控模版 5、在上述文件中配置路径 6、重启zabbix-agent验证 六、监控NGINX 1、安装NGINX&#xff0c…

抖音seo源码关键词霸屏搜索

抖音seo源码mvg框架依据关键词霸屏搜索引擎机制技术代开发&#xff0c;抖音seo优化系统&#xff0c;抖音seo优化系统最终也是类比百度seo关键词霸屏搜索引擎来搭建&#xff0c;从短视频ai创意制作&#xff0c;发布&#xff0c;多账号平台管理。 1&#xff1a;抖音SEO霸屏系统的…

在对bbox 进行坐标系转换时heading的处理--使用向量的旋转

实际做题中我们可能会遇到很多有关及计算几何的问题&#xff0c;其中有一类问题就是向量的旋转问题&#xff0c;下面我们来具体探讨一下有关旋转的问题。 首先我们先把问题简化一下&#xff0c;我们先研究一个点绕另一个点旋转一定角度的问题。已知A点坐标(x1,y1)&#xff0c;…

打开常用软件出现msvcp140.dll丢失的解决方法,msvcp140.dll是什么东西?

在我们使用计算机的过程中&#xff0c;有时候会遇到一些错误提示&#xff0c;其中“找不到 msvcp140.dll”就是比较常见的一种。那么&#xff0c;msvcp140.dll 到底是什么呢&#xff1f;为什么会出现找不到的情况&#xff1f;丢失 msvcp140.dll 又会对计算机产生什么影响&#…

Flask配合Echarts写一个动态可视化大屏

ch 技术 后端&#xff1a;flask 可视化&#xff1a;echarts 前端&#xff1a;HTMLJavaScriptcss 大屏布局 大屏拆分 案例项目中大屏可按版块进行拆解&#xff0c;会发现这里大屏主要由标题、折线图、柱状图、地图、滚动图和词云等组成&#xff0c;整体可切分为8个版块&…

RK358支持全链路ECC的DDR和普通内存有何区别?

ECC内存&#xff08;ErrorCorrection Code Memory&#xff09;和普通内存是计算机存储技术中常见的两种类型的内存。它们在设计和功能上有一些重要区别。接下来我们将详细解释ECC内存和普通内存&#xff0c;并列举它们之间的区别以及ECC内存的纠错原理。 普通内存通常被称为非E…

MQ - 16 集群篇_分布式集群的数据一致性方案

文章目录 导图Pre分区、副本和数据倾斜副本间数据同步方式同步复制异步复制CAP 和一致性模型集群数据一致性和可靠性实现ZooKeeper 数据一致性和可靠性Kafka 数据一致性和可靠性Pulsar 数据一致性和可靠性总结导图 Pre MQ - 14 集群篇_如何构建分布式的消息队列集群&#

FOXBORO FBM233 P0926GX控制脉冲模块

FOXBORO FBM233 P0926GX 是一种控制脉冲模块&#xff0c;通常用于工业自动化和控制系统中。这个模块的主要功能是生成和控制脉冲信号&#xff0c;以用于执行特定的操作或控制过程。以下是可能适用于 FOXBORO FBM233 P0926GX 控制脉冲模块的一些常见特点&#xff1a; 脉冲生成&a…

【力扣-每日一题】213. 打家劫舍 II

class Solution { public:int getMax(int n,vector<int> &nums){int a0,bnums[n],c0;for(int in1;i<nums.size()n-1;i){ //sizen-1,为0时&#xff0c;第一个可以偷&#xff0c;最后一个不能偷size-1&#xff1b;n为1时&#xff0c;最后一个可偷&#xff0c;计算…

【00】FISCO BCOS区块链简介

官方文档&#xff1a;https://fisco-bcos-documentation.readthedocs.io/zh_CN/latest/docs/introduction.html FISCO BCOS是由国内企业主导研发、对外开源、安全可控的企业级金融联盟链底层平台&#xff0c;由金链盟开源工作组协作打造&#xff0c;并于2017年正式对外开源。 F…

Java基于SpringBoot的会员制医疗预约服务系统,可作为毕业设计

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 文章目录 1 简介2 技术栈 3系统分析3.1需求分析 4系统总体设计4.1系统结构4.2数据库设计4.2.1 数据库概念…