数据导入
1.概述
Apache Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。
数据源 | 导入方式 |
---|---|
对象存储(s3),HDFS | 使用 Broker 导入数据 |
本地文件 | Stream Load, MySQL Load |
Kafka | 订阅 Kafka 数据 |
Mysql、PostgreSQL,Oracle,SQLServer | 通过外部表同步数据 |
通过 JDBC 导入 | 使用 JDBC 同步数据 |
导入 JSON 格式数据 | JSON 格式数据导入 |
AutoMQ | AutoMQ Load |
按导入方式划分
Broker Load | 通过 Broker 导入外部存储数据 |
---|---|
Stream Load | 流式导入数据 (本地文件及内存数据) |
Routine Load | 导入 Kafka 数据 |
Insert Into | 外部表通过 INSERT 方式导入数据 |
S3 Load | S3 协议的对象存储数据导入 |
MySQL Load | MySQL 客户端导入本地数据 |
支持的数据格式 : 不同的导入方式支持的数据格式略有不同。
导入方式 | 支持的格式 |
---|---|
Broker Load | parquet, orc, csv, gzip |
Stream Load | csv, json, parquet, orc |
Routine Load | csv, json |
MySQL Load | csv |
Apache Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。
同时,一个导入作业都会有一个 Label,用于在一个数据库(Database)下唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。
Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used。通过这个机制,可以在 Doris 侧做到 At-Most-Once 语义。如果结合上游系统的 At-Least-Once 语义,则可以实现导入数据的 Exactly-Once 语义。
导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。
2.Insert 导入
INSERT INTO 支持将 Doris 查询的结果导入到另一个表中。INSERT INTO 是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERT INTO 可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。
主要的 Insert Into 命令包含以下两种:
-
INSERT INTO tbl SELECT …
-
INSERT INTO tbl (col1, col2, …) VALUES (1, 2, …), (1,3, …)
这种导入方式并不适用于大量数据的场景,性能太差,只能测试的时候使用
第一步:创建表
CREATE TABLE testdb.test_table(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
第二步:使用 INSERT INTO VALUES 向源表导入数据(不推荐在生产环境中使用)
INSERT INTO testdb.test_table (user_id, name, age)
VALUES (1, "Emily", 25),
(2, "Benjamin", 35),
(3, "Olivia", 28),
(4, "Alexander", 60),
(5, "Ava", 17);
INSERT INTO 是一种同步导入方式,导入结果会直接返回给用户。
Query OK, 5 rows affected (0.308 sec)
{'label':'label_3e52da787aab4222_9126d2fce8f6d1e5', 'status':'VISIBLE', 'txnId':'9081'}
也可以通过SELECT 来导入数据
INSERT INTO testdb.test_table2
SELECT * FROM testdb.test_table WHERE age < 30;
Query OK, 3 rows affected (0.544 sec)
{'label':'label_9c2bae970023407d_b2c5b78b368e78a7', 'status':'VISIBLE', 'txnId':'9084'}
查看手册:https://doris.incubator.apache.org/zh-CN/docs/data-operate/import/insert-into-manual#%E5%8F%82%E8%80%83%E6%89%8B%E5%86%8C
3.Stream Load
Stream Load 支持通过 HTTP 协议将本地文件或数据流导入到 Doris 中。Stream Load 是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。一般来说,可以使用 Stream Load 导入 10GB
以下的文件,如果文件过大,建议将文件进行切分后使用 Stream Load 进行导入。Stream Load 可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。
Stream Load 支持导入 CSV、JSON、Parquet 与 ORC 格式的数据。在导入 CSV 文件时,需要明确区分空值(null)与空字符串:
-
空值(null)需要用 \N 表示,a,\N,b 数据表示中间列是一个空值(null)
-
空字符串直接将数据置空,a, ,b 数据表示中间列是一个空字符串
在使用 Stream Load 时,需要通过 HTTP 协议发起导入作业给 FE 节点,FE 会以轮询方式,重定向(redirect)请求给一个 BE 节点以达到负载均衡的效果。也可以直接发送 HTTP 请求作业给指定的 BE 节点。在 Stream Load 中,Doris 会选定一个节点做为 Coordinator 节点。Coordinator 节点负责接受数据并分发数据到其他节点上。
-
Client 向 FE 提交 Stream Load 导入作业请求
-
FE 会随机选择一台 BE 作为 Coordinator 节点,负责导入作业调度,然后返回给 Client 一个 HTTP 重定向
-
Client 连接 Coordinator BE 节点,提交导入请求
-
Coordinator BE 会分发数据给相应 BE 节点,导入完成后会返回导入结果给 Client
-
Client 也可以直接通过指定 BE 节点作为 Coordinator,直接分发导入作业
Stream Load 通过 HTTP 协议提交和传输。下例以 curl 工具为例,演示通过 Stream Load 提交导入作业。详细语法可以参见 STREAM LOAD
该方式中涉及HOST:PORT都是对应的HTTP协议端口。但须保证客户端所在机器网络能够联通FE,BE所在机器。
- BE的HTTP协议端口,默认为8040。
- FE的HTTP协议端口,默认为8030。
第一步:创建一个数据库和表
CREATE TABLE testdb.test_streamload(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
第二步:创建一个load.txt文件,内容如下
1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64
第三步:执行下面命令
curl -u root:123456 -H "label:load_local_file_test" \
-H "column_separator:," -T load.txt \
http://192.168.220.253:8040/api/testdb/test_streamload/_stream_load
- -u root:123456 : 代表的是doris的账号密码
- label : 标签用来防止重复导入
- column_separator:, :代表的是数据的分隔符
- -T load.txt : 要导入的数据
- http://drois地址:BE端口/api/数据库/数据表/_stream_load
更多的导入参数看配置:https://doris.incubator.apache.org/zh-CN/docs/data-operate/import/stream-load-manual#%E5%AF%BC%E5%85%A5%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0
4.Broker Load
Stream Load 是一种推的方式,即导入的数据依靠客户端读取,并推送到 Doris。Broker Load 则是将导入请求发送给 Doris,有 Doris 主动拉取数据,所以如果数据存储在类似 HDFS 或者 对象存储中,则使用 Broker Load 是最方便的。这样,数据就不需要经过客户端,而有 Doris 直接读取导入。Broker Load 适合源数据存储在远程存储系统,比如 HDFS,并且数据量比较大的场景,比如几十G,上百G。
用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。
从上图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程,Broker 进程可以使用 Java 程序开发,更好的兼容大数据生态中的各类存储系统。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。
注意:使用Broker 导入必须安装和启动 Broker ,以及安装和启动HDFS,高一点的Doris默认安装好了HDFS,我这个版本还需自己安装
5.安装HDFS
第一步:下载 hadoop , 下载地址https://downloads.apache.org/hadoop/common/ ,下载之后进行解压,我的解压目录为 /root/hadoop-3.2.4
解压命令如下
tar -zxf hadoop-3.2.4.tar.gz
第二步:创建数据存储目录
mkdir /root/hadoop-3.2.4/tmp
mkdir /root/hadoop-3.2.4/hdfs
mkdir /root/hadoop-3.2.4/hdfs/data
mkdir /root/hadoop-3.2.4/hdfs/name
第四步:配置环境变量 vi /etc/profile
,加入一下内容 ,保存退出后,然后执行命令:source /etc/profile
让其生效
export HADOOP_HOME=/root/hadoop-3.2.4
export PATH=$PATH:$HADOOP_HOME/bin
第四步:修改下面五个文件,位置在 /root/hadoop-3.2.4/etc/hadoop目录中
hadoop-3.2.4/etc/hadoop/hadoop-env.sh
hadoop-3.2.4/etc/hadoop/core-site.xml
hadoop-3.2.4/etc/hadoop/hdfs-site.xml
hadoop-3.2.4/etc/hadoop/mapred-site.xml
hadoop-3.2.4/etc/hadoop/yarn-site.xml
每个文件的内容分别如下 ,第一个: hadoop-env.sh ,加入Java的环境变量
# The java implementation to use.
#export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
修改 core-site.xml 配置文件,内容如下
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://机器IP:9000</value>
####注释 : HDFS的URI,文件系统://namenode标识:端口号
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-3.2.4/tmp</value>
###注释: namenode上本地的hadoop临时文件夹
</property>
</configuration>
修改配置文件 hdfs-site.xml 内容如下
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
<description>副本个数,配置默认是3,应小于datanode机器数量</description>
</property>
</configuration>
修改配置文件 mapred-site.xml ,内容如下
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
修改配置文件 yarn-site.xml ,内容如下
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
最后进入到安装目录 cd /root/hadoop-3.2.4/
然后执行下面命令启动HDFS
bin/hdfs namenode -format #对 HDFS这个分布式文件系统中的 DataNode 进行分块
sbin/hadoop-daemon.sh start namenode
sbin /hadoop-daemon.sh start datanode
sbin/hadoop-daemon.sh start secondarynamenode
启动之后,创建一个文件 test.txt ; 执行测试命令: hadoop fs -put ./test.txt /
,浏览器访问9870端口可以查看文件列表,如下
6.通导入数据
第一步,创建一个数据文件,比如:load.text,内容如下
5,xxx,22
6,ooo,33
7,jjj,44
这三个列对应了我Doris中的数据库和表 testdb.test_streamload
,然后执行命令把 load.txt 上传到HDFS中
hadoop fs -put ./load.txt /
第二步:执行命令,把HDFS上的load.txt 数据导入到Doris中,访问 http://ip:8030/ ,通过界面导入如下
LOAD LABEL testdb.lable_test_streamload //标签,用来判断重复的
(
DATA INFILE("hdfs://192.168.220.253:9000/load.txt") //HDFS上的数据文件
INTO TABLE `test_streamload` //表名
COLUMNS TERMINATED BY "," //数据分隔符
(user_id,name,age) //数据对应的列
)
with HDFS (
"fs.defaultFS"="hdfs://192.168.220.253:9000", //hdfs默认地址
"hdfs_user"="root" //hdfs用户名
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
执行后效果如下:
然后通过:select * from test_streamload 即可查看导入的数据
数据的导出
数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS / 对象存储(支持 S3 协议)等。
访问:http://192.168.220.253:8030/ ,执行导出命令
EXPORT TABLE testdb.test_streamload //导出的数据库和表
PARTITION (test_streamload) //分区多个分区用逗号分开
TO "hdfs://192.168.220.253:9000/test_streamload" //HDFS地址,后面是表名
PROPERTIES
(
"label" = "mylabel", //标签
"column_separator"=",", //分隔符
"columns" = "user_id,username,age", //导出的字段名
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "fs_broker" // brocker的名字
(
"username" = "root",
"password"="123456"
);
- 查看分区命令:show PARTITIONS from 数据库.表;
- fs_broker :Brocker的名字可以在 doris控制台界面的 system中查看
- label:本次导出作业的标识。后续可以使用这个标识查看作业状态
- column_separator:列分隔符。默认为 \t。支持不可见字符,比如 ‘\x07’
- columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。
- exec_mem_limit:表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
- timeout:作业超时时间。默认 2 小时。单位秒。
执行成功后,可以通过HDFS控制台查看导出的文件
文章结束,如果对你有所帮助请收藏加好评哦