spark导入doris的几种方式

news2025/1/10 2:35:29

本文主要介绍通过spark导入doris的3种方式。

1.最简单的方式:jdbc

jdbc 方式需要引入mysql-connector-java的依赖


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>

代码demo

	.....


    df.show()
    df
      .write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://xxxx:xx/xx?rewriteBatchedStatements=true")
      .option("batchsize", "10000")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .option("isolationLevel", "NONE")
      .option("dbtable", "xxxxxx")
      .save()


注意:

一定要添加?rewriteBatchedStatements=true参数,不然导入速度会很慢。

2.Doris官方推荐的方式:Spark Doris Connector


Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

代码库地址:https://github.com/apache/doris-spark-connector

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式 写入Doris
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在Doris端完成数据过滤,减少数据传输量。

版本兼容:

ConnectorSparkDorisJavaScala
1.2.03.2, 3.1, 2.31.0 +82.12, 2.11
1.1.03.2, 3.1, 2.31.0 +82.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

使用已经编译好的版本


可在https://repo.maven.apache.org/maven2/org/apache/doris/下载需要的jar包 但是可供选择的版本比较少,目前只有下图中的3个。

在这里插入图片描述

自行编译


编译步骤:
  1. 修改custom_env.sh.tpl文件,重命名为custom_env.sh
  2. 在源码目录下执行: sh build.sh 根据提示输入你需要的 Scala 与 Spark 版本进行编译。

编译成功后,会在 dist 目录生成目标jar包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 SparkClassPath 中即可使用 Spark-Doris-Connector

例如,Local 模式运行的 Spark,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。

例如将 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 上传到 hdfs 并在 spark.yarn.jars 参数上添加 hdfs 上的 Jar 包路径

  1. 上传 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
  1. 在集群中添加 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 依赖。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar

使用Maven管理

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.2.0</version>
</dependency>

请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

写入示例

DataFrame(batch/stream)

