Doris实战篇-准实时数仓架构设计与实现

news2025/1/14 0:53:08

前言

这是一篇Doris数据仓库架构随笔,会应用到一些优秀的用户案例和自己的见解,欢迎大家留言评论。

前景回顾

下图是之前文章有提到过的数仓架构,这种架构的好处是实时强,数据产生以后,直接可以消费到,而且架构简单,直接用flinkcdc结合dinky或者streampark就能够零代码的把业务数据同步到doris进行数仓构建操作,缺点就是太吃资源(后续doris官方说能够整库同步flinkcdc消耗的资源问题可能要小很多)

资源消耗多的原因 

目前flinkcdc的方式如果用flinksql的方式那么一张表就要对应一个flink任务,消耗的资源比较多。 

架构演进 

参考架构

架构演进参考下面是某个公司的分享架构图(不记得是哪个公司的分享了)。

架构改进 

对于上面架构的启发,由于想到hudi尚不稳定,加上doris的社区比较活跃提供的技术支持比较好,所以得到下面的架构。

  •  为了解决掉flinksql每一张表都要一个任务的问题,对于这一块进行了代码的实现,由于flinkcdc采集的Debezium数据格式的数据,就对flinkcdc对应的序列化类进行重写,是的flinkcdc的数据能够获得数据的主键数据,从而实现到kafka的整库同步不乱序。
  • flume采集kafka的数据也是同样的道理,正则的形式动态的感知对应的主题动态的采集,作为doris数据丢失数据以后的一个备份作用。

Demo实现 

创建Doris的表

对应flinkcdc的数据格式(尽量保存最原始的数据)

CREATE TABLE `maxcomputer_demo_nihao` (
  `ts`  date COMMENT '数据操作时间',
  `op` varchar(5) NOT NULL COMMENT '操作类型',
  `before` JSONB COMMENT '数据操作之前的数据',
  `after` JSONB COMMENT '数据操作之后的数据',
  `source` JSONB COMMENT '元数据信息',
  `ts_ms` bigint(20) NULL COMMENT '真正的时间搓字段'
) ENGINE=OLAP
DUPLICATE KEY(`ts`)
COMMENT 'OLAP'
PARTITION BY RANGE(`ts`)()
DISTRIBUTED BY HASH(ts) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "4",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 3",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "47",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.storage_medium" = "HDD",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);

创建Doris的Routine Load 

CREATE ROUTINE LOAD demo.test1 ON maxcomputer_demo_nihao
COLUMNS(op, before, after, source,ts_ms, ts=from_unixtime(ts_ms/1000, '%Y-%m-%d'))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.op\",\"$.before\",\"$.after\",\"$.source\",\"$.ts_ms\"]",
    "strip_outer_array" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "ip:ip:9092,ip:9092",
    "kafka_topic" = "maxcomputer_demo_table7",
	"property.kafka_default_offsets" = "OFFSET_BEGINNING",
	"property.group.id" = "xxx"
);

相关Routine Load操作

#查看任务情况
HELP SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD FOR demo.test1;
#暂停任务
HELP PAUSE ROUTINE LOAD
PAUSE ROUTINE LOAD FOR test1;
#重启暂停任务
HELP RESUME ROUTINE LOAD;
RESUME ROUTINE LOAD FOR test1;
#停止任务
HELP STOP ROUTINE LOAD;
STOP ROUTINE LOAD FOR test1;

任务停止以后从消息末尾开始消费 

#失败重启以后用下面这个区别就是删除了"property.kafka_default_offsets" = "OFFSET_BEGINNING",那么默认就从最新的位置开始消费,丢失的数据可以重hive里面导入过来
CREATE ROUTINE LOAD demo.test1 ON maxcomputer_demo_nihao
COLUMNS(op, before, after, source,ts_ms, ts=from_unixtime(ts_ms/1000, '%Y-%m-%d'))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.op\",\"$.before\",\"$.after\",\"$.source\",\"$.ts_ms\"]",
    "strip_outer_array" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "ip:9092,ip:9092,ip:9092",
    "kafka_topic" = "maxcomputer_demo_table7",
	"property.group.id" = "xxx"
);

相关的其他操作

