大数据Doris(四十六):Stream Load基本原理和语法介绍

news2024/11/15 8:59:54

文章目录

Stream Load基本原理和语法介绍

一、基本原理

二、语法与结果

1、语法

2、返回结果


Stream Load基本原理和语法介绍

Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据,建议的导入数据量在 1G 到 10G 之间。由于 Stream load 是一种同步的导入方式,所以用户如果希望用同步方式获取导入结果,也可以使用这种导入。

目前Stream Load支持数据格式有CSV,JSON,1.2版本后支持Parquet、orc格式。

一、基本原理

下图展示了 Stream load 的主要流程,省略了一些导入细节。

Stream load 中,Doris 会选定一个BE节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。导入的最终结果由 Coordinator BE 返回给用户。

二、语法与结果

1、语法

Stream Load 通过 HTTP 协议提交和传输数据,常用方式使用curl命令进行提交导入,命令如下:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

以上命令中user:passwd 指的是登录doris的用户名和密码;-H 代表的是Header,Header中可以指定导入任务参数;-T 指定的是导入数据文件,需要指定到对应的数据文件名称;-XPUT 执行fe 节点和端口以及导入的数据库和表信息。

Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中,-H格式为:-H "key1:value1",支持的常见属性如下:

  • label

导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。

label 的另一个作用,是防止用户重复导入相同的数据。 强烈推荐用户同一批次数据使用相同的 label 。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once 

当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。

  • column_separator

用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。

如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。可以使用多个字符的组合作为列分隔符。

  • line_delimiter

用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。

  • max_filter_ratio

导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。

计算公式为:

