本文主要介绍通过spark导入doris的3种方式。
1.最简单的方式:jdbc
jdbc 方式需要引入mysql-connector-java的依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
代码demo
.....
df.show()
df
.write
.format("jdbc")
.mode(SaveMode.Append)
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://xxxx:xx/xx?rewriteBatchedStatements=true")
.option("batchsize", "10000")
.option("user", "xxxx")
.option("password", "xxxx")
.option("isolationLevel", "NONE")
.option("dbtable", "xxxxxx")
.save()
注意:
一定要添加?rewriteBatchedStatements=true参数,不然导入速度会很慢。
2.Doris官方推荐的方式:Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
代码库地址:https://github.com/apache/doris-spark-connector
- 支持从
Doris
中读取数据 - 支持
Spark DataFrame
批量/流式 写入Doris
- 可以将
Doris
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
Doris
端完成数据过滤,减少数据传输量。
版本兼容:
Connector | Spark | Doris | Java | Scala |
---|---|---|---|---|
1.2.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 |
使用已经编译好的版本
可在https://repo.maven.apache.org/maven2/org/apache/doris/下载需要的jar包 但是可供选择的版本比较少,目前只有下图中的3个。
自行编译
编译步骤:
- 修改
custom_env.sh.tpl
文件,重命名为custom_env.sh
- 在源码目录下执行:
sh build.sh
根据提示输入你需要的 Scala 与 Spark 版本进行编译。
编译成功后,会在 dist
目录生成目标jar包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
。 将此文件复制到 Spark
的 ClassPath
中即可使用 Spark-Doris-Connector
。
例如,Local
模式运行的 Spark
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的Spark
,则将此文件放入预部署包中。
例如将 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
上传到 hdfs 并在 spark.yarn.jars
参数上添加 hdfs 上的 Jar 包路径
- 上传
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
- 在集群中添加
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
依赖。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
使用Maven管理
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.2_2.12</artifactId>
<version>1.2.0</version>
</dependency>
请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。
写入示例
DataFrame(batch/stream)
## batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)
mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.save()
## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
3.Spark Load
Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 Doris 大数据量的导入性能并且节省 Doris 集群的计算资源。主要用于初次迁移,大数据量导入 Doris 的场景。
Spark Load 是利用了 Spark 集群的资源对要导入的数据的进行了排序,Doris BE 直接写文件,这样能大大降低 Doris 集群的资源使用,对于历史海量数据迁移降低 Doris 集群资源使用及负载有很好的效果。
配置 ETL 集群
Spark Load 是利用了 Spark 集群的资源对要导入的数据的进行了排序,Doris BE 直接写文件,这样能大大降低 Doris 集群的资源使用,对于历史海量数据迁移降低 Doris 集群资源使用及负载有很好的效果。
提交 Spark 导入任务之前,需要配置执行 ETL 任务的 Spark 集群。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value,
broker.hadoop.security.authentication = kerberos,
broker.kerberos_principal = doris@YOUR.COM,
broker.kerberos_keytab = /home/doris/my.keytab
broker.kerberos_keytab_content = ASDOWHDLAWIDJHWLDKSALDJSDIWALD
)
-- drop spark resource
DROP RESOURCE resource_name
-- show resources
SHOW RESOURCES
SHOW PROC "/resources"
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name
创建资源
resource_name
为 Doris 中配置的 Spark 资源的名字。
PROPERTIES
是 Spark 资源相关参数,如下:
-
type
:资源类型,必填,目前仅支持 Spark。 -
Spark 相关参数如下:
spark.master
: 必填,目前支持 Yarn,Spark://host:port。spark.submit.deployMode
: Spark 程序的部署模式,必填,支持 Cluster、Client 两种。spark.hadoop.fs.defaultFS
: Master 为 Yarn 时必填。spark.submit.timeout
:spark任务超时时间,默认5分钟
-
YARN RM 相关参数如下:
- 如果 Spark 为单点 RM,则需要配置
spark.hadoop.yarn.resourcemanager.address
,表示单点 ResourceManager 地址。 - 如果 Spark 为 RM-HA,则需要配置(其中 hostname 和 address 任选一个配置):
spark.hadoop.yarn.resourcemanager.ha.enabled
: ResourceManager 启用 HA,设置为 True。spark.hadoop.yarn.resourcemanager.ha.rm-ids
: ResourceManager 逻辑 ID 列表。spark.hadoop.yarn.resourcemanager.hostname.rm-id
: 对于每个 rm-id,指定 ResourceManager 对应的主机名。spark.hadoop.yarn.resourcemanager.address.rm-id
: 对于每个 rm-id,指定 host:port 以供客户端提交作业。
- 如果 Spark 为单点 RM,则需要配置
-
HDFS HA 相关参数如下:
spark.hadoop.fs.defaultFS
, HDFS 客户端默认路径前缀spark.hadoop.dfs.nameservices
, HDFS 集群逻辑名称spark.hadoop.dfs.ha.namenodes.nameservices01
, nameservice 中每个 NameNode 的唯一标识符spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1
, 每个 NameNode 的完全限定的 RPC 地址spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2
, 每个 NameNode 的完全限定的 RPC 地址spark.hadoop.dfs.client.failover.proxy.provider
=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
, 设置实现类
-
working_dir
: ETL 使用的目录。Spark 作为 ETL 资源使用时必填。例如:hdfs://host:port/tmp/doris。
- 其他参数为可选,参考 http://spark.apache.org/docs/latest/configuration.html
-
working_dir
: ETL 使用的目录。Spark 作为 ETL 资源使用时必填。例如:hdfs://host:port/tmp/doris。 -
broker.hadoop.security.authentication
:指定认证方式为 Kerberos。 -
broker.kerberos_principal
:指定 Kerberos 的 Principal。 -
broker.kerberos_keytab
:指定 Kerberos 的 Keytab 文件路径。该文件必须为 Broker 进程所在服务器上的文件的绝对路径,并且可以被 Broker 进程访问。 -
broker.kerberos_keytab_content
:指定 Kerberos 中 Keytab 文件内容经过 Base64 编码之后的内容。这个跟broker.kerberos_keytab
配置二选一即可。 -
broker
: Broker 名字。Spark 作为 ETL 资源使用时必填。需要使用
ALTER SYSTEM ADD BROKER
命令提前完成配置。
broker.property_key
: Broker 读取 ETL 生成的中间文件时需要指定的认证信息等。
-
env
: 指定 Spark 环境变量,支持动态设置,比如当认证 Hadoop 认为方式为 Simple 时,设置 Hadoop 用户名和密码
env.HADOOP_USER_NAME
: 访问 Hadoop 用户名env.HADOOP_USER_PASSWORD
:密码
示例:
-- yarn cluster 模式
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- spark standalone client 模式
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "spark://127.0.0.1:7777",
"spark.submit.deployMode" = "client",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker1"
);
-- yarn HA 模式
CREATE EXTERNAL RESOURCE sparkHA
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "default",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "xxxx:8032",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "xxxx:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://nameservices01",
"spark.hadoop.dfs.nameservices" = "nameservices01",
"spark.hadoop.dfs.ha.namenodes.nameservices01" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://nameservices01/doris_prd_data/sinan/spark_load/",
"broker" = "broker_name",
"broker.username" = "username",
"broker.password" = "",
"broker.dfs.nameservices" = "nameservices01",
"broker.dfs.ha.namenodes.nameservices01" = "mynamenode1, mynamenode2",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
【INFO】本文发自CSDN,尊重原创,转载请先获得许可,并注明原文出处。