Preface
This is an informal essay on Doris data warehouse architecture. It draws on some excellent user cases as well as my own views; comments and feedback are welcome.
Recap
The diagram below shows the warehouse architecture mentioned in an earlier article. Its strengths are real-time performance (data can be consumed as soon as it is produced) and simplicity: Flink CDC combined with Dinky or StreamPark can sync business data into Doris and build the warehouse with zero code. Its weakness is that it is very resource-hungry (the Doris side later indicated that with whole-database sync the Flink CDC resource problem may become much smaller).
Why resource consumption is high
With the current Flink CDC approach, using Flink SQL means every table needs its own Flink job, which adds up to a lot of resources.
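For reference, a minimal sketch of that per-table Flink SQL pattern is shown below; the table, columns and connection values are placeholders of my own, and newer flink-doris-connector versions may want additional sink options. One CDC source table, one Doris sink table and one INSERT INTO together make up one Flink job, so syncing N business tables this way means running N jobs.
-- One MySQL CDC source table per business table (all connection values are placeholders)
CREATE TABLE orders_src (
    id     BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'mysql-host',
    'port'          = '3306',
    'username'      = 'user',
    'password'      = 'pass',
    'database-name' = 'demo',
    'table-name'    = 'orders'
);

-- One Doris sink table per business table ('sink.enable-delete' applies deletes from the CDC stream)
CREATE TABLE orders_sink (
    id     BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector'          = 'doris',
    'fenodes'            = 'fe-host:8030',
    'table.identifier'   = 'demo.orders',
    'username'           = 'root',
    'password'           = '',
    'sink.enable-delete' = 'true'
);

-- The INSERT is what actually becomes a running job
INSERT INTO orders_sink SELECT id, amount FROM orders_src;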
Architecture evolution
Reference architecture
The evolution takes the architecture diagram below, shared at some company's talk, as a reference (I no longer remember which company it was).
Architecture improvements
Inspired by the architecture above, and considering that Hudi is not yet stable while the Doris community is very active and provides good technical support, I arrived at the architecture below.
- To get rid of the one-Flink-SQL-job-per-table problem, this part was implemented in code. Since Flink CDC captures change data in the Debezium format, the serialization class used by Flink CDC was rewritten so that every record also carries the primary key of the changed row, which makes whole-database sync into Kafka possible without breaking the ordering of changes to the same key.
- Flume consuming the Kafka data follows the same idea: it uses a regex to discover the matching topics dynamically and collects them, acting as a backup in case data is ever lost in Doris.
Demo implementation
Create the Doris table
The table follows the Flink CDC data format (keeping the data as raw as possible):
CREATE TABLE `maxcomputer_demo_nihao` (
`ts` date COMMENT 'date of the data operation',
`op` varchar(5) NOT NULL COMMENT 'operation type',
`before` JSONB COMMENT 'row data before the operation',
`after` JSONB COMMENT 'row data after the operation',
`source` JSONB COMMENT 'source metadata',
`ts_ms` bigint(20) NULL COMMENT 'the actual timestamp field'
) ENGINE=OLAP
DUPLICATE KEY(`ts`)
COMMENT 'OLAP'
PARTITION BY RANGE(`ts`)()
DISTRIBUTED BY HASH(ts) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "4",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 3",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "47",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.storage_medium" = "HDD",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
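Because the raw change records are kept as JSONB, downstream layers can pull individual fields out of `before`/`after` when they need them. A rough example, assuming a Doris version that ships the JSONB extraction functions (e.g. jsonb_extract_string) and assuming the source rows have an id column (the '$.id' path is illustrative):
-- Count today's changes per operation type and per business id
SELECT
    op,
    jsonb_extract_string(`after`, '$.id') AS business_id,
    count(*) AS change_cnt
FROM maxcomputer_demo_nihao
WHERE ts = curdate()
GROUP BY op, jsonb_extract_string(`after`, '$.id');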
Create the Doris Routine Load
CREATE ROUTINE LOAD demo.test1 ON maxcomputer_demo_nihao
COLUMNS(op, before, after, source,ts_ms, ts=from_unixtime(ts_ms/1000, '%Y-%m-%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.op\",\"$.before\",\"$.after\",\"$.source\",\"$.ts_ms\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "ip:ip:9092,ip:9092",
"kafka_topic" = "maxcomputer_demo_table7",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "xxx"
);
Related Routine Load operations
# Check the job status
HELP SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD FOR demo.test1;
# Pause the job
HELP PAUSE ROUTINE LOAD;
PAUSE ROUTINE LOAD FOR test1;
# Resume a paused job
HELP RESUME ROUTINE LOAD;
RESUME ROUTINE LOAD FOR test1;
# Stop the job
HELP STOP ROUTINE LOAD;
STOP ROUTINE LOAD FOR test1;
Consuming from the end of the topic after the job has been stopped
# After a failure, recreate the job with the statement below. The only difference is that "property.kafka_default_offsets" = "OFFSET_BEGINNING" has been removed, so by default consumption starts from the latest offset; the data lost in between can be imported back from Hive (one possible backfill sketch follows this statement).
CREATE ROUTINE LOAD demo.test1 ON maxcomputer_demo_nihao
COLUMNS(op, before, after, source,ts_ms, ts=from_unixtime(ts_ms/1000, '%Y-%m-%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.op\",\"$.before\",\"$.after\",\"$.source\",\"$.ts_ms\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "ip:9092,ip:9092,ip:9092",
"kafka_topic" = "maxcomputer_demo_table7",
"property.group.id" = "xxx"
);
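The original sharing only says that the lost window can be re-imported from Hive; the sketch below is one possible way to do it, not something from the original post. It assumes Doris 1.2+ with the multi-catalog feature and assumes the Flume backup lands in a Hive table with the same column layout; the catalog name, metastore address, Hive database/table and the time window are all placeholders.
-- Mount Hive as an external catalog (the metastore URI is a placeholder)
CREATE CATALOG hive_backup PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://metastore-host:9083"
);

-- Re-insert the lost window from the Hive backup into the Doris table.
-- Depending on how the backup stores before/after/source, an explicit CAST to JSONB may be needed.
INSERT INTO demo.maxcomputer_demo_nihao (op, `before`, `after`, `source`, ts_ms, ts)
SELECT op, `before`, `after`, `source`, ts_ms, from_unixtime(ts_ms / 1000, '%Y-%m-%d')
FROM hive_backup.ods_backup.maxcomputer_demo_nihao
WHERE ts_ms BETWEEN 1690000000000 AND 1690086400000;  -- example bounds for the lost window, in epoch milliseconds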
Other related operations
# If binlogs are retained for too short a time, a Flink job restarting from a savepoint may fail to find the binlog position it needs
SHOW VARIABLES LIKE 'expire_logs_days';
SHOW BINARY LOGS;
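If retention really is too short, one option is simply to extend it so the binlog window covers the longest downtime you expect before restoring from a savepoint (the 7-day value below is only an example; the variable name differs between MySQL versions):
-- MySQL 5.7: keep binlogs for 7 days
SET GLOBAL expire_logs_days = 7;
-- MySQL 8.0: the equivalent setting is expressed in seconds
SET GLOBAL binlog_expire_logs_seconds = 604800;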
# If flink-conf.yaml does not contain the setting below, Chinese characters written to Kafka end up garbled (even when the producer itself uses UTF-8)
env.java.opts: "-Dfile.encoding=UTF-8"