Doris-Routine Load(二十七)

news2024/12/25 12:24:22

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。

适用场景

当前仅支持从 Kafka 系统进行例行导入,使用限制:

(1)支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

(2)支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。

(3)默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 be 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建 routine load 的时候直接设置property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是 routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。

基本原理

如上图,Client 向 FE 提交一个例行导入作业。

(1)FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

(2)在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

(3)FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的Task 进行重试。

(4)整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。

基本语法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

执行 HELP ROUTINE LOAD 可以查看语法帮助,下面是参数说明

1)[db.]job_name

导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。

2)tbl_name

指定需要导入的表的名称。

3)merge_type

数据的合并类型,一共支持三种类型 APPEND、DELETE、MERGE 其中,APPEND 是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据 key 相同的所有行,MERGE 语义 需要与 delete on 条件联合使用,表示满足 delete 条件的数据按照 DELETE 语义处理其余的按照 APPEND 语义处理 , 语法为

[WITHMERGE|APPEND|DELETE]
4)load_properties

用于描述导入数据。语法:

[column_separator], [columns_mapping], [where_predicates], [delete_on_predicates], [source_sequence], [partitions], [preceding_predicates]

(1)column_separator:

指定列分隔符,如: COLUMNS TERMINATED BY "," 这个只在文本数据导入的时候需要指定,JSON 格式的数据导入不需要指定这个参数。

默认为:\t

(2)columns_mapping:

指定源数据中列的映射关系,以及定义衍生列的生成方式。

映射列:

按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有 4 列,其中第 1、2、4 列分别对应 k2, k1, v1。则书写如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 为不存在的一列,用于跳过源数据中的第三列。

衍生列:

以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。 接上一个示例,假设目的表还有第 4 列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

再举例,假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以 100。这个功能可以通过 case when 函数实现,正确写法应如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)

(3)where_predicates

用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:

WHERE k1 > 100 and k2 = 1000

(4)partitions

指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)

(5)delete_on_predicates

表示删除条件,仅在 merge type 为 MERGE 时有意义,语法与 where 相同

(6)source_sequence:

只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。

(7)preceding_predicates

PRECEDING FILTER predicate用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。

5)job_properties

用于指定例行导入作业的通用参数。 语法:

PROPERTIES (
 "key1" = "val1",
 "key2" = "val2"
)

目前支持以下参数:

(1)desired_concurrent_number

期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于 0。默认为 3。 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:

Min(partition num, desired_concurrent_number, alive_backend_num, 
Config.max_routine_load_task_concurrrent_num)

其中 Config.max_routine_load_task_concurrrent_num 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。

其中 partition num 指订阅的 Kafka topic 的 partition 数量。alive_backend_num 是当前正常的 BE 节点数。

(2)max_batch_interval/max_batch_rows/max_batch_size这三个参数分别表示:

  • ① 每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为 10。

  • ② 每个子任务最多读取的行数。必须大于等于 200000。默认是 200000。

  • ③ 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是100MB。

这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。 例如:

"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"

(3)max_error_number

采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。

采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行

(4)strict_mode

是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为 "strict_mode" = "true"

(5)timezone

指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果

(6)format

指定导入数据格式,默认是 csv,支持 json 格式

(7)jsonpaths

jsonpaths: 导入 json 方式分为:简单模式和匹配模式。如果设置了jsonpath 则为匹配模式导入,否则为简单模式导入,具体可参考示例

(8)strip_outer_array

布尔类型,为 true 表示 json 数据以数组对象开始且将数组对象中进行展平,默认值是false

(9)json_root

json_root 为合法的 jsonpath 字符串,用于指定 json document 的根节点,默认值为""

(10)send_batch_parallelism

整型,用于设置发送批处理数据的并行度,如果并行度的值超过BE配置中的max_send_batch_parallelism_per_job,那么作为协调点的BE将使用max_send_batch_parallelism_per_job 的值

6)data_source_properties