#如果binlog保存的时间过短,那么会导致如果flink任务在从savepoint重启的时候找不到对应的binlog
SHOW VARIABLES LIKE 'expire_logs_days';
SHOW BINARY LOGS;
#如果flink-conf.yaml没有配置这个,会导致写入kafka的中文乱码问题(尽管你的消息生产是UTF-8也会中文乱码)
env.java.opts: "-Dfile.encoding=UTF-8"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/724591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【网络安全】带你了解【黑客】

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员,2024届电子信息研究生 目录 引言 1. 定义 2. 分类 a. 白帽黑客(White Hat Hacker) b. 黑帽黑客(Black Hat Hacker) c. 灰帽黑客(Gray Hat Hacker…

【Docker】Docker中 AUFS、BTRFS、ZFS、存储池概念的详细讲解

前言 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。 📕作者简介:热…

企业如何搭建一个出色的帮助中心?做好这6点很重要!

伴随着互联网的不断普及和快速发展,我们的生活已经发生了翻天覆地的改变,企业的商业模式也不断地适应着时代。为了提供给用户更加便捷的服务,逐渐地有更多企业将业务转移到线上。比如,餐馆为了解决点餐难题,就会上线点…

LINUX IOWAIT 是怎么回事,和数据库性能有关吗? (翻译)

开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共…

Redis线程模式

目录 redis6.0之前:单Reactor单线程 问:为什么Redis要选择单线程??? redis6.0后引入多线程IO redis6.0之前:单Reactor单线程 大致工作流程: 首先,调用 epoll_create() 创建一个 …

剑指 Offer 17: 打印从1到最大的n位数

首先,List类型的要么存Integer,要么存int[],并且后面实现是用ArrayList! 这边的i和j应该从1开始,因为0不能做高位也不能做位数。 这里注意,1是List的实现是用ArrayList,2是从1开始(1…

C++的引用 拷贝赋值和引用赋值

💯 博客内容:C的引用 拷贝赋值和引用赋值 😀 作  者:陈大大陈 🚀 个人简介:一个正在努力学技术的准前端,专注基础和实战分享 ,欢迎私信! 💖 欢迎大家&#…

排名前十的开放式蓝牙耳机,哪个蓝牙耳机好一点?

当前耳机最大的受众群体就是年轻人,最怕自己吵到别人,而且更注重自己的舒适度,所以开放式耳机自从出现以来就深受年轻人喜爱,这种耳机用起来很方便,而且振动感不强,不用进入耳道,拥挤耳朵。开放…

C++图形开发(8):空格键控制小球起跳

文章目录 1.绘制小球2.按下空格让小球跳起来 我们今天来实现下按下空格键控制小球的起跳(还没阅读之前文章的同学可以先去阅读一下:C图形开发专栏) 1.绘制小球 首先,我们要绘制小球,这里就不细讲啦,之前的…

4.1 探索LyScript漏洞挖掘插件

在第一章中我们介绍了x64dbg这款强大的调试软件,通过该软件逆向工程师们可以手动完成对特定进程的漏洞挖掘及脱壳等操作,虽然x64dbg支持内置Script脚本执行模块,但脚本引擎通常来说是不够强大的,LyScript 插件的出现填补了这方面的…

2020~2030年 中国智能驾驶市场和技术趋势分析及主流芯片方案概述

分析及主流芯片方案概述智能驾驶已经成为中国用户买车时首选的配置之一,从L0到L2的整个产品布局,已经是车厂卖车时候的重点宣传点,智能驾驶不同于自动驾驶,无论是法律法规和产品定义都有着明确且清晰的定义。用户本身智能驾驶产品…

十个创业九个死,如何看待大学生创业?

虽然大家都在说“十个创业九个死”,尤其是前几年疫情的影响,很多创业和开店铺的都以失败告终,可是对于大学生来说,创业是很值得去做的一件事情。 熟悉我的朋友都知道,我是从事软件开发行业的,虽然前几年的…

实训笔记7.5

实训笔记7.5 7.5一、座右铭二、软件--软件工程2.1 需求分析2.2 系统设计2.2.1 概要设计2.2.2 数据库设计2.2.3 详细设计 2.3 编码开发/实现2.4 系统测试2.5 系统部署运行和维护 三、Java界面编程---仅作了解四、maven--自动化构建工具4.1 作用:4.2 maven如何帮助我们…

C#常见技能_结构

大家好,我是华山自控编程朱老师 前几天一个学员在学习C#与结构交互时,也不知道结构法可以用来做什么 。下面我们就详细讲讲C# 和结构交互的相关知识。 在C#编程中,结构是一种数据类型,它可以用于存储和组织相关数据,并提供对数据…

MySQL原理探索——24 MySQL是怎么保证主备一致的

在前面的文章中,我不止一次地和你提到了 binlog,大家知道 binlog 可以用来归档,也可以用来做主备同步,但它的内容是什么样的呢?为什么备库执行了 binlog 就可以跟主库保持一致了呢?今天我就正式地和你介绍一…

Selenium 中并行测试的重要性

目录 前言 并行测试 Selenium 中的并行测试 使用 TestNG 和 Selenium 进行并行测试 为什么我们需要在 Selenium 中进行并行测试? 更多测试范围 减少测试时间 成本效益 优化 CI/CD 流程 持续测试 实施并行测试 总结 前言 随着技术的进步,测试…

【JavaEE进阶】Spring核心与设计思想

1,Spring是什么? 我们通常所说的Spring指的是 Spring Framework(Spring 框架),它是一个轻量级的 Java 开源框架,有着活跃庞⼤的社区。Spring 是为了解决企业应用开发的复杂性而创建的,不仅⽀持…

Linux网络抓包工具tcpdump

tcpdump 指令可列出经过指定网络界面的数据包文件头,可以将网络中传送的数据包的 “头” 完全截获下来提供分析。它支持针对网络层、协议、主机、网络或端口的过滤,并提供 and、or、not 等逻辑语句来帮助你摘取有用信息。   由于它需要将网络接口设置为…

青岛大学_王卓老师【数据结构与算法】Week04_11_案例分析与实现1_学习笔记

本文是个人学习笔记,素材来自青岛大学王卓老师的教学视频。 一方面用于学习记录与分享,另一方面是想让更多的人看到这么好的《数据结构与算法》的学习视频。 如有侵权,请留言作删文处理。 课程视频链接: 数据结构与算法基础–…

STM32网络通信Web Server中SSI和CGI的应用

介绍 最近由于项目功能需要,开始研究STM32 WebServer通信以及SSI和CGI应用方法。项目结束后,主要总结浏览器与STM32之间进行通行,STM32作为服务器而浏览器做为客户端进行通行。 文件介绍 此部分的代码是根据ST官方的Web Server例程的基础上…