SparkSQL 读写数据攻略:从基础到实战

news2025/1/18 17:05:38

目录

一、输入Source

1)代码演示最普通的文件读取方式:

2) 通过jdbc读取数据库数据

3) 读取table中的数据【hive】

二、输出Sink

实战一:保存普通格式

实战二:保存到数据库中

实战三:将结果保存在hive表中

三、总结


        在大数据处理领域,SparkSQL 以其强大的数据处理能力和丰富的数据源支持备受青睐。它能够高效地读取和写入多种格式的数据,无论是本地文件、分布式文件系统(如 HDFS)上的数据,还是数据库、Hive 表中的数据,都能轻松驾驭。今天,就让我们深入探究 SparkSQL 读写数据的方式,通过详细的代码示例和原理讲解,助你全面掌握这一关键技能。

 

一、输入Source

 

  • 类型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
  • 语法:spark.read.format(格式).load(读取的地址)

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊参数:option,用于指定读取时的一些配置选项

spark.read.format("csv").option("sep", "\t").load(path)

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

 

1)代码演示最普通的文件读取方式:

from pyspark.sql import SparkSession
import os

if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
		"spark.sql.shuffle.partitions", 2).getOrCreate()

	df01 = spark.read.json("../../datas/resources/people.json")
	df01.printSchema()
	df02 = spark.read.format("json").load("../../datas/resources/people.json")
	df02.printSchema()
	df03 = spark.read.parquet("../../datas/resources/users.parquet")
	df03.printSchema()
	#spark.read.orc("")
	df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
	df04.printSchema()
	df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
	df05.printSchema()

	df06 = spark.read.load(
		path="../../datas/resources/people.csv",
		format="csv",
		sep=";"
	)
	df06.printSchema()

	spark.stop()

 

2) 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (
  `empno` int(11) NULL DEFAULT NULL,
  `ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `mgr` int(11) NULL DEFAULT NULL,
  `hiredate` date NULL DEFAULT NULL,
  `sal` decimal(7, 2) NULL DEFAULT NULL,
  `comm` decimal(7, 2) NULL DEFAULT NULL,
  `deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (
  `deptno` int(11) NULL DEFAULT NULL,
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

接着放驱动程序:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars

# 上传jar包:mysql-connector-java-5.1.32.jar

代码练习:

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

if __name__ == '__main__':
	# 获取sparkSession对象
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 得到sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
	# 处理逻辑
	# 读取json 数据
	df1 = spark.read.format("json").load("../../datas/sql/person.json")
	df1.show()
	# 另一种写法,推荐使用这一种
	df2 = spark.read.json("../../datas/sql/person.json")
	df2.show()
	df3 = spark.read.csv("../../datas/dept.csv")
	df4 = spark.read.format("csv").load("../../datas/dept.csv")

	# 读取分隔符为别的分隔符的文件
	user_schema = StructType([
		StructField(name="emp_id", dataType=StringType(), nullable=False),
		StructField(name="emp_name", dataType=StringType(), nullable=True),
		StructField(name="salary", dataType=DoubleType(), nullable=True),
		StructField(name="comm", dataType=DoubleType(), nullable=True),
		StructField(name="dept_id", dataType=LongType(), nullable=True)
	])
	# 使用csv 读取了一个 \t 为分隔符的文件,读取的数据字段名很随意,所以可以自定义
	df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)
	df5.show()

	# 昨天的作业是否也可以有另一个写法
	movie_schema = StructType([
		StructField(name="movie_id", dataType=LongType(), nullable=False),
		StructField(name="movie_name", dataType=StringType(), nullable=True),
		StructField(name="movie_type", dataType=StringType(), nullable=True)
	])
	movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)
	movieDF.show()

	spark.read.load(
		path="../../datas/zuoye/movies.dat",
		format="csv",
		sep="::",
		schema=movie_schema
	).show()
	dict = {"user":"root","password":"root"}
	jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
	jdbcDf.show()
	# jdbc的另一种写法
	jdbcDf2 = spark.read.format("jdbc") \
		.option("driver", "com.mysql.cj.jdbc.Driver") \
		.option("url", "jdbc:mysql://localhost:3306/spark") \
		.option("dbtable", "spark.dept") \
		.option("user", "root") \
		.option("password", "root").load()
	jdbcDf2.show()

	# 读取hive表中的数据

	# 关闭
	spark.stop()

 

3) 读取table中的数据【hive】

海量数据,如何处理,存储在hdfs上

第一种:

使用spark读取hdfs上的数据(可以使用sparkCore读取,也可以使用sparksql读取),将数据变为表【数据+Schema】,然后编写sql或者sparkCore代码。

rdd --> dataFrame

第二种:推荐