数据源的类型。当前支持:Kafka

(
 "key1" = "val1",
 "key2" = "val2"
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1258905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二维码智慧门牌管理系统:实现高效信息管理

文章目录 前言一、 功能升级优势 前言 随着科技的飞速发展和人们生活节奏的加快&#xff0c;传统的门牌管理系统已经不再适应现代社会的需求。为了解决这一问题&#xff0c;全新的二维码智慧门牌管理系统升级解决方案应运而生&#xff0c;为用户带来前所未有的便捷与高效。 一…

1m照片手机怎么拍?一分钟解决!

我们都知道现在的手机像素特别好&#xff0c;随便拍一张照片都是2-3MB&#xff0c;有时候上课或者会议要拍很多照片&#xff0c;这些照片其实又不需要如此清晰&#xff0c;就会特别占内存&#xff0c;下面就向大家介绍三种好用的办法。 方法一&#xff1a;拍完照后手机截图进行…

【开源】基于JAVA的海南旅游景点推荐系统

项目编号&#xff1a; S 023 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S023&#xff0c;文末获取源码。} 项目编号&#xff1a;S023&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四…

训练 CNN 对 CIFAR-10 数据中的图像进行分类

1. 加载 CIFAR-10 数据库 import keras from keras.datasets import cifar10# 加载预先处理的训练数据和测试数据 (x_train, y_train), (x_test, y_test) cifar10.load_data() 2. 可视化前 24 个训练图像 import numpy as np import matplotlib.pyplot as plt %matplotlib …

【Java开发基础】intellij IDEA快速配置JDBC驱动连接MySQL数据库并查询数据,其实真的很简单,我5分钟就学会了!

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;web开发者、设计师、技术分享博主 &#x1f40b; 希望大家多多支持一下, 我们一起学习和进步&#xff01;&#x1f604; &#x1f3c5; 如果文章对你有帮助的话&#xff0c;欢迎评论 &#x1f4ac;点赞&a…

多功能音乐沙漏的设计与实现

【摘要】随着当今社会快节奏生活的发展&#xff0c;当代大学生越来忽视时间管理的重要性&#xff0c;在原本计划只看几个视频只玩几个游戏的碎片化娱乐中耗费了大量的时光&#xff0c;对于自己原本的学习生活产生了巨大的影响。为更加有效的反映时间的流逝&#xff0c;特设计该…

canvas基础:绘制线段,绘制多边形

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 使用…

TDA4开发环境Docker化

文章目录 背景1. TDA4X Linux SDK编译环境镜像构建1.1 安装SDK1.2 验证制卡1.2.1 出现的问题:1.3 验证编译1.3.1 出现的问题2. TDA4X Linux-RT SDK编译环境镜像构建2.1 安装SDK2.2 出现的问题参考背景 开始阅读本篇前,假设你已经对docker有了一定了解,且有过docker换件搭建…

优思学院|5S不只是清洁,但却离不开清洁!

很多说5S不止是清洁和搞卫生那么简单&#xff0c;相信有正规地学习过5S的人都应该深切了解。 不过&#xff0c;5S之中的确包括了清理、清洁的步骤&#xff0c;5S&#xff0c;也被称为“五常法则”或“五常法”&#xff0c;它包含了&#xff1a; 整理&#xff08;SEIRI&#x…

8.统一异常处理 + 统一记录日志

目录 1.统一异常处理 2.统一记录日志 1.统一异常处理 在 HomeController 类中添加请求方法&#xff08;服务器发生异常之后需要统一处理异常&#xff0c;记录日志&#xff0c;然后转到 500 页面&#xff0c;需要人工处理重定向到 500 页面&#xff0c;提前把 500 页面请求访问…

Containerd Container管理功能解析

Containerd Container管理功能解析 container是containerd的一个核心功能&#xff0c;用于创建和管理容器的基本信息。 本篇containerd版本为v1.7.9。 更多文章访问 https://www.cyisme.top 本文从ctr c create命令出发&#xff0c;分析containerd的容器及镜像管理相关功能。 …

01 项目架构

关于我 曾经就职于蚂蚁金服&#xff0c;多年的后端开发经验&#xff0c;对微服务、架构这块研究颇深&#xff0c;同时也是一名热衷于技术分享、拥抱开源技术的博主。 个人技术公众号&#xff1a;码猿技术专栏个人博客&#xff1a;www.java-family.cn 前期一直在更新《Spring…

什么是美颜sdk?视频直播美颜sdk技术深度剖析

美颜sdk可以通过实时处理图像&#xff0c;提升主播或用户在视频直播中的外观。通过美颜sdk接口调用可以轻松实现美颜效果。美颜sdk的核心目标是在保持图像真实性的同时&#xff0c;为用户创造出最理想的美化效果。 一、美颜sdk的技术实现 1.面部识别技术&#xff1a;美颜sdk…

虚拟直播在文旅行业火爆发展!背后的“生意经”你抓住了吗?

&#xfeff;自疫情结束以来&#xff0c;文化和旅游行业恢复势头强劲&#xff0c;各大旅游景点消费活跃度持续攀升。在这种情况下&#xff0c;“直播文旅”模式的深度融合对文旅行业的客流导入起到了极大的带动作用。 不过&#xff0c;当前的文旅直播也出现了一些问题&#xf…

流媒体播放器EasyPlayer播放H.265与H.264时进度条样式异常该如何解决?

H5无插件流媒体播放器EasyPlayer属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;可支持H.264与H.265编码格式&#xff0c;性能稳定、播放流畅&#xff0c;能支持WebSocket-FLV、HTTP-FLV&#xff0c;HLS&#xff08;m3u8&#…

北京 | 竹云与南方电网携手荣获“IDC 2023未来企业奖未来连接领导者”

11月22日-23日&#xff0c;2023第八届IDC中国数字化转型年度盛典在北京召开。本次大会以“竞放数字力量”为主题&#xff0c;汇聚超过1000位来自不同行业的大咖与伙伴共同参与此次盛会&#xff0c;从全球化视角出发&#xff0c;围绕本土化落地人工智能&#xff08;大模型&#…

pytest系列——pytest_collection_modifyitems钩子函数修改测试用例执行顺序

前言 pytest默认执行用例是根据项目下的文件名称按ascii码去收集运行的&#xff1b;文件中的用例是从上往下按顺序执行的。 pytest_collection_modifyitems 这个函数顾名思义就是收集测试用例、改变用例的执行顺序的。 【严格意义上来说&#xff0c;我们在用例设计原则上用例…

NFTScan | 11.20~11.26 NFT 市场热点汇总

欢迎来到由 NFT 基础设施 NFTScan 出品的 NFT 生态热点事件每周汇总。 周期&#xff1a;2023.11.20~ 2023.11.26 NFT Hot News 01/ OKX Ordinals 市场 API 完成升级 11 月 21 日&#xff0c;OKX Ordinals 市场 API 现已完成升级&#xff0c;新增支持按币种单价查询、排序&…

TDA4VM EVM开发板调试笔记

文章目录 1. 前言2. 官网资料导读3. 安装 Linux SDK4. 制作SD 启动卡5. 验证启动1. 前言 TDA4作为一般经典的车规级SOC芯片,基于它的低阶智驾方案目前成为各家智驾方案公司的量产首选,这也使得基于TDA4的开发需求陡增,开发和使用TDA4既要熟悉Linux驱应用开发,还要熟悉传统…

uniapp基础-教程之HBuilderX配置篇-01

uniapp教程之HBuilderX配置篇-01 为什么要做这个教程的梳理&#xff0c;主要用于自己学习和总结&#xff0c;利于增加自己的积累和记忆。首先下载HBuilderX&#xff0c;并保证你的软件在C盘进行运行&#xff0c;最好使用英文或者拼音&#xff0c;这个操作是为了保证软件的稳定…