## batch sink
val mockDataDF = List(
  (3, "440403001005", "21.cn"),
  (1, "4404030013005", "22.cn"),
  (33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  //其它选项
  //指定你要写入的字段
  .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
  .save()

## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  //其它选项
  //指定你要写入的字段
  .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
  .start()
  .awaitTermination()

3.Spark Load

Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 Doris 大数据量的导入性能并且节省 Doris 集群的计算资源。主要用于初次迁移,大数据量导入 Doris 的场景。

Spark Load 是利用了 Spark 集群的资源对要导入的数据的进行了排序,Doris BE 直接写文件,这样能大大降低 Doris 集群的资源使用,对于历史海量数据迁移降低 Doris 集群资源使用及负载有很好的效果。

配置 ETL 集群

Spark Load 是利用了 Spark 集群的资源对要导入的数据的进行了排序,Doris BE 直接写文件,这样能大大降低 Doris 集群的资源使用,对于历史海量数据迁移降低 Doris 集群资源使用及负载有很好的效果。

提交 Spark 导入任务之前,需要配置执行 ETL 任务的 Spark 集群。

-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value,
  broker.hadoop.security.authentication = kerberos,
  broker.kerberos_principal = doris@YOUR.COM,
  broker.kerberos_keytab = /home/doris/my.keytab
  broker.kerberos_keytab_content = ASDOWHDLAWIDJHWLDKSALDJSDIWALD
)

-- drop spark resource
DROP RESOURCE resource_name

-- show resources
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

创建资源

resource_name 为 Doris 中配置的 Spark 资源的名字。

PROPERTIES 是 Spark 资源相关参数,如下:

  • type:资源类型,必填,目前仅支持 Spark。

  • Spark 相关参数如下:

    • spark.master: 必填,目前支持 Yarn,Spark://host:port。
    • spark.submit.deployMode: Spark 程序的部署模式,必填,支持 Cluster、Client 两种。
    • spark.hadoop.fs.defaultFS: Master 为 Yarn 时必填。
    • spark.submit.timeout:spark任务超时时间,默认5分钟
  • YARN RM 相关参数如下:

    • 如果 Spark 为单点 RM,则需要配置spark.hadoop.yarn.resourcemanager.address,表示单点 ResourceManager 地址。
    • 如果 Spark 为 RM-HA,则需要配置(其中 hostname 和 address 任选一个配置):
      • spark.hadoop.yarn.resourcemanager.ha.enabled: ResourceManager 启用 HA,设置为 True。
      • spark.hadoop.yarn.resourcemanager.ha.rm-ids: ResourceManager 逻辑 ID 列表。
      • spark.hadoop.yarn.resourcemanager.hostname.rm-id: 对于每个 rm-id,指定 ResourceManager 对应的主机名。
      • spark.hadoop.yarn.resourcemanager.address.rm-id: 对于每个 rm-id,指定 host:port 以供客户端提交作业。
  • HDFS HA 相关参数如下:

    • spark.hadoop.fs.defaultFS, HDFS 客户端默认路径前缀
    • spark.hadoop.dfs.nameservices, HDFS 集群逻辑名称
    • spark.hadoop.dfs.ha.namenodes.nameservices01 , nameservice 中每个 NameNode 的唯一标识符
    • spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1, 每个 NameNode 的完全限定的 RPC 地址
    • spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2, 每个 NameNode 的完全限定的 RPC 地址
    • spark.hadoop.dfs.client.failover.proxy.provider = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider, 设置实现类
  • working_dir
    

    : ETL 使用的目录。Spark 作为 ETL 资源使用时必填。例如:hdfs://host:port/tmp/doris。

    • 其他参数为可选,参考 http://spark.apache.org/docs/latest/configuration.html
  • working_dir: ETL 使用的目录。Spark 作为 ETL 资源使用时必填。例如:hdfs://host:port/tmp/doris。

  • broker.hadoop.security.authentication:指定认证方式为 Kerberos。

  • broker.kerberos_principal:指定 Kerberos 的 Principal。

  • broker.kerberos_keytab:指定 Kerberos 的 Keytab 文件路径。该文件必须为 Broker 进程所在服务器上的文件的绝对路径,并且可以被 Broker 进程访问。

  • broker.kerberos_keytab_content:指定 Kerberos 中 Keytab 文件内容经过 Base64 编码之后的内容。这个跟 broker.kerberos_keytab 配置二选一即可。

  • broker
    

    : Broker 名字。Spark 作为 ETL 资源使用时必填。需要使用

    ALTER SYSTEM ADD BROKER
    

    命令提前完成配置。

    • broker.property_key: Broker 读取 ETL 生成的中间文件时需要指定的认证信息等。
  • env
    

    : 指定 Spark 环境变量,支持动态设置,比如当认证 Hadoop 认为方式为 Simple 时,设置 Hadoop 用户名和密码

    • env.HADOOP_USER_NAME: 访问 Hadoop 用户名
    • env.HADOOP_USER_PASSWORD:密码

示例:

-- yarn cluster 模式
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.username" = "user0",
  "broker.password" = "password0"
);

-- spark standalone client 模式
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "spark://127.0.0.1:7777",
  "spark.submit.deployMode" = "client",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker1"
);

-- yarn HA 模式
CREATE EXTERNAL RESOURCE sparkHA
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "default",
  "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
  "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
  "spark.hadoop.yarn.resourcemanager.address.rm1" = "xxxx:8032",
  "spark.hadoop.yarn.resourcemanager.address.rm2" = "xxxx:8032",
  "spark.hadoop.fs.defaultFS" = "hdfs://nameservices01",
  "spark.hadoop.dfs.nameservices" = "nameservices01",
  "spark.hadoop.dfs.ha.namenodes.nameservices01" = "mynamenode1,mynamenode2",
  "spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
  "spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
  "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
  "working_dir" = "hdfs://nameservices01/doris_prd_data/sinan/spark_load/",
  "broker" = "broker_name",
  "broker.username" = "username",
  "broker.password" = "",
  "broker.dfs.nameservices" = "nameservices01",
  "broker.dfs.ha.namenodes.nameservices01" = "mynamenode1, mynamenode2",
  "broker.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
  "broker.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
  "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

【INFO】本文发自CSDN,尊重原创,转载请先获得许可,并注明原文出处。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/881927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