将hdfs上的数据映射成hive的表,然后通过sparkSql连接hive, 编写 sql 处理需求。

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的HDFS的地址?

Hive中的表存在哪里?元数据--MySQL , 启动metastore服务即可。

本质上:SparkSQL访问了Metastore服务获取了Hive元数据,基于元数据提供的地址进行计算

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置:

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml

在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata01:9083</value>
    </property>
</configuration>

接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL:

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+

>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

开发环境如何编写代码,操作hive:

Pycharm工具集成Hive开发SparkSQL,必须申明Metastore的地址和启用Hive的支持

spark = SparkSession \
        .builder \
        .appName("HiveAPP") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport()\
        .getOrCreate()

代码实战:

from pyspark.sql import SparkSession
import os


if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 防止在本地操作hdfs的时候,出现权限问题
	os.environ['HADOOP_USER_NAME'] = 'root'

	# 获取sparkSession对象
	spark = SparkSession \
		.builder \
		.appName("HiveAPP") \
		.master("local[2]") \
		.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
		.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
		.config("spark.sql.shuffle.partitions", 2) \
		.enableHiveSupport() \
		.getOrCreate()

	spark.sql("select * from yhdb.student").show()

	spark.stop()

代码还可以这样写:

方式二:加载Hive表的数据变成DF,可以调用DSL或者SQL的方式来实现计算

# 读取Hive表构建DataFrame

hiveData = spark.read.table("yhdb.student")

hiveData.printSchema()

hiveData.show()

# 读取hive表中的数据
	spark2 = SparkSession \
		.builder \
		.appName("HiveAPP") \
		.master("local[2]") \
		.config("spark.sql.warehouse.dir", 'hdfs://192.168.233.128:9820/user/hive/warehouse') \
		.config('hive.metastore.uris', 'thrift://192.168.233.128:9083') \
		.config("spark.sql.shuffle.partitions", 2) \
		.enableHiveSupport() \
		.getOrCreate()

	#spark2.sql("show databases").show()
	#spark2.sql("show  tables").show()

	#spark2.sql("select * from yhdb.t_user").show()

	spark2.read.table("t_user2").show()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。另外,记得添加一个权限校验的语句:

# 防止在本地操作hdfs的时候,出现权限问题
os.environ['HADOOP_USER_NAME'] = 'root'

为什么有些平台不支持,不兼容 sqoop flume datax 这些工具呢?

spark 可以读取日志数据

spark 可以读取数据库数据

spark 可以读取 hdfs 数据

spark 可以读取 hive 数据

------------------------------------

spark 可以读取日志数据,形成一个 A 表,读取 mysql 数据,形成一个 B 表

A 表和 B 表还可以相互关联,此时也就不需要 sqoop、flume、datax 去导入导出了。

spark 还可以将统计出来的结果直接放入 mysql 或者直接放入 hive

--------------------

我们后面学习的内容还是沿用 将日志数据,数据库数据等所有数据抽取到 hive ,然后呢,使用 spark 去统计,统计完之后还是放入 hive ,使用 datax 等工具将结果导出 mysql。

 

二、输出Sink

 

sink --> 下沉 --> 落盘 --> 保存起来

如果输出路径或者表已经存在了怎么办

  • 类型:text /csv【所有具有固定分隔符的文件】/ json/ orc/ parquet / jdbc / table【Hive表】
  • 语法:DataFrame.write.format(保存的类型).save(保存到哪)
    • 方法:save-保存到文件save(path)或者数据库表save()中,saveAsTable-用于保存到Hive表

方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊参数:option,用于指定输出时的一些配置选项

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

输出模式:Save Mode

append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作

代码如何编写:

df.write.mode(saveMode="append").format("csv").save(path)

 

实战一:保存普通格式

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.json("../../datas/person.json")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("persons")
    rsDf = spark.sql("""
       select name,age from persons where age = (select max(age) from persons)
    """)
    # 将结果打印到控制台
    #rsDf.write.format("console").save()
    #rsDf.write.json("../../datas/result",mode="overwrite")
    #rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")
    #rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")
    #rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")
    #rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")
    # text 保存路径为hdfs 直接报错,不支持
    #rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")
    #rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")
    rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")

    spark.stop()
假如:
spark.sql("select concat(name,' ',age) from person").write.text("hdfs://bigdata01:9820/spark/result")
直接报错:假如你的输出类型是text类型,直接报错
pyspark.sql.utils.AnalysisException: Text data source does not support bigint data type.
假如修改为parquet等类型,是可以直接保存的:
rsDf.write.parquet("hdfs://bigdata01:9820/result")                                                                

 

