四.海量数据实时分析-Doris数据导入导出

news2024/11/13 10:58:13

数据导入

1.概述

Apache Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。

数据源导入方式
对象存储(s3),HDFS使用 Broker 导入数据
本地文件Stream Load, MySQL Load
Kafka订阅 Kafka 数据
Mysql、PostgreSQL,Oracle,SQLServer通过外部表同步数据
通过 JDBC 导入使用 JDBC 同步数据
导入 JSON 格式数据JSON 格式数据导入
AutoMQAutoMQ Load

按导入方式划分

Broker Load通过 Broker 导入外部存储数据
Stream Load流式导入数据 (本地文件及内存数据)
Routine Load导入 Kafka 数据
Insert Into外部表通过 INSERT 方式导入数据
S3 LoadS3 协议的对象存储数据导入
MySQL LoadMySQL 客户端导入本地数据

支持的数据格式 : 不同的导入方式支持的数据格式略有不同。

导入方式支持的格式
Broker Loadparquet, orc, csv, gzip
Stream Loadcsv, json, parquet, orc
Routine Loadcsv, json
MySQL Loadcsv

Apache Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。

同时,一个导入作业都会有一个 Label,用于在一个数据库(Database)下唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。

Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used。通过这个机制,可以在 Doris 侧做到 At-Most-Once 语义。如果结合上游系统的 At-Least-Once 语义,则可以实现导入数据的 Exactly-Once 语义。

导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。

2.Insert 导入

INSERT INTO 支持将 Doris 查询的结果导入到另一个表中。INSERT INTO 是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERT INTO 可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。

主要的 Insert Into 命令包含以下两种:

  • INSERT INTO tbl SELECT …

  • INSERT INTO tbl (col1, col2, …) VALUES (1, 2, …), (1,3, …)

这种导入方式并不适用于大量数据的场景,性能太差,只能测试的时候使用

第一步:创建表