(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
dpp.abnorm.ALL:表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。
dpp.norm.ALL:指的是导入过程中正确数据的条数。可以通过 SHOW LOAD 命令查询导入任务的正确数据量。
原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL
  • where

导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。

  • Partitions

待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL

  • columns

待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

列顺序变换例子:

原始数据有三列(src_c1,src_c2,src_c3), 目前doris表也有三列(dst_c1,dst_c2,dst_c3)

如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法如下:
columns: dst_c1, dst_c2, dst_c3

如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法如下:
columns: dst_c2, dst_c3, dst_c1

 表达式变换例子:

原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
  • format

指定导入数据格式,支持csv、json,默认是csv。doris 1.2 版本后支持csv_with_names(支持csv文件行首过滤)、csv_with_names_and_types(支持csv文件前两行过滤)

  • exec_mem_limit

导入内存限制。默认为 2GB,单位为字节。

  • strict_mode

Stream Load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明 strict_mode=true 。默认的 strict mode 为关闭。

  • merge_type

数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理。

  • two_phase_commit

Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。例如:

1、发起stream load预提交操作

curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc

{
"status": "Success",
"msg": "transaction [18036] commit successfully."
}
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load

{
"TxnId": 18036,
"Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 100,
"NumberLoadedRows": 100,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 1031,
"LoadTimeMs": 77,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 58,
"CommitAndPublishTimeMs": 0
}

2、对事务触发commit操作

curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc

{
"status": "Success",
"msg": "transaction [18036] commit successfully."
}

注意:请求发往fe或be均可 ;commit 的时候可以省略 url 中的 {table}

3、对事务触发abort操作

curl -X PUT --location-trusted -u user:passwd -H "txn_id:18037" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc

{
"status": "Success",
"msg": "transaction [18037] abort successfully."
}

注意:请求发往fe或be均可 ;abort 的时候可以省略 url 中的 {table}

2、返回结果

由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。返回结果示例如下:

{
"TxnId": 1003,
"Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
"Status": "Success",
"ExistingJobStatus": "FINISHED", // optional
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}

以上结果参数解释如下:

  • TxnId:导入的事务ID。用户可不感知。
  • Label:导入 Label。由用户指定或系统自动生成。
  • Status:导入完成状态。
    • "Success":表示导入成功。
    • "Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
    • "Label Already Exists":Label 重复,需更换 Label。
    • "Fail":导入失败。
  • ExistingJobStatus:已存在的 Label 对应的导入作业的状态。

这个字段只有在当 Status 为 "Label Already Exists" 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行,"FINISHED" 表示作业成功。

  • Message:导入错误信息。
  • NumberTotalRows:导入总处理的行数。
  • NumberLoadedRows:成功导入的行数。
  • NumberFilteredRows:数据质量不合格的行数。
  • NumberUnselectedRows:被 where 条件过滤的行数。
  • LoadBytes:导入的字节数。
  • LoadTimeMs:导入完成时间。单位毫秒。
  • BeginTxnTimeMs:向Fe请求开始一个事务所花费的时间,单位毫秒。
  • StreamLoadPutTimeMs:向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
  • ReadDataTimeMs:读取数据所花费的时间,单位毫秒。
  • WriteDataTimeMs:执行写入数据操作所花费的时间,单位毫秒。
  • CommitAndPublishTimeMs:向Fe请求提交并且发布事务所花费的时间,单位毫秒。
  • ErrorURL:如果有数据质量问题,通过访问这个 URL 查看具体错误行。

注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/672268.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简要介绍 | 图像聚类:概念、原理与方法

注1:本文系“简要介绍”系列之一,仅从概念上对图像聚类进行非常简要的介绍,不适合用于深入和详细的了解。 图像聚类:概念、原理与方法 Cluster Analysis | NVIDIA Developer 1. 背景介绍 图像聚类(Image Clustering&a…

波浪理论与伦敦金价走势分析

艾略特波浪理论(Elliott Wave Theory)是一套能应用于伦敦金走势分析的理论,它认为市场的走势会不断重复一种模式,每一周期由5个上升浪和3个下跌浪组成。波浪理论将不同规模的趋势分成了九大类,最长的超大循环波(Grand supercycle) 是横跨200年…

Git版本管理实用指南

特别声明,本博文仅作个人日常使用Git参考之用。主要内容总结来源于:廖雪峰官网的Git教程🌻 🔎 什么是Git Git是目前世界上最先进的分布式版本控制系统,是Linux的创建者用C开发的。GitHub网站2008上线,它为…

ELKB架构安装

文章目录 安装JAVA JDK安装ES手动安装Problem: 无法访问localhost:9200Problem: 用户名密码验证 docker安装ES文件夹内容 安装node.js安装grunt 安装kibanaLogstashBeat 安装JAVA JDK https://www.java.com/en/ 添加环境变量C:\Program Files\Java\jdk-11.0.1\bin Elasticse…

在线广告系统工程架构

一、广告系统概览 广告投放系统:供广告主使用,核心功能包括会员续费、广告库管理、设定推广条件、设置广告出价、查看投放效果等。广告运营后台:供平台的产品运营使用,核心功能包括广告位管理、广告策略管理、以及各种运营工具。广…

HarmonyOS学习路之开发篇—多媒体开发(相机开发 一)

HarmonyOS相机模块支持相机业务的开发,开发者可以通过已开放的接口实现相机硬件的访问、操作和新功能开发,最常见的操作如:预览、拍照、连拍和录像等。 基本概念 相机静态能力 用于描述相机的固有能力的一系列参数,比如朝向、支持…

华为OD机试之按单词下标区间翻转文章内容(Java源码)

文章目录 按单词下标区间翻转文章内容题目描述输入描述输出描述示例代码 按单词下标区间翻转文章内容 题目描述 给定一段英文文章片段,由若干单词组成,单词间以空格间隔,单词下标从0开始。 请翻转片段中指定区间的单词顺序并返回翻转后的内…

芯片工程师求职题目之设计基础篇(1)

1. 进制转换 会算数值在二进制、八进制、十进制以及十六进制之间的任意转换。 会算数值的正码、反码、补码、BCD码。 2. 什么是格雷码(Gray code),它有什么优点。 在一组二进制编码中,若任意两个相邻的数值还有1位二进制数不同,则称这种编…

算法程序设计 之 最长公共子序列(4/8)

一、实验目的: 理解并掌握动态规划算法的基本思想和设计步骤。 实验内容若给定序列X{x1,x2,...,xm},Z{z1,z2,...,zk},若Z是X的子序列,当且仅当存在一个严格递增下标序列{i1,...,ik},使得对于所有j1,2,...,k有:zjxij。例如&#x…

智能家居APP软件开发有何优势?

传统家居行业营销模式已经无法满足现代人多样化个性化的需求,也跟不上互联网时代的发展步伐了,很多传统家居行业都陷入了营销困境。通过智能家居APP软件开发,可以利用互联网改造传统模式,探索新的发展模式,可以说智能家…

Bootstrap 简介

文章目录 Bootstrap 简介什么是 Bootstrap?Bootstrap 包的内容Bootstrap 实例 Bootstrap 简介 什么是 Bootstrap? Bootstrap 是一个用于快速开发 Web 应用程序和网站的前端框架。Bootstrap 是基于 HTML、CSS、JAVASCRIPT 的。 历史 Bootstrap 是由 Twi…

【软件测试知识】什么是持续集成?

持续集成是一种 DevOps 软件开发实践。采用持续集成时,开发人员会定期将代码变更合并到一个中央存储库中,之后系统会自动运行构建和测试操作。持续集成通常是指软件发布流程的构建或集成阶段,需要用到自动化组件(例如 CI 或构建服…

EasyUI表格增加筛选和导出的方法

一、前言 最近写前端页面,要从后台查询表格数据; 查到数据后,就想增加个数据筛选功能,如果再查后台的话会影响效率(还得加参数、再调用后台接口、后台再执行sql),就想前端能不能自己筛选下已经…

车载以太网 - 传输层 - TCP通信过程

目录 TCP 通信阶段 1、连接建立Connection establishment 2、数据传输 Data transfer 3、连接释放 Connection release TCP通信的三个阶段: TCP连接(三次握手) 1、Client(ECUA) -> Server(ECU B)第一次握手 2、Server -> Client 第二次握手…

软件工程基础速通教程(北京理工大学)

文章目录 前言软工上课情况考后感题型分析概念部分大题部分数据流图和数据字典数据流图数据字典 结构化设计工具程序流程图盒图(N-S图)PAD图判定表和判定树PDL(伪码) 软件测试白盒测试法语句覆盖判定覆盖,条件覆盖&…

【MMDetection3.0】训练自己的数据集

本文记录下MMDetection3.0版本,即截至目前最新的版本,训练自定义数据集的过程。当前MMDetection已经封装的很好了,虽然易于使用,但其API也愈发复杂,对于新手不太友好,这里记录下自己的踩坑经历。 数据部分…

5.英语词性之副词

五.什么是副词 副词是修饰动词,形容词,副词的词语,有时也可以修饰数词,介词,连词,名词或全 句。副词是表示行为或状态特征的词,主要作状语,也可以作表语,定语&#xff0c…

音视频开发:Qt在视频剪辑3D桌面软件获胜, 嵌入式不敌安卓

1 Qt Android嵌入式应用层开发方向对比 大家都知道啊,做嵌入式linux设备,一些没有屏幕,比如安防摄像头,门铃之类的,另外一些嵌入式设备是有触控屏,在触控屏上还跑应用软件的,这种比如商场各种…

citywalk话题增长704.76%,小红书本地化内容营销怎么玩?

2023年初发布的《2023小红书年度生活趋势:投入真实生活》报告中提到,今后年轻人会更加意识到周围和附近的重要性,在十大生活趋势预测中,近邻升温(2022年相关笔记数量同比上涨213%)、野到家门口(…

建立小型医学数据库(总结)

建立小型医学数据库 小型医学数据库可以用于存储和管理医学数据,如患者病历、药品信息、试验结果等。这对于医疗机构和科研机构来说非常必要,可以提高数据管理和共享的效率,进而促进医学研究和诊疗水平的提升。 建立小型医学数据库有以下基本…