实战二:保存到数据库中

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("id","name","sal","comm","deptno")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("emps")
    rsDf = spark.sql("""
       select * from emps where comm is not null
    """)
    # 不需要事先将表创建好,它可以帮助我们创建
    rsDf.write.format("jdbc") \
     .option("driver", "com.mysql.cj.jdbc.Driver") \
     .option("url", "jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8") \
     .option("user","root") \
     .option("password", "123456") \
     .option("dbtable", "emp1") \
     .save(mode="overwrite")

    spark.stop()

 

实战三:将结果保存在hive表中

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    os.environ['HADOOP_USER_NAME'] = 'root'


    spark = SparkSession \
        .builder \
        .appName("测试本地连接hive") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport() \
        .getOrCreate()

    df = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv").toDF("id", "name", "sal",
                                                                                             "comm", "deptno")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("emps")
    rsDf = spark.sql("""
           select * from emps where comm is not null
        """)

    rsDf.write.saveAsTable("yhdb03.emp")

    spark.stop()

三、总结

        SparkSQL 读写数据功能丰富强大,涵盖多种数据源与格式,理解其原理、语法和操作细节,结合不同业务场景(如数据分析、数据迁移、数据存储优化等)灵活运用,能极大提升大数据处理效率,助力在大数据领域深挖数据价值、攻克业务难题,为数据驱动决策筑牢根基。后续实践中,多尝试不同数据、场景组合,深化掌握程度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2256144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【1】Python交叉编译到OpenHarmony标准系统运行(arm32位)

本文介绍如何Python语言如何在OpenHarmony标准系统运行,包括5.0r和4.1r以及4.0r,和未来版本的OpenHarmony版本上。 Python语言在OpenHarmony上使用,需要将Python解释器CPython移植到OpenHarmony标准系统。通过交叉编译的方式。 首先来了解几个概念: CPython 是 Python 编…

Windows环境中Python脚本开机自启动及其监控自启动

1 开机自启动 Windows 10/Windows Server 201X具有一个名为“启动”的已知文件夹&#xff0c;系统每次启动开始自动运行应用程序、快捷方式和脚本时都会检查该文件夹&#xff0c;而无需额外配置。 要在Windows启动时运行脚本&#xff0c;先使用WindowsR快捷键打开“运行”对话…

Mysql索引原理及优化——岁月云实战笔记

根据Mysql索引原理及优化这个博主的视频学习&#xff0c;对现在的岁月云项目中mysql进行优化&#xff0c;我要向这个博主致敬&#xff0c;之前应用居多&#xff0c;理论所知甚少&#xff0c;于是将学习到东西&#xff0c;应用下来&#xff0c;看看是否有好的改观。 1 索引原理…

JavaWeb学习(3)(Servlet详细、Servlet的三种实现方式(面试)、Servlet的生命周期、传统web.xml配置Servlet(了解))

目录 一、Servlet详细。 &#xff08;1&#xff09;基本介绍。 &#xff08;2&#xff09;基本作用。 1、接收客户端请求数据。 2、处理请求。 3、完成响应结果。 二、Servlet的三种实现方式。 &#xff08;1&#xff09;实现javax.servlet.Servlet接口。 1、基本介绍。 2、代码…

第6章:布局 --[CSS零基础入门]

CSS 布局是网页设计中至关重要的一个方面&#xff0c;它决定了页面上元素的排列和展示方式。以下是几种常见的 CSS 布局方法和技术&#xff1a; 1. 浮动布局&#xff08;Float Layout&#xff09; 浮动布局&#xff08;Float Layout&#xff09;曾经是网页设计中创建多列布局…

哪里可以找到高质量的街道夜景短视频素材?夜景素材网站推荐

在短视频创作的浪潮中&#xff0c;街道夜景作为一种视觉效果独特、氛围浓郁的题材&#xff0c;深受创作者的青睐。不论是商业广告、创意短片还是个人Vlog&#xff0c;街道夜景的视频素材都能为你的作品增添不小的分量。那么&#xff0c;在哪里可以找到这些高质量的街道夜景短视…

Unity类银河战士恶魔城学习总结(P166 Ailments FX 异常状态伤害粒子特效)

【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址&#xff1a;https://www.udemy.com/course/2d-rpg-alexdev/ 本章节创建了三种粒子特效&#xff0c;火焰&#xff0c;寒冰&#xff0c;雷电 主场景创建/特效/粒子 初始的例子特效 火焰 寒冰 雷电 En…

游戏引擎学习第38天

仓库: https://gitee.com/mrxiao_com/2d_game 回顾上次的内容。 我们之前讨论了将精灵放在屏幕上&#xff0c;但颜色错误的问题。问题最终查明是因为使用了一个调整工具&#xff0c;导致文件的字节顺序发生了变化。重新运行“image magic”工具对一些大图像进行重新处理后&am…

数据结构---带头双向循环链表