案例18 基于Spring Boot+MyBatis的图书信息维护案例

一、案例需求 基于Spring BootMyBatis实现图书信息的新增、修改、删除、查询功能&#xff0c;并实现MySQL数据库的操作。 MySQL数据库创建图书表&#xff08;t_book&#xff09;&#xff0c;图书表有主键、图书名称、图书类别、作者、出版社、简介信息。 二、数据初始化 创建…

“继承与组合:代码复用的两种策略“

White graces&#xff1a;个人主页 &#x1f439;今日诗词:"故人陆续凋零&#xff0c;好似风中落叶啊"&#x1f439; &#x1f649; 内容推荐:“掌握类与对象&#xff0c;点亮编程之路“(下)&#x1f649; &#x1f649;专栏推荐:“Java入门指南&#xff1a;从零开…

案例17 基于Spring Boot+MyBatis的学生信息维护案例

一、案例需求 基于Spring BootMyBatis实现学生信息的新增、修改、删除、查询功能&#xff0c;并实现MySQL数据库的操作。 MySQL数据库创建学生表&#xff08;t_student&#xff09;&#xff0c;有主键、姓名、年龄、性别、出生日期、身份证号、电话号码信息。 二、数据初始化…

07 - 查看、创建、切换和删除分支

查看所有文章链接&#xff1a;&#xff08;更新中&#xff09;GIT常用场景- 目录 文章目录 1. 查看分支2. 创建和切换分支3. 删除分支 1. 查看分支 git branch -va2. 创建和切换分支 第一种&#xff1a; 创建分支&#xff1a; git branch new_branch切换分支&#xff1a; …

PHP实现在线年龄计算器