CREATE TABLE testdb.test_table(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第二步:使用 INSERT INTO VALUES 向源表导入数据(不推荐在生产环境中使用)

INSERT INTO testdb.test_table (user_id, name, age)
VALUES (1, "Emily", 25),
       (2, "Benjamin", 35),
       (3, "Olivia", 28),
       (4, "Alexander", 60),
       (5, "Ava", 17);

INSERT INTO 是一种同步导入方式,导入结果会直接返回给用户。

Query OK, 5 rows affected (0.308 sec)
{'label':'label_3e52da787aab4222_9126d2fce8f6d1e5', 'status':'VISIBLE', 'txnId':'9081'}

也可以通过SELECT 来导入数据

INSERT INTO testdb.test_table2
SELECT * FROM testdb.test_table WHERE age < 30;
Query OK, 3 rows affected (0.544 sec)
{'label':'label_9c2bae970023407d_b2c5b78b368e78a7', 'status':'VISIBLE', 'txnId':'9084'}

查看手册:https://doris.incubator.apache.org/zh-CN/docs/data-operate/import/insert-into-manual#%E5%8F%82%E8%80%83%E6%89%8B%E5%86%8C

3.Stream Load

Stream Load 支持通过 HTTP 协议将本地文件或数据流导入到 Doris 中。Stream Load 是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。一般来说,可以使用 Stream Load 导入 10GB 以下的文件,如果文件过大,建议将文件进行切分后使用 Stream Load 进行导入。Stream Load 可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。

Stream Load 支持导入 CSV、JSON、Parquet 与 ORC 格式的数据。在导入 CSV 文件时,需要明确区分空值(null)与空字符串:

  • 空值(null)需要用 \N 表示,a,\N,b 数据表示中间列是一个空值(null)

  • 空字符串直接将数据置空,a, ,b 数据表示中间列是一个空字符串

在使用 Stream Load 时,需要通过 HTTP 协议发起导入作业给 FE 节点,FE 会以轮询方式,重定向(redirect)请求给一个 BE 节点以达到负载均衡的效果。也可以直接发送 HTTP 请求作业给指定的 BE 节点。在 Stream Load 中,Doris 会选定一个节点做为 Coordinator 节点。Coordinator 节点负责接受数据并分发数据到其他节点上。

在这里插入图片描述

  1. Client 向 FE 提交 Stream Load 导入作业请求

  2. FE 会随机选择一台 BE 作为 Coordinator 节点,负责导入作业调度,然后返回给 Client 一个 HTTP 重定向

  3. Client 连接 Coordinator BE 节点,提交导入请求

  4. Coordinator BE 会分发数据给相应 BE 节点,导入完成后会返回导入结果给 Client

  5. Client 也可以直接通过指定 BE 节点作为 Coordinator,直接分发导入作业

Stream Load 通过 HTTP 协议提交和传输。下例以 curl 工具为例,演示通过 Stream Load 提交导入作业。详细语法可以参见 STREAM LOAD

该方式中涉及HOST:PORT都是对应的HTTP协议端口。但须保证客户端所在机器网络能够联通FE,BE所在机器。

  • BE的HTTP协议端口,默认为8040。
  • FE的HTTP协议端口,默认为8030。

第一步:创建一个数据库和表

CREATE TABLE testdb.test_streamload(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

第二步:创建一个load.txt文件,内容如下

1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64

第三步:执行下面命令

curl -u root:123456 -H "label:load_local_file_test" \
-H "column_separator:,"  -T load.txt \
http://192.168.220.253:8040/api/testdb/test_streamload/_stream_load
  • -u root:123456 : 代表的是doris的账号密码
  • label : 标签用来防止重复导入
  • column_separator:, :代表的是数据的分隔符
  • -T load.txt : 要导入的数据
  • http://drois地址:BE端口/api/数据库/数据表/_stream_load

更多的导入参数看配置:https://doris.incubator.apache.org/zh-CN/docs/data-operate/import/stream-load-manual#%E5%AF%BC%E5%85%A5%E9%85%8D%E7%BD%AE%E5%8F%82%E6%95%B0

4.Broker Load

Stream Load 是一种推的方式,即导入的数据依靠客户端读取,并推送到 Doris。Broker Load 则是将导入请求发送给 Doris,有 Doris 主动拉取数据,所以如果数据存储在类似 HDFS 或者 对象存储中,则使用 Broker Load 是最方便的。这样,数据就不需要经过客户端,而有 Doris 直接读取导入。Broker Load 适合源数据存储在远程存储系统,比如 HDFS,并且数据量比较大的场景,比如几十G,上百G。

用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

在这里插入图片描述
从上图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程,Broker 进程可以使用 Java 程序开发,更好的兼容大数据生态中的各类存储系统。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。

注意:使用Broker 导入必须安装和启动 Broker ,以及安装和启动HDFS,高一点的Doris默认安装好了HDFS,我这个版本还需自己安装

5.安装HDFS

第一步:下载 hadoop , 下载地址https://downloads.apache.org/hadoop/common/ ,下载之后进行解压,我的解压目录为 /root/hadoop-3.2.4 解压命令如下

tar -zxf hadoop-3.2.4.tar.gz

第二步:创建数据存储目录

mkdir /root/hadoop-3.2.4/tmp 
mkdir /root/hadoop-3.2.4/hdfs 
mkdir /root/hadoop-3.2.4/hdfs/data 
mkdir /root/hadoop-3.2.4/hdfs/name

第四步:配置环境变量 vi /etc/profile,加入一下内容 ,保存退出后,然后执行命令:source /etc/profile让其生效

export HADOOP_HOME=/root/hadoop-3.2.4
export PATH=$PATH:$HADOOP_HOME/bin

第四步:修改下面五个文件,位置在 /root/hadoop-3.2.4/etc/hadoop目录中

hadoop-3.2.4/etc/hadoop/hadoop-env.sh
hadoop-3.2.4/etc/hadoop/core-site.xml
hadoop-3.2.4/etc/hadoop/hdfs-site.xml
hadoop-3.2.4/etc/hadoop/mapred-site.xml
hadoop-3.2.4/etc/hadoop/yarn-site.xml

每个文件的内容分别如下 ,第一个: hadoop-env.sh ,加入Java的环境变量

# The java implementation to use.

#export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root

修改 core-site.xml 配置文件,内容如下

<configuration>
 <property>
    <name>fs.defaultFS</name>
    <value>hdfs://机器IP:9000</value>
  ####注释  : HDFS的URI,文件系统://namenode标识:端口号
</property>

<property>
    <name>hadoop.tmp.dir</name>
    <value>/root/hadoop-3.2.4/tmp</value>
    ###注释: namenode上本地的hadoop临时文件夹
</property>
</configuration>

修改配置文件 hdfs-site.xml 内容如下

<configuration> 
   <property>
       <name>dfs.replication</name>
       <value>1</value>
       <description>副本个数,配置默认是3,应小于datanode机器数量</description>
   </property>
</configuration>

修改配置文件 mapred-site.xml ,内容如下

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

修改配置文件 yarn-site.xml ,内容如下

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

最后进入到安装目录 cd /root/hadoop-3.2.4/ 然后执行下面命令启动HDFS

bin/hdfs namenode -format #对 HDFS这个分布式文件系统中的 DataNode 进行分块
sbin/hadoop-daemon.sh start namenode
sbin /hadoop-daemon.sh start datanode
sbin/hadoop-daemon.sh start secondarynamenode

启动之后,创建一个文件 test.txt ; 执行测试命令: hadoop fs -put ./test.txt / ,浏览器访问9870端口可以查看文件列表,如下在这里插入图片描述

6.通导入数据

第一步,创建一个数据文件,比如:load.text,内容如下

5,xxx,22
6,ooo,33
7,jjj,44

这三个列对应了我Doris中的数据库和表 testdb.test_streamload ,然后执行命令把 load.txt 上传到HDFS中

hadoop fs -put ./load.txt /

第二步:执行命令,把HDFS上的load.txt 数据导入到Doris中,访问 http://ip:8030/ ,通过界面导入如下

LOAD LABEL testdb.lable_test_streamload //标签,用来判断重复的
    (
    DATA INFILE("hdfs://192.168.220.253:9000/load.txt")  //HDFS上的数据文件
    INTO TABLE `test_streamload` //表名
    COLUMNS TERMINATED BY ","        //数据分隔符     
    (user_id,name,age) //数据对应的列
    )
    with HDFS (
    "fs.defaultFS"="hdfs://192.168.220.253:9000", //hdfs默认地址
    "hdfs_user"="root"  //hdfs用户名
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );

执行后效果如下:
在这里插入图片描述
然后通过:select * from test_streamload 即可查看导入的数据

数据的导出

数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS / 对象存储(支持 S3 协议)等。
访问:http://192.168.220.253:8030/ ,执行导出命令

EXPORT TABLE testdb.test_streamload  //导出的数据库和表
PARTITION (test_streamload) 		//分区多个分区用逗号分开
TO "hdfs://192.168.220.253:9000/test_streamload" 	//HDFS地址,后面是表名
PROPERTIES
(
    "label" = "mylabel",		//标签
    "column_separator"=",",	//分隔符
    "columns" = "user_id,username,age",		//导出的字段名
    "exec_mem_limit"="2147483648",		
    "timeout" = "3600"
)
WITH BROKER "fs_broker"		// brocker的名字
(
    "username" = "root",
    "password"="123456"
);
  • 查看分区命令:show PARTITIONS from 数据库.表;
  • fs_broker :Brocker的名字可以在 doris控制台界面的 system中查看
  • label:本次导出作业的标识。后续可以使用这个标识查看作业状态
  • column_separator:列分隔符。默认为 \t。支持不可见字符,比如 ‘\x07’
  • columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。
  • exec_mem_limit:表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
  • timeout:作业超时时间。默认 2 小时。单位秒。

在这里插入图片描述
执行成功后,可以通过HDFS控制台查看导出的文件
在这里插入图片描述

文章结束,如果对你有所帮助请收藏加好评哦

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2115993.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

表格多列情况下,loading不显示问题

问题描述&#xff1a; 用element plus 做得表格&#xff0c;如下图&#xff0c;列数较多&#xff0c;且部分表格内容显示比较复杂&#xff0c;数据量中等的情况下&#xff0c;有一个switch 按钮&#xff0c;切换部分列的显示和隐藏&#xff0c;会发现&#xff0c;切换为显示的时…

单线程 TCP/IP 服务器和客户端的实现

单线程 TCP/IP 服务器和客户端的实现 文章目录 单线程 TCP/IP 服务器和客户端的实现通信流程服务端客户端 代码实现服务端客户端 运行结果 通信流程 服务端 socket&#xff1a;创建监听的文件描述符(socket) fd&#xff1b;bind&#xff1a;fd 和自身的 ip 和端口绑定&#x…

【Transformer】Positional Encoding

文章目录 为什么需要位置编码&#xff1f;预备知识三角函数求和公式旋转矩阵逆时针旋转顺时针旋转 原始Transformer中的位置编码论文中的介绍具体计算过程为什么是线性变换&#xff1f; 大模型常用的旋转位置编码RoPE基本原理Llama3中的代码实现 参考资料 为什么需要位置编码&a…

DPDK基础入门(五):报文转发

网络处理模块划分 Packet Input: 接收数据包&#xff0c;将其引入处理流程。Pre-processing: 对数据包进行初步处理&#xff0c;例如基本的检查和标记。Input Classification: 细化数据包的分类&#xff0c;例如基于协议或流进行分流。Ingress Queuing: 将数据包放入队列中进行…

【信息学奥赛题】

目录 一、计算机组成与工作原理 二、计算机信息表示 三、计算机软件系统 四、计算机网络基础 五、多媒体知识 六、数据结构 七、程序语言知识 八、知识性问题 一、计算机组成与工作原理 1&#xff0e;下列不属于冯诺依曼计算机模型的核心思想是&#xff08;D&#xff…

Spring源码(3)Aware接口、初始化和销毁方法、@Scope、@Primary

1、目标 本文的主要目标是学习Spring源码中Aware接口、初始化和销毁方法、Scope注解、Primary注解的使用 2、Aware接口 Component public class MyBeanAware implements BeanNameAware, ApplicationContextAware {Overridepublic void setBeanName(String name) {System.out…

Linux系统本地化部署Dify并安装Ollama运行llava大语言模型详细教程

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

屏保壁纸 芝麻时钟比屏保壁纸更好看的桌面动态屏保 大气美观

屏保壁纸 芝麻时钟比屏保壁纸更好看的桌面动态屏保 大气美观&#xff0c;今天小编给大家带来一款非常大气美观的桌面时钟屏保&#xff0c;比屏保壁纸更好看&#xff0c;更美观的桌面屏保软件。非常有个性化哦&#xff0c;我们看看这种屏保主题&#xff0c;是不是让你眼前一亮呢…

20240908 每日AI必读资讯

新AI编程工具爆火&#xff1a;手机2分钟创建一个APP&#xff01; - AI初创公司Replit推出的智能体——Replit Agent。开发环境、编写代码、安装软件包、配置数据库、部署等等&#xff0c;统统自动化&#xff01; - 操作方式也是极其简单&#xff0c;只需一个提出Prompt的动作…

HBuilderx 安装 compile-node-sass编译工具

在使用HBuilderx工具&#xff0c;利用uni-app框架开发前端过程中&#xff0c;应用 “.scss”扩展名的的样式文件&#xff0c;scss作为css的预编译文件&#xff0c;在实际开发中是需要编译的&#xff0c;所以需要安装插件 compile-node-sass。 本人在CSDN下载插件“compile-node…

2.软件生命周期及流程(包含笔试/面试题)

一、软件生命周期 1.什么是软件的生命周期&#xff1f; 软件生命周期就是软件从开始研发到最终被废弃不用的一整个过程。 二、软件生命周期模型 1.瀑布型生命周期模型&#xff08;基本不用这个模型&#xff09; 最早期的模型&#xff0c;流程是从上而下的&#xff0c;如同瀑布流…

【机器人工具箱Robotics Toolbox开发笔记(二)】Matlab中机器人工具箱的下载与安装

Matlab机器人工具箱(Robotics Toolbox)可从Peter Corke教授提供的网站上免费下载。网址为:http://www.petercorke.com/Robotics_Toolbox.html。 图1 网站所提供的机器人工具箱版本 在Downloading the Toolbox栏目中单击here按钮进入下载页面,然后在该页面中填写国家、组织…

基于Python爬虫的淘宝服装数据分析项目

文章目录 一.项目介绍二.爬虫代码代码分析 三. 数据处理四. 数据可视化 一.项目介绍 该项目是基于Python爬虫的淘宝服装数据分析项目&#xff0c;以致于帮助商家了解当前服装市场的需求&#xff0c;制定更加精确的营销策略。首先&#xff0c;需要爬取淘宝中关于服装的大量数据…

JS_函数声明

JS中的方法,多称为函数,函数的声明语法和JAVA中有较大区别 函数说明 函数没有权限控制符不用声明函数的返回值类型,需要返回在函数体中直接return即可,也无需void关键字参数列表中,无需数据类型调用函数时,实参和形参的个数可以不一致声明函数时需要用function关键字函数没有…

STM32F407VET6开发板RT-Thread MSH 串口的适配

相关文章 STM32F407VET6开发板RT-Thread的移植适配 环境 STM32F407VET6 开发板&#xff08;魔女&#xff09;&#xff0c;http://www.stm32er.com/ Keil MDK5&#xff0c;版本 5.36 串口驱动 RT-Thread 通过适配 串口驱动&#xff0c;可以使用 MSH shell 当前手动搭建的 …

c++基础版

c基础版 Windows环境搭建第一个C程序c程序运行原理注释常亮字面常亮符号常亮 变量数据类型整型实型常量类型确定char类型字符串布尔类型 控制台输入随机数产生枚举定义数组数组便利 指针基础野指针空指针指针运算动态内存分配 结构体结构体默认值结构体数组结构体指针结构体指针…

JavaWeb笔记整理13——Mybatis

目录 Mybatis介绍 删除 预编译SQL SQL注入 新增 更新 查询 数据封装 条件查询 XML映射文件 动态SQL 更新案例 foreach Mybatis介绍 删除 预编译SQL SQL注入 新增 更新 查询 数据封装 条件查询 XML映射文件 动态SQL <if> 更新案例<set> foreach &l…

消息中间件 --Kafka

一、 Kafka 1.kafka介绍 Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。 生产者发送消息&#xff0c;多个消费者只能有一个消费者接收到消息 生产者发送消息&#xff0c;多个消费者都可以接收到消息 producer&#xff1a;发布消息的对象称之为主题生产者…

人工智能,语音识别也算一种人工智能。

现在挺晚了&#xff0c;还是没有去睡觉&#xff0c;自己在想什么呢&#xff0c;也不确定。 这是一篇用语音写的文章&#xff0c;先按自己的想法说出来&#xff0c;然后再适当修改&#xff0c;也许就是一个不错的文章。 看来以后就不需要打字了&#xff0c;语音识别度很高&#…

两数之和--力扣1

两数之和 题目思路C代码 题目 思路 根据题目要求&#xff0c;元素不能重复且不需要排序&#xff0c;我们这里使用哈希表unordered_map。注意题目说了只对应一种答案。 所以我们在循环中&#xff0c;使用目标值减去当前循环的nums[i]&#xff0c;得到差值&#xff0c;如果我们…