SparkSql读取数据的方式

news2024/11/12 23:50:16

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import os


if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
		"spark.sql.shuffle.partitions", 2).getOrCreate()

	df01 = spark.read.json("../../datas/resources/people.json")
	df01.printSchema()
	df02 = spark.read.format("json").load("../../datas/resources/people.json")
	df02.printSchema()
	df03 = spark.read.parquet("../../datas/resources/users.parquet")
	df03.printSchema()
	#spark.read.orc("")
	df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
	df04.printSchema()
	df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
	df05.printSchema()

	df06 = spark.read.load(
		path="../../datas/resources/people.csv",
		format="csv",
		sep=";"
	)
	df06.printSchema()

	spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (
  `empno` int(11) NULL DEFAULT NULL,
  `ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `mgr` int(11) NULL DEFAULT NULL,
  `hiredate` date NULL DEFAULT NULL,
  `sal` decimal(7, 2) NULL DEFAULT NULL,
  `comm` decimal(7, 2) NULL DEFAULT NULL,
  `deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (
  `deptno` int(11) NULL DEFAULT NULL,
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars

# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


if __name__ == '__main__':
	# 获取sparkSession对象
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 得到sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
	

	dict = {"user":"root","password":"root"}
	jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
	jdbcDf.show()
	# jdbc的另一种写法
	jdbcDf2 = spark.read.format("jdbc") \
		.option("driver", "com.mysql.cj.jdbc.Driver") \
		.option("url", "jdbc:mysql://localhost:3306/spark") \
		.option("dbtable", "spark.dept") \
		.option("user", "root") \
		.option("password", "root").load()
	jdbcDf2.show()


	# 关闭
	spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml

在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata01:9083</value>
    </property>
</configuration>

接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+

>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import os



if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 防止在本地操作hdfs的时候,出现权限问题
	os.environ['HADOOP_USER_NAME'] = 'root'

	# 获取sparkSession对象
	spark = SparkSession \
		.builder \
		.appName("HiveAPP") \
		.master("local[2]") \
		.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
		.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
		.config("spark.sql.shuffle.partitions", 2) \
		.enableHiveSupport() \
		.getOrCreate()

	spark.sql("select * from yhdb.student").show()

    spark.read.table("yhdb.student").show()

	spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

模型训练中GPU利用率低?

买了块魔改华硕猛禽2080ti&#xff0c;找了下没找到什么测试显存的软件&#xff0c;于是用训练模型来测试魔改后的显存稳定性&#xff0c;因为模型训练器没有资源监测&#xff0c;于是用了Windows任务管理器来查看显卡使用情况&#xff0c;却发现GPU的利用率怎么这么低&#xf…

在gitlab,把新分支替换成master分支

1、备份master分支&#xff0c;可以打tag 2、删除master分支 正常情况下&#xff0c;master分支不允许删除&#xff0c;需要做两个操作才能删除 a、变更项目默认分支为非master分支&#xff0c;可以先随便选择 b、取消master为非保护分支 操作了上述两步&#xff0c;就可以删…

Transformer究竟是什么?预训练又指什么?BERT

目录 Transformer究竟是什么? 预训练又指什么? BERT的影响力 Transformer究竟是什么? Transformer是一种基于自注意力机制(Self-Attention Mechanism)的神经网络架构,它最初是为解决机器翻译等序列到序列(Seq2Seq)任务而设计的。与传统的循环神经网络(RNN)或卷…

UE5.4 PCG 自定义PCG蓝图节点

ExecuteWithContext&#xff1a; PointLoopBody&#xff1a; 效果&#xff1a;点密度值与缩放成正比

SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”

SLF4J常见问题 1、SLF4J简介2、SLF4J实现原理3、SLF4J常见问题 1、SLF4J简介 SLF4J&#xff08;Simple Logging Facade for Java&#xff09;是一个为Java程序提供日志输出的统一接口&#xff0c;并不具备具体的日志实现方案&#xff0c;类似JDBC&#xff0c;SLF4J只做两件事&a…

MySQL记录锁、间隙锁、临键锁(Next-Key Locks)详解

行级锁&#xff0c;每次操作锁住对应的行数据。锁定粒度最小&#xff0c;发生锁冲突的概率最低&#xff0c;并发度最高。 应用在InnoDB存储引擎中。InnoDB的数据是基于索引组织的&#xff0c;行锁是通过对索引上的索引项加锁来实现的&#xff0c;而不是对记录加的锁。 对于行…

前端-懒加载

目录 1.懒加载的概念 2.懒加载的特点 3.懒加载的实现原理 4.懒加载与预加载的区别 5.懒加载实现 6.预加载实现 1.懒加载的概念 懒加载也加延迟加载、按需加载&#xff0c;指在长网页中延迟加载图片数据&#xff0c;是一种较好的网页性能优化的方式。 2.懒加载的特点 &…

uniapp 实现瀑布流

效果演示 组件下载 瀑布流布局-waterfall - DCloud 插件市场

若依后端项目打包镜像部署

添加打包依赖 <build><finalName>${project.artifactId}</finalName><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><executions><executi…

使用Python实现音频降噪

在音频处理领域&#xff0c;背景噪声是一个常见的问题。为了提高音频的质量&#xff0c;我们需要对音频进行降噪处理。本文将介绍如何使用 Python 实现音频降噪。 依赖库安装 在开始之前&#xff0c;我们需要安装以下依赖库&#xff1a; pydub&#xff1a;用于音频文件的读取…

18、论文阅读:AOD-Net:一体化除雾网络

AOD-Net: All-in-One Dehazing Network 前言介绍相关工作物理模型传统方法深度学习方法 建模与扩展变换后的公式网络设计与高级特征任务相结合 除雾评价数据集和实现 前言 该论文提出了一种基于卷积神经网络&#xff08;CNN&#xff09;的图像去雾模型&#xff0c;称为 All-in…

软件工程。

图 UML 数据流图&#xff08;DFD&#xff09; 1&#xff0c;数据流图概念 描绘信息流和数据从输入移动到输出的过程中所经受的变换。 也就是 数据流图。 数据流图以图形的方式描绘数据在系统中流动和处理的过程。 数据流图&#xff08;DFD&#xff0c;Data Flow Diagram&a…

机器学习—为什么我们需要激活函数

如果我们使用神经网络中每个神经元的线性激活函数&#xff0c;回想一下这个需求预测示例&#xff0c;如果对所有节点使用线性激活函数&#xff0c;在这个神经网络中&#xff0c;事实证明&#xff0c;这个大神经网络将变得与线性回归没有什么不同&#xff0c;所以这将挫败使用神…

Java基础使用②Java数据变量和类型+小知识点

目录 1. Java小知识点 1.1 Java注释 1.2 Java标识符命名 1.3 Java关键字 2. 字面常量和数据变量 2.1 字面常量 2.2 数据类型 3.变量 3.1 变量概念 3.2 语法格式 3.3 整型变量 3.4 浮点型变量 3.5 字符型变量 3.6 布尔型变量 3.7 类型转换 3.8 类型提升 4. 字符…

Blender 几何、线框猴头的构建 笔记

一、学习blender视频教程链接 案例7&#xff1a;猴头构建_建模动画_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1Bt4y1E7qn?spm_id_from333.788.videopod.episodes&vd_sourced0ea58f1127eed138a4ba5421c577eb1&p23 二、几何节点基础教程 1.首先添加几何节…

Kafka面试题解答(二)

1.怎么尽可能保证 Kafka 的可靠性 kafka是可能会出现数据丢失问题的&#xff0c;Leader维护了一个动态的in-sync replica set&#xff08;ISR&#xff09;&#xff0c;意为和 Leader保持同步的FollowerLeader集合(leader&#xff1a;0&#xff0c;isr:0,1,2)。 如果Follower长…

用科技力量,重塑数字化园区新生态!

数字化园区的成功打造绝非易事&#xff0c;它需要在多个关键层面付出持之以恒的努力&#xff0c;而成都树莓集团在这一进程中无疑发挥着重要作用。 在数据的管理与应用方面&#xff0c;成都树莓集团更是展现出卓越的实力。集团运用先进的数据挖掘、分析技术&#xff0c;助力园区…

Mac上无法访问usr/local的文件

sudo chmod 755 /usr/loca 最后用百度提供的方法解决了

【Lucene】原理学习路线

基于《Lucene原理与代码分析完整版》&#xff0c;借助chatgpt等大模型&#xff0c;制定了一个系统学习Lucene原理的计划&#xff0c;并将每个阶段的学习内容组织成专栏文章&#xff0c;zero2hero 手搓 Lucene的核心概念和实现细节。 深入的学习和专栏计划&#xff0c;覆盖Lucen…

友思特应用 | 动态捕捉:高光谱相机用于移动产线上的食品检测

导读 高光谱成像技术能够为食品安全助力。以友思特BlackIndustry SWIR 1.7 Max 为代表的高光谱相机&#xff0c;完美解决了移动产线检测的应用难点。 高光谱技术&#xff1a;为食品安全保驾护航 食品安全一直是大众关心的热点话题&#xff0c;提供安全、高质量的食品需要对食…