目录 一、概念 二、接口实现 1、申请新节点 2、初始化 3、尾插 4、尾删 5、头插 6、头删 7、计算链表长度 8、在pos之前插入 9、删除pos位置 10、销毁 三、完整代码 四、顺序表和链表的区别 一、概念 带头双向循环链表&#xff1a;构最复杂&#xff0c;结一…

学习记录:js算法(一百一十八):连接所有点的最小费用

文章目录 连接所有点的最小费用思路一 连接所有点的最小费用 给你一个points 数组&#xff0c;表示 2D 平面上的一些点&#xff0c;其中 points[i] [xi, yi] 。 连接点 [xi, yi] 和点 [xj, yj] 的费用为它们之间的 曼哈顿距离 &#xff1a;|xi - xj| |yi - yj| &#xff0c;其…

使用Altair绘制带有回归线的散点图

散点图和回归线 两个不同数值变量的值在散点图中用点或圆圈表示。每个点在水平轴和垂直轴中的位置表示单个数据点的值。散点图有利于观察变量之间的关系。回归线是最适合数据的直线&#xff0c;从线到图表上绘制的点的总距离最小。 安装 pip install altair在本文中的数据集…

【问题解决方案】项目路径更改后pycharm选定解释器无效

1. 问题重述 第一次创建项目并且项目路径下创建venv虚拟环境后修改项目的路径&#xff08;整个项目移动到另外的地方&#xff09;&#xff0c;这时候出现 2.解决方案 用我这篇文章的方式这时候是解决不了问题的&#xff0c;两个问题出现的原因不同&#xff0c;这个是项目关联…

【C语言--趣味游戏系列】--电脑关机整蛊小游戏

前言&#xff1a; 老铁们&#xff0c;还是那句话&#xff0c;学习很苦游戏来补&#xff0c; 为了提高大家与朋友之间的友谊&#xff0c;博主在这里分享一个电脑关机的恶作剧小游戏&#xff0c;快拿去试试吧&#xff01;&#xff01;&#xff01; 目录&#xff1a; 1.电脑关机代…

基于Matlab卷积神经网络的交通标志识别系统研究与实现

交通标志识别作为智能交通系统的核心技术之一&#xff0c;不仅在自动驾驶领域发挥着关键作用&#xff0c;还在现代道路安全管理中具有重要意义。交通标志为驾驶员提供了有关道路情况的及时信息&#xff0c;包括限速、行驶方向、停车、危险警告等内容&#xff0c;因此能够准确、…

论文概览 |《Urban Analytics and City Science》2023.03 Vol.50 Issue.3

本次给大家整理的是《Environment and Planning B: Urban Analytics and City Science》杂志2023年3月第50卷第3期的论文的题目和摘要&#xff0c;一共包括18篇SCI论文&#xff01; 论文1 A new kind of search 一种新型的搜索 【摘要】 ChatGPT (2022) was first launched o…

Jenkins 中自定义Build History中显示构建信息

有时候会遇到一个代码仓库下面会有多个不同的分支&#xff0c;而这写分支表示着不同的开发者在开发新的需求&#xff0c;但是这样就会出现一个问题&#xff0c;在Jenkins上进行多分支构建的时候&#xff0c;很难找到哪一个是属于自己分支构建的&#xff0c;这样的问题大家应该都…

spring6:3容器:IoC

spring6&#xff1a;3容器&#xff1a;IoC 目录 spring6&#xff1a;3容器&#xff1a;IoC3、容器&#xff1a;IoC3.1、IoC容器3.1.1、控制反转&#xff08;IoC&#xff09;3.1.2、依赖注入3.1.3、IoC容器在Spring的实现 3.2、基于XML管理Bean3.2.1、搭建子模块spring6-ioc-xml…

Java项目实战II基于微信小程序的无中介租房系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 随着城市化进程的加速&#xff0c;租房市场日益繁荣&a…

使用Vue3+Echarts实现加载中国地图,点击省份地图下钻(完整教程)

一. 前言 在众多 ECharts 图表类型中&#xff0c;开发者始终绕不开的有各种各样的地图开发&#xff0c;关于地图开发&#xff0c;可能比其他图表相对繁琐一些&#xff0c;其实说简单也简单&#xff0c;说复杂也复杂&#xff0c;其中不乏有层级地图、3D 地图等&#xff0c;感觉…

WPF表格控件的列利用模块适配动态枚举类

将枚举列表转化到类内部赋值&#xff0c;在初始化表格行加载和双击事件时&#xff0c;触发类里面的枚举列表的赋值 <c1:Column Header"变更类型" Binding"{Binding ChangeType, ModeTwoWay, ValidatesOnExceptionsTrue, ValidatesOnDataErrorsTrue, NotifyOn…