1. 输入日期查询年龄 2. php laravel框架实现 代码 /*** 在线年龄计算器*/public function ageDateCal(){// 输入的生日时间$birthday $this->request(birthday);// 当前时间$currentDate date(Y-m-d);// 计算周岁$age date_diff(date_create($birthday), date_create($…

Linux 多进程

目录 0x01 linux中特殊的进程 0x02 进程的标识 0x03 创建子进程 0x01 linux中特殊的进程 0号进程&#xff1a;idle进程&#xff0c;系统启动加载的进程1号进程&#xff1a;systemd进程&#xff0c;系统初始化&#xff0c;是所有进程的祖先进程 init2号进程&#xff1a;kthre…

Zabbix监控Kubernets获取节点模板报错

Preprocessing failed for: {“error”:"Request failed with status code 401: {“kind”:“Status”,“apiVersion”:“v1”,"met ad …1. Failed: Discovery error: TypeError: cannot read property 1 of null. Zabbix 监控 Kubernetes 出现采集错误&#xff0c;…

麒麟系统相关

创建虚拟机 镜像下载地址 选择合适的镜像&#xff0c;进入引导后注意不要选择默认的第一条&#xff0c;选择第二条进入安装程序。 root密码修改 使用命令 sudo passwd root 开启ssh 配置好网络后发现能ping通&#xff0c;但无法ssh连接&#xff0c;ps -ef | grep ssh 得…

基于docker实现主从复制

1&#xff1a;实现主从复制这个过程我是趟过坑的&#xff0c;后面是自己动手搞了几遍都成功了以后才开始决定记录的&#xff0c;&#xff08;所以有的截图和上下文对不上的&#xff0c;比如说docker容器的名字对应不上&#xff0c;大家就用自己的就好&#xff09;&#xff0c;打…

“数据”对于仓库管理有多重要?!

仓库数据的重要性 做好仓库数据管理对企业的重要性不言而喻。通过有效地管理数据&#xff0c;企业可以更好地了解市场需求和库存情况&#xff0c;快速响应市场变化&#xff0c;提高库存周转率和客户满意度&#xff1b;此外&#xff0c;数据管理还可以帮助企业降低库存成本、减…

提升物流管理效率,快递批量查询高手软件助你一臂之力

物流管理中&#xff0c;准确跟踪和掌握快递的物流信息是非常重要的。而快递批量查询高手软件的出现&#xff0c;大大提高了物流管理的效率&#xff0c;为企业带来了诸多便利。 传统的快递查询方式往往需要手动逐个输入快递单号&#xff0c;费时费力且容易出错。而有了快递批量查…

源于传承,擎领未来,新架构、新工艺下的“换心工程”——金融电子化访中电金信副总经理、研究院院长况文川

当前&#xff0c;商业银行的经营环境正在发生着深刻而复杂的变化&#xff0c;在深化改革主旋律的指引下&#xff0c;数字化转型已成为我国商业银行普遍认同、广泛采用的战略性举措。核心系统作为承载银行业务的关键支柱系统&#xff0c;一直是各银行在金融科技建设中重点关注和…

Linux/centos上如何配置管理NFS服务器?

Linux/centos上如何配置管理NFS服务器&#xff1f; 1 NFS基础了解1.1 NFS概述1.2 NFS工作流程 2 安装和启动NFS服务2.1 安装NFS服务器2.2 启动NFS服务 3 配置NFS服务器和客户端3.1 配置NFS服务器3.2 配置NFS客户端 4 实际示例4.1 基本要求4.2 案例实现 1 NFS基础了解 NFS&…

Android UI自动化测试框架—SoloPi简介

1、UI自动化测试简介 软件测试简介 ​软件测试是伴随着软件开发一同诞生的&#xff0c;随着软件规模大型化&#xff0c;结构复杂化&#xff0c;软件测试也从最初的简单“调试”&#xff0c;发展到当今的自动化测试。 ​ 自动化测试是什么呢&#xff1f;自动化测试是把以人为…

解析固态光耦的独特特点和优势

固态光耦概述及其重要性 固态光耦是一种电子元件&#xff0c;具有独特的光电隔离功能&#xff0c;广泛应用于电气控制、通信和电力系统等领域。本文将深入探讨固态光耦的特点和优势&#xff0c;介绍它在市场中的重要性以及如何提高收录和首页排名。 高速、高精度的信号传输 …

Dolphinscheduler简单应用(二)—— 告警通知

一、本章目标 演示Dolphinscheduler的告警通知功能,将SQL任务组件查询返回结果集指定为邮件通知内容(支持为:表格、附件或表格附件三种模板)。二、 前提条件 已完成Dolphinscheduler部署 K8S集群部署,可参考文章:基于K8S环境部署Dolphinscheduler及简单应用其他部署形式,…

LeetCode236. 二叉树的最近公共祖先

236. 二叉树的最近公共祖先 文章目录 [236. 二叉树的最近公共祖先](https://leetcode.cn/problems/lowest-common-ancestor-of-a-binary-tree/)一、题目二、题解方法一&#xff1a;递归构建祖先数组方法二&#xff1a;一个非常方便的递归 一、题目 给定一个二叉树, 找到该树中…

性能测试|App性能测试需要关注的指标

一、Android客户端性能测试常见指标&#xff1a; 1、内存 2、CPU 3、流量 4、电量 5、启动速度 6、滑动速度、界面切换速度 7、与服务器交互的网络速度 二、预期标准指定原则 1、分析竞争对手的产品&#xff0c;所有指标要强于竞品 2、产品经理给出的预期性能指标数据…

青翼科技自研2路250MSPS DA回放FMC子卡模块

FMC150_V30是一款基于VITA57.1规范的2路125MSPS采样率16位分辨率AD采集、2路250MSPS采样率16位分辨率DA回放FMC子卡模块。该模块遵循VITA57.1规范&#xff0c;可直接与符合VITA57.1规范的FPGA载卡配合使用&#xff0c;板卡ADC器件采用ADI公司的AD9268芯片&#xff0c;板卡DAC器…

DDIM: DENOISING DIFFUSION IMPLICIT MODELS

DDIM: DENOISING DIFFUSION IMPLICIT MODELS 去噪扩散隐式模型DDIM预测噪声生成过程 实验 论文题目&#xff1a;Denoising Diffusion Implicit Models (DDIM) 论文来源&#xff1a;ICLR 2021 论文地址&#xff1a;https://arxiv.org/pdf/2010.02502.pdf 论文代码&#xff1a;ht…