动态分区和临时分区
动态分区
旨在对表级别的分区实现生命周期管理(TTL),减少用户的使用负担。
目前实现了动态添加分区及动态删除分区的功能。只支持 Range 分区。
原理
在某些使用场景下,用户会将表按照天进行分区划分,每天定时执行例行任务,这时需要使用方手动管理分区,否则可能由于使用方没有创建分区导致数据导入失败,这给使用方带来了额外的维护成本。通过动态分区功能,用户可以在建表时设定动态分区的规则。FE 会启动一个后台线程,根据用户指定的规则创建或删除分区。用户也可以在运行时对现有规则进行变更。
使用方式
动态分区的规则可以在建表时指定,或者在运行时进行修改。当前仅支持对单分区列的分区表设定动态分区规则
建表时指定:
CREATE TABLE tbl1
(...)
PROPERTIES
(
-- 添加动态分区的规则
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
运行时修改:
ALTER TABLE tbl1 SET
(
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
动态分区规则参数
1. dynamic_partition.enable:是否开启动态分区特性。默认是true
2. dynamic_partition.time_unit:动态分区调度的单位。可指定为 HOUR、DAY、WEEK、MONTH。分别表示按小时、按天、按星期、按月进行分区创建或删除。
3. dynamic_partition.time_zone:动态分区的时区,如果不填写,则默认为当前机器的系统的时区
4. dynamic_partition.start:动态分区的起始偏移,为负数。以当天(星期/月)为基准,分区范围在此偏移之前的分区将会被删除。如果不填写,则默认为 -2147483648,即不删除历史分区。
5. dynamic_partition.end:动态分区的结束偏移,为正数。根据 time_unit 属性的不同,以当天(星期/月)为基准,提前创建对应范围的分区。
6. dynamic_partition.prefix:动态创建的分区名前缀。
7. dynamic_partition.buckets:动态创建的分区所对应的分桶数量
8. dynamic_partition.replication_num:动态创建的分区所对应的副本数量,如果不填写,则默认为该表创建时指定的副本数量
9. dynamic_partition.start_day_of_week:当 time_unit 为 WEEK 时,该参数用于指定每周的起始点。取值为 1 到 7。其中 1 表示周一,7 表示周日。默认为 1,即表示每周以周一为起始点。
10. dynamic_partition.start_day_of_month:当 time_unit 为 MONTH 时,该参数用于指定每月的起始日期。取值为 1 到 28。其中 1 表示每月1号,28 表示每月28号。默认为 1,即表示每月以1号位起始点。暂不支持以29、30、31号为起始日,以避免因闰年或闰月带来的歧义
11. dynamic_partition.create_history_partition:为 true 时代表可以创建历史分区,默认是false
12. dynamic_partition.history_partition_num:当 create_history_partition 为 true 时,该参数用于指定创建历史分区数量。默认值为 -1, 即未设置。
13. dynamic_partition.hot_partition_num:指定最新的多少个分区为热分区。对于热分区,系统会自动设置其 storage_medium 参数为SSD,并且设置 storage_cooldown_time 。hot_partition_num:设置往前 n 天和未来所有分区为热分区,并自动设置冷却时间
修改动态分区属性
ALTER TABLE tbl1 SET
(
"dynamic_partition.prop1" = "value1",
...
);
ALTER TABLE partition_test SET
(
"dynamic_partition.time_unit" = "week",
"dynamic_partition.start" = "-1",
"dynamic_partition.end" = "1"
);
某些属性的修改可能会产生冲突。假设之前分区粒度为 DAY,并且已经创建了如下分区:
p20200519: ["2020-05-19", "2020-05-20")
p20200520: ["2020-05-20", "2020-05-21")
p20200521: ["2020-05-21", "2020-05-22")
如果此时将分区粒度改为 MONTH,则系统会尝试创建范围为 [“2020-05-01”, “2020-06-01”) 的分区,而该分区的分区范围和已有分区冲突,所以无法创建。而范围为 [“2020-06-01”, “2020-07-01”) 的分区可以正常创建。因此,2020-05-22 到 2020-05-30 时间段的分区,需要自行填补。
查看动态分区表调度情况
通过以下命令可以进一步查看当前数据库下,所有动态分区表的调度情况:
mysql> SHOW DYNAMIC PARTITION TABLES;
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| TableName | Enable | TimeUnit | Start | End | Prefix | Buckets | StartOf | LastUpdateTime | LastSchedulerTime | State | LastCreatePartitionMsg | LastDropPartitionMsg | ReservedHistoryPeriods |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| d3 | true | WEEK | -3 | 3 | p | 1 | MONDAY | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | [2021-12-01,2021-12-31] |
| d5 | true | DAY | -7 | 3 | p | 32 | N/A | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d4 | true | WEEK | -3 | 3 | p | 1 | WEDNESDAY | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d6 | true | MONTH | -2147483648 | 2 | p | 8 | 3rd | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d2 | true | DAY | -3 | 3 | p | 32 | N/A | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d7 | true | MONTH | -2147483648 | 5 | p | 8 | 24th | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
7 rows in set (0.02 sec)
- LastUpdateTime: 最后一次修改动态分区属性的时间
- LastSchedulerTime: 最后一次执行动态分区调度的时间
- State: 最后一次执行动态分区调度的状态
- LastCreatePartitionMsg: 最后一次执行动态添加分区调度的错误信息
- LastDropPartitionMsg: 最后一次执行动态删除分区调度的错误信息
临时分区
规则
- 临时分区的分区列和正式分区相同,且不可修改。
- 一张表所有临时分区之间的分区范围不可重叠,但临时分区的范围和正式分区范围可以重叠。
- 临时分区的分区名称不能和正式分区以及其他临时分区重复。
临时分区支持添加、删除、替换操作。
添加临时分区
以通过 ALTER TABLE ADD TEMPORARY PARTITION 语句对一个表添加临时分区:
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN("2020-02-01");
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp2 VALUES LESS THAN("2020-02-02")
("in_memory" = "true", "replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai");
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai")
("in_memory" = "true", "replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;
添加操作的一些说明:
- 临时分区的添加和正式分区的添加操作相似。临时分区的分区范围独立于正式分区。
- 临时分区可以独立指定一些属性。包括分桶数、副本数、是否是内存表、存储介质等信息。
删除临时分区
可以通过 ALTER TABLE DROP TEMPORARY PARTITION 语句删除一个表的临时分区:
ALTER TABLE tbl1 DROP TEMPORARY PARTITION tp1;
删除操作的一些说明:
- 删除临时分区,不影响正式分区的数据。
替换分区
可以通过 ALTER TABLE REPLACE PARTITION 语句将一个表的正式分区替换为临时分区。
-- 正式分区替换成临时分区以后,正是分区的数据会被删除,并且这个过程是不可逆的
-- 用之前要小心
ALTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);
ALTER TABLE partition_test REPLACE PARTITION (p20230104) WITH TEMPORARY PARTITION (tp1);
ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1, tp2)
PROPERTIES (
"strict_range" = "false",
"use_temp_partition_name" = "true"
);
- strict_range:默认为 true。
- 对于 Range 分区,当该参数为 true 时,表示要被替换的所有正式分区的范围并集需要和替换的临时分区的范围并集完全相同。当置为 false 时,只需要保证替换后,新的正式分区间的范围不重叠即可。
- 对于 List 分区,该参数恒为 true。要被替换的所有正式分区的枚举值必须和替换的临时分区枚举值完全相同。
- use_temp_partition_name:默认为 false。当该参数为 false,并且待替换的分区和替换分区的个数相同时,则替换后的正式分区名称维持不变。如果为 true,则替换后,正式分区的名称为替换分区的名称。
替换操作的一些说明:
- 分区替换成功后,被替换的分区将被删除且不可恢复。
数据的导入和查询
导入临时分区
根据导入方式的不同,指定导入临时分区的语法稍有差别。这里通过示例进行简单说明
查询结果用insert导入
INSERT INTO tbl TEMPORARY PARTITION(tp1, tp2, ...) SELECT ....
查看数据
SELECT ... FROM
tbl1 TEMPORARY PARTITION(tp1, tp2, ...)
JOIN
tbl2 TEMPORARY PARTITION(tp1, tp2, ...)
ON ...
WHERE ...;
doris中join的优化原理
Shuffle Join(Partitioned Join)
订单明细表:
CREATE TABLE test.order_info_shuffle
(
`order_id` varchar(20) COMMENT "订单id",
`user_id` varchar(20) COMMENT "用户id",
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_num` Int COMMENT "商品数量",
`price` double COMMENT "商品价格"
)
duplicate KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
导入数据:
insert into test.order_info_shuffle values\
('o001','u001','g001',1,9.9 ),\
('o001','u001','g002',2,19.9),\
('o001','u001','g003',2,39.9),\
('o002','u002','g001',3,9.9 ),\
('o002','u002','g002',1,19.9),\
('o003','u002','g003',1,39.9),\
('o003','u002','g002',2,19.9),\
('o003','u002','g004',3,99.9),\
('o003','u002','g005',1,99.9),\
('o004','u003','g001',2,9.9 ),\
('o004','u003','g002',1,19.9),\
('o004','u003','g003',4,39.9),\
('o004','u003','g004',1,99.9),\
('o004','u003','g005',4,89.9);
商品表:
CREATE TABLE test.goods_shuffle
(
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_name` VARCHAR(20) COMMENT "商品名称",
`category_id` VARCHAR(20) COMMENT "商品品类id"
)
duplicate KEY(`goods_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5;
导入数据:
insert into test.goods_shuffle values\
('g001','iphon13','c001'),\
('g002','ipad','c002'),\
('g003','xiaomi12','c001'),\
('g004','huaweip40','c001'),\
('g005','headset','c003');
Sql示例:
EXPLAIN
select
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_shuffle as oi
-- 我们可以不指定哪一种join方式,doris会自己根据数据的实际情况帮我们选择
JOIN goods_shuffle as gs
on oi.goods_id = gs.goods_id;
EXPLAIN select
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_shuffle as oi
-- 可以显式的hint指定我们想要的join类型
JOIN [shuffle] goods_shuffle as gs
on oi.goods_id = gs.goods_id;
适用场景:不管数据量,不管是大表join大表还是大表join小表都可以用
优点:通用
缺点:需要shuffle内存和网络开销比较大,效率不高
Broadcast Join
当一个大表join小表的时候,将小表广播到每一个大表所在的每一个节点上(以hash表的形式放在内存中)这样的方式叫做Broadcast Join,类似于mr里面的一个map端join
订单明细表:
CREATE TABLE test.order_info_broadcast
(
`order_id` varchar(20) COMMENT "订单id",
`user_id` varchar(20) COMMENT "用户id",
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_num` Int COMMENT "商品数量",
`price` double COMMENT "商品价格"
)
duplicate KEY(`order_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5;
导入数据:
insert into test.order_info_broadcast values\
('o001','u001','g001',1,9.9 ),\
('o001','u001','g002',2,19.9),\
('o001','u001','g003',2,39.9),\
('o002','u002','g001',3,9.9 ),\
('o002','u002','g002',1,19.9),\
('o003','u002','g003',1,39.9),\
('o003','u002','g002',2,19.9),\
('o003','u002','g004',3,99.9),\
('o003','u002','g005',1,99.9),\
('o004','u003','g001',2,9.9 ),\
('o004','u003','g002',1,19.9),\
('o004','u003','g003',4,39.9),\
('o004','u003','g004',1,99.9),\
('o004','u003','g005',4,89.9);
商品表:
CREATE TABLE test.goods_broadcast
(
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_name` VARCHAR(20) COMMENT "商品名称",
`category_id` VARCHAR(20) COMMENT "商品品类id"
)
duplicate KEY(`goods_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5;
导入数据:
insert into test.goods_broadcast values\
('g001','iphon13','c001'),\
('g002','ipad','c002'),\
('g003','xiaomi12','c001'),\
('g004','huaweip40','c001'),\
('g005','headset','c003');
- 显式使用 Broadcast Join:
EXPLAIN
select
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_broadcast as oi
JOIN [broadcast] goods_broadcast as gs
on oi.goods_id = gs.goods_id;
优点:避免了shuffle,提高了运算效率
缺点:有限制,必须右表数据量比较小
Bucket Shuffle Join
利用建表时候分桶的特性,当join的时候,join的条件和左表的分桶字段一样的时候,将右表按照左表分桶的规则进行shuffle操作,使右表中需要join的数据落在左表中需要join数据的BE节点上的join方式叫做Bucket Shuffle Join。
使用
从 0.14 版本开始默认为 true,新版本可以不用设置这个参数了!
show variables like '%bucket_shuffle_join%';
set enable_bucket_shuffle_join = true;
订单明细表:
CREATE TABLE test.order_info_bucket
(
`order_id` varchar(20) COMMENT "订单id",
`user_id` varchar(20) COMMENT "用户id",
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_num` Int COMMENT "商品数量",
`price` double COMMENT "商品价格"
)
duplicate KEY(`order_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5;
导入数据:
insert into test.order_info_bucket values\
('o001','u001','g001',1,9.9 ),\
('o001','u001','g002',2,19.9),\
('o001','u001','g003',2,39.9),\
('o002','u002','g001',3,9.9 ),\
('o002','u002','g002',1,19.9),\
('o003','u002','g003',1,39.9),\
('o003','u002','g002',2,19.9),\
('o003','u002','g004',3,99.9),\
('o003','u002','g005',1,99.9),\
('o004','u003','g001',2,9.9 ),\
('o004','u003','g002',1,19.9),\
('o004','u003','g003',4,39.9),\
('o004','u003','g004',1,99.9),\
('o004','u003','g005',4,89.9);
商品表:
CREATE TABLE test.goods_bucket
(
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_name` VARCHAR(20) COMMENT "商品名称",
`category_id` VARCHAR(20) COMMENT "商品品类id"
)
duplicate KEY(`goods_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 3;
CREATE TABLE test.goods_bucket1
(
`goods_id` VARCHAR(20) COMMENT "商品id",
`goods_name` VARCHAR(20) COMMENT "商品名称",
`category_id` VARCHAR(20) COMMENT "商品品类id"
)
duplicate KEY(`goods_id`)
DISTRIBUTED BY HASH(`goods_name`) BUCKETS 3;
导入数据:
insert into test.goods_bucket values\
('g001','iphon13','c001'),\
('g002','ipad','c002'),\
('g003','xiaomi12','c001'),\
('g004','huaweip40','c001'),\
('g005','headset','c003');
通过 explain 查看 join 类型
EXPLAIN
select
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_bucket as oi
-- 目前 Bucket Shuffle Join不能像Shuffle Join那样可以显示指定Join方式,
-- 只能让执行引擎自动选择,
-- 选择的顺序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
JOIN goods_bucket as gs
where oi.goods_id = gs.goods_id;
EXPLAIN select
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_bucket as oi
-- 目前 Bucket Shuffle Join不能像Shuffle Join那样可以显示指定Join方式,
-- 只能让执行引擎自动选择,
-- 选择的顺序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
JOIN goods_bucket1 as gs
where oi.goods_id = gs.goods_id;
注意事项
- Bucket Shuffle Join 只生效于 Join 条件为等值的场景
- Bucket Shuffle Join 要求左表的分桶列的类型与右表等值 join 列的类型需要保持一致,否则无法进行对应的规划。
- Bucket Shuffle Join 只作用于 Doris 原生的 OLAP 表,对于 ODBC,MySQL,ES 等外表,当其作为左表时是无法规划生效的。
- Bucket Shuffle Join只能保证左表为单分区时生效。所以在 SQL 执行之中,需要尽量使用 where 条件使分区裁剪的策略能够生效。
Colocation Join
中文意思叫位置协同分组join,指需要join的两份数据都在同一个BE节点上,这样在join的时候,直接本地join计算即可,不需要进行shuffle。
- Colocation Group(位置协同组CG):在同一个 CG内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布(满足三个条件)。
- Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及分区的副本数等。
使用限制
- 建表时两张表的分桶列的类型和数量需要完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
- 同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应
- 同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。
Runtime Filter
Runtime Filter会在有join动作的 sql运行时,创建一个HashJoinNode和一个ScanNode来对join的数据进行过滤优化,使得join的时候数据量变少,从而提高效率。
指定 RuntimeFilter 类型
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
set runtime_filter_type="MIN_MAX";
参数解释:
- runtime_filter_type: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or Bloom Filter
- Bloom Filter: 针对右表中的join字段的所有数据标注在一个布隆过滤器中,从而判断左表中需要join的数据在还是不在
- MinMax Filter: 获取到右表表中数据的最大值和最小值,看左表中查看,将超出这个最大值最小值范围的数据过滤掉
- IN predicate: 将右表中需要join字段所有数据构建一个IN predicate,再去左表表中过滤无意义数据
- runtime_filter_wait_time_ms: 左表的ScanNode等待每个Runtime Filter的时间,默认1000ms
- runtime_filters_max_num: 每个查询可应用的Runtime Filter中Bloom Filter的最大数量,默认10
- runtime_bloom_filter_min_size: Runtime Filter中Bloom Filter的最小长度,默认1M
- runtime_bloom_filter_max_size: Runtime Filter中Bloom Filter的最大长度,默认16M
- runtime_bloom_filter_size: Runtime Filter中Bloom Filter的默认长度,默认2M
- runtime_filter_max_in_num: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认102400
示例实操:
建表
CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2
PROPERTIES("replication_num" = "1");
INSERT INTO test VALUES (1), (2), (3), (4);
CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2
PROPERTIES("replication_num" = "1");
INSERT INTO test2 VALUES (3), (4), (5);
查看执行计划
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+----------------------------------------------------------------------------+
| Explain String |
+----------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:<slot 2> |
| PARTITION: UNPARTITIONED |
| |
| VRESULT SINK |
| |
| 4:VEXCHANGE |
| |
| PLAN FRAGMENT 1 |
| |
| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test`.`t1` |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 04 |
| UNPARTITIONED |
| |
| 2:VHASH JOIN |
-- 这边能够看到使用的join方式是BUCKET_SHUFFLE
| | join op: INNER JOIN(BUCKET_SHUFFLE)[Tables are not in the same group] |
| | equal join conjunct: `test`.`t1` = `test2`.`t2` |
-- 这边能够看到使用的运行时过滤方式是in_or_bloom
| | runtime filters: RF000[in_or_bloom] <- `test2`.`t2` |
| | cardinality=0 |
| | vec output tuple id: 2 | |
| |----3:VEXCHANGE |
| | |
| 0:VOlapScanNode |
| TABLE: test(test), PREAGGREGATION: ON |
| runtime filters: RF000[in_or_bloom] -> `test`.`t1` |
| partitions=1/1, tablets=2/2, tabletList=14114,14116 |
| cardinality=0, avgRowSize=4.0, numNodes=1 |
| |
| PLAN FRAGMENT 2 |
| |
| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test2`.`t2` |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| BUCKET_SHFFULE_HASH_PARTITIONED: `test2`.`t2` |
| |
| 1:VOlapScanNode |
| TABLE: test2(test2), PREAGGREGATION: ON |
| partitions=1/1, tablets=2/2, tabletList=14122,14124 |
| cardinality=0, avgRowSize=4.0, numNodes=1 |
+----------------------------------------------------------------------------+