KDP数据分析实战:从0到1完成数据实时采集处理到可视化

news2025/1/18 8:14:45

智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset 等开源组件快速搭建实时、半实时或批量采集、处理、分析的数据流水线以及可视化报表展示,可视化展示效果如下:

247984561aa6a7370c0c0741a330aded.png

以下我们将介绍一个实时订单数据流水线从数据采集到数据处理,最后到可视化展示的详细建设流程。

 1.流水线设计

借助 KDP 平台的开源组件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成数据实时采集处理及可视化分析,架构如下: 

8ea91c86309be540823ed486fe2b0dce.jpeg

1.1 数据流

  • 直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中;

  • ClickHouse从Kafka分别订阅各层数据,将各层数据持久化到ClickHouse中,用于之后的查询分析。

1.2 数据表

本次分析数据基于mock数据,包含数据实时采集处理及可视化分析:

  • 消费者表:customers

字段

字段说明

id

用户ID

name

姓名

age

年龄

gender

性别

  • 订单表:orders

字段

字段说明

order_id

订单ID

order_revenue

订单金额

order_region

下单地区

customer_id

用户ID

create_time

下单时间

1.3 环境说明

在 KDP 页面安装如下组件并完成组件的 QuickStart:

  • MySQL: 实时数据数据源及 Superset/Airflow 元数据库,安装时需要开启binlog

  • Kafka: 数据采集sink

  • Flink: 数据采集及数据处理

  • ClickHouse: 数据存储

  • Superset: 数据可视化

  • Airflow: 作业调度

2. 数据集成与处理

文中使用的账号密码信息请根据实际集群配置进行修改。

2.1 创建MySQL表

2.2 创建 Kafka Topic

进入Kafka broker pod,执行命令创建 Topic,也可以通过Kafka manager 页面创建,以下为进入pod并通过命令行创建的示例:

export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" 


bin/kafka-topics.sh --create \
  --topic ods-order \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP 


bin/kafka-topics.sh --create \
  --topic ods-customers \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP


bin/kafka-topics.sh --create \
  --topic dwd-order-customer-valid \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP


bin/kafka-topics.sh --create \
  --topic dws-agg-by-region \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP

2.3 创建 ClickHouse 表

进入clickhouse pod,使用`clickhouse-client`执行命令创建表,以下为建表语句:

CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;


-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (
  order_id Int32,
  order_revenue Float32,
  order_region String,
  create_time DateTime,
  customer_id Int32,
  customer_age Float32,
  customer_name String,
  customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;


CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (
  order_id Int32,
  order_revenue Float32,
  order_region String,
  create_time DateTime,
  customer_id Int32,
  customer_age Float32,
  customer_name String,
  customer_gender String
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
  kafka_topic_list = 'dwd-order-customer-valid',
  kafka_group_name = 'clickhouse_group',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n';


CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECT
  order_id,
  order_revenue,
  order_region,
  create_time,
  customer_id,
  customer_age,
  customer_name,
  customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;


-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (
  order_region String,
  order_cnt Int64,
  order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;


CREATE TABLE kdp_demo.kafka_dws_agg_by_region (
  order_region String,
  order_cnt Int64,
  order_total_revenue Float32
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
  kafka_topic_list = 'dws-agg-by-region',
  kafka_group_name = 'clickhouse_group',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n';


CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECT
  order_region,
  order_cnt,
  order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region;

2.4 创建 Flink SQL 作业

2.4.1 SQL部分

CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;


-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    `create_time` TIMESTAMP,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'orders'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (
    `id` INT NOT NULL,
    `age` FLOAT NOT NULL,
    `name` STRING NOT NULL,
    `gender` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'customers'
);


-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` VARCHAR(40),
    `customer_id` INT,
    `create_time` TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-order',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `gender` STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-customers',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` STRING,
    `create_time` TIMESTAMP,
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `customer_gender` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd-order-customer-valid',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (
    `order_region` VARCHAR(40),
    `order_cnt` BIGINT,
    `order_total_revenue` FLOAT,
    PRIMARY KEY (order_region) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dws-agg-by-region',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTO
    dwd_order_customer_valid
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    o.create_time,
    c.id as customer_id,
    c.age as customer_age,
    c.name as customer_name,
    c.gender as customer_gender
FROM
    customers_src c
        JOIN orders_src o ON c.id = o.customer_id
WHERE
    c.id <> -1;
INSERT INTO
    dws_agg_by_region
SELECT
    order_region,
    count(*) as order_cnt,
    sum(order_revenue) as order_total_revenue
FROM
    dwd_order_customer_valid
GROUP BY
    order_region;
-- END;

2.4.2 使用 StreamPark 创建 Flink SQL 作业

具体使用参考 StreamPark 文档。

maven 依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>3.0.1</version>
</dependency>

2.5 创建 Airflow DAG

2.5.1 DAG 文件部分

import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago




default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}


dag = DAG(
    'kdp_demo_order_data_insert',
    description='Insert into orders by using random data',
    schedule_interval=timedelta(minutes=1),
    start_date=days_ago(1),
    catchup=False,
    tags=['kdp-example'],
)


# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "广州", "深圳", "成都", "杭州", "重庆", "武汉", "西安", "苏州", "天津", "南京", "郑州",
          "长沙", "东莞", "青岛", "宁波", "沈阳", "昆明", "合肥", "大连", "厦门", "哈尔滨", "福州", "济南", "温州",
          "佛山", "南昌", "长春", "贵阳", "南宁", "金华", "石家庄", "常州", "泉州", "南通", "太原", "徐州", "嘉兴",
          "乌鲁木齐", "惠州", "珠海", "扬州", "兰州", "烟台", "汕头", "潍坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入数据的 BashOperator
insert_data_orders = BashOperator(
    task_id='insert_data_orders',
    bash_command=f'''
    mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "
    INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"
    ''',
    dag=dag,
)
insert_data_orders

2.5.2 DAG 说明及执行

当前Airflow安装时,需要指定可访问的git 仓库地址,因此需要将 Airflow DAG 提交到 Git 仓库中。每分钟向orders表插入一条数据。

2.6 数据验证

使用ClickHouse验证数据:

(1)进入ClickHouse客户端

clickhouse-client 
# default pass: ckdba.123

(2)执行查询

SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid;

(3)对比验证MySQL中数据是否一致

select count(*) from kdp_demo.orders;

3. 数据可视化

在2.6中数据验证通过后,可以通过Superset进行数据可视化展示。使用账号`admin/admin`登录Superset页面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io

3.1 创建图表

导入我们制作好的图表:

  1. 下载面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip

  2. 导入面板

(1)选择下载的文件导入

eed49ffb69952693ad5a46da6be81f08.png

(2)输入 ClickHouse 的用户`default`的默认密码`ckdba.123`:

4c07d54c7ad0f2f66085481d5bfe77ab.png

3.2 效果展示

最终的实时订单数据图表展示如下,随着订单数据的更新,图表中的数据也会实时更新:

57bd6fa097e66e1da7dc8aa103a599b4.png

快速体验

🚀GitHub项目:

https://github.com/linktimecloud/kubernetes-data-platform

欢迎您参与开源社区的建设🤝

 - FIN -       

1ad0c68fe5ea3d59a392d306012eef0b.png

更多精彩推

  • 我们开源啦!一键部署免费使用!Kubernetes上直接运行大数据平台!

  • 开源 KDP  v1.1.0 版本正式发布,新增数据集成开发应用场景

  • 在 KubeSphere 上快速安装和使用 KDP 云原生数据平台

  • 在 Rancher 上快速安装和使用 KDP 云原生数据平台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1903770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JCR一区 | Matlab实现GAF-PCNN-MATT、GASF-CNN、GADF-CNN的多特征输入数据分类预测/故障诊断

JJCR一区 | Matlab实现GAF-PCNN-MATT、GASF-CNN、GADF-CNN的多特征输入数据分类预测/故障诊断 目录 JJCR一区 | Matlab实现GAF-PCNN-MATT、GASF-CNN、GADF-CNN的多特征输入数据分类预测/故障诊断分类效果格拉姆矩阵图GAF-PCNN-MATTGASF-CNNGADF-CNN 基本介绍程序设计参考资料 分…

make工具

1、什么是make&#xff1f; make是个命令&#xff0c;是个可执行程序&#xff0c;是个工具&#xff0c;用来解析Makefile文件的命令&#xff0c;这个命令存放在/usr/bin/目录下 -rwxr-xr-x 1 root root 250K 2月 15 2022 make -rwxr-xr-x 1 root root 4.8K 2月 15 2022 ma…

图片高效管理神器,随机高度切割,一键生成灰色图片,个性化处理随心所欲

在数字化时代&#xff0c;图片已成为我们生活和工作中不可或缺的一部分。然而&#xff0c;面对海量的图片资源&#xff0c;如何高效管理、快速处理&#xff0c;成为了许多人头疼的问题。今天&#xff0c;我们为您带来了一款全新的图片高效管理神器_——首助编辑高手&#xff0c…

【全面介绍下如何使用Zoom视频会议软件!】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

Halcon 背景网格产品刮伤缺陷检测

* 关闭窗口 dev_close_window ()*关闭程序计数器,图形变量更新,窗口图形更新 dev_update_off ()*设置图像路径 Path : lcd/mura_defects_blur_*读取一张图像 read_image (Image, Path 01)*获取图像大小 get_image_size (Image, Width, Height)*创建一个新窗体 dev_open_window…

昇思25天学习打卡营第13天 | LLM原理和实践:文本解码原理--以MindNLP为例

1. 文本解码原理--以MindNLP为例 1.1 自回归语言模型 根据前文预测下一个单词 一个文本序列的概率分布可以分解为每个词基于其上文的条件概率的乘积 W 0 W_0 W0​:初始上下文单词序列 t t t: 时间步 当生成EOS标签时&#xff0c;停止生成。 MindNLP/huggingface Transfor…

NewStarCTF2023-Misc

目录 week1 CyberChefs Secret 机密图片 流量&#xff01;鲨鱼&#xff01; 压缩包们 空白格 隐秘的眼睛 week2 新建Word文档 永不消逝的电波 1-序章 base! WebShell的利用 Jvav week3 阳光开朗大男孩 大怨种 2-分析 键盘侠 滴滴滴 week4 通大残 Nmap 依…

Unity AssetsBundle 详解

文章目录 1.AssetBundle 概念2.AssetBundle 优势3.AssetBundle 特性4.AssetBundle 使用流程4.1 分组4.2 打包4.3 加载包4.4 加载资源4.5 卸载资源 5.AssetBundleManifest6.AssetBundle的内存占用7.AB包资源加密 1.AssetBundle 概念 AssetBundle又称AB包&#xff0c;是Unity提供…

Python视觉轨迹几何惯性单元超维计算结构算法

&#x1f3af;要点 &#x1f3af;视觉轨迹几何惯性单元超维计算结构算法 | &#x1f3af;超维计算结构视觉场景理解 | &#x1f3af;超维计算结构算法解瑞文矩阵 | &#x1f3af;超维矢量计算递归神经算法 &#x1f36a;语言内容分比 &#x1f347;Python蒙特卡罗惯性导航 蒙…

【漏洞复现】宏景eHR LoadOtherTreeServlet SQL注入漏洞

0x01 产品简介 宏景eHR人力资源管理软件是一款人力资源管理与数字化应用相融合&#xff0c;满足动态化、协同化、流程化、战略化需求的软件。 0x02 漏洞概述 宏景eHR LoadOtherTreeServlet接口处存在SQL注入漏洞&#xff0c;未经身份验证的远程攻击者除了可以利用 SQL 注入漏…

[Multi-Modal] MDETR 论文及代码学习笔记

代码地址&#xff1a;https://github.com/ashkamath/mdetr 论文地址&#xff1a;https://arxiv.org/abs/2104.12763 多模态推理系统依靠预先训练的目标检测器从图像中提取感兴趣区域&#xff08;边界框包围区域&#xff09;。然而&#xff0c;这个关键模块通常被用作黑匣子&…

MySQL高级----详细介绍MySQL中的锁

概述 锁是计算机协调多个进程或线程并发访问某一资源的机制&#xff0c;为了解决数据访问的一致性和有效性问题。在数据库中&#xff0c;除传统的计算资源(CPU、RAN、I/O&#xff09;的争用以外&#xff0c;数据也是一种供许多用户共享的资源。如何保证数据并发访问的一致性、…

windows无法访问github

##一、如果发现windows无法访问github时 一般就是我们的dns出现了问题&#xff0c;此时我们需要更换一个dns访问 ##二、解决方法 首先我们访问ip查询地址&#xff0c; https://ipchaxun.com/github.com/ 可更换下面历史ip进行测试&#xff0c;在windows的cmd里面输入ping git…

【C++深度探索】:继承(定义赋值兼容转换作用域派生类的默认成员函数)

✨ 愿随夫子天坛上&#xff0c;闲与仙人扫落花 &#x1f30f; &#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;C学习 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞…

pin是什么?管脚

1.平面分割 1)启动Allegro PCB design &#xff0c;打开.brd。深色部分属于一个net&#xff0c;要做一下修改&#xff0c;将上面的pin包含进shape中&#xff0c;i进行a&#xff0c;b两步操作&#xff0c;删除以前存在的Anti Etch下的line&#xff0c;再将其进行补齐 使它保住上…

MSPM0G3507——OPENMV给M0传数据(用数据包)互相通信(以循迹为例)

OPENMV端代码 # main.py -- put your code here! import pyb, sensor, image, math, time from pyb import UART import ustruct from image import SEARCH_DS, SEARCH_EX import time import sensor, displayuart UART(3, 115200, bits8, parityNone, stop1, timeout_char10…

Pogo-DroneCANPWM模块:可实现DroneCAN转PWM,DroneCAN转dshot,DroneCAN转bdshot

关键词&#xff1a;Ardupilot&#xff0c;Pixhawk&#xff0c;PWM&#xff0c;dshot&#xff0c;bdshot&#xff0c;DroneCANPWM&#xff0c;电调ESC&#xff0c;DroneCAN&#xff0c;UAVCAN&#xff0c;飞控&#xff0c;无人机&#xff0c;UAV Keywords&#xff1a;Ardupilot…

Xilinx FPGA:vivado串口输入输出控制fifo中的数据

一、实验要求 实现同步FIFO回环测试&#xff0c;通过串口产生数据&#xff0c;写入到FIFO内部&#xff0c;当检测到按键信号到来&#xff0c;将FIFO里面的数据依次读出。 二、信号流向图 三、状态转换图 四、程序设计 &#xff08;1&#xff09;按键消抖模块 timescale 1ns…

Python编程学习笔记(1)--- 变量和简单数据类型

1、变量 在学习编程语言之前&#xff0c;所接触的第一个程序&#xff0c;绝大多数都是&#xff1a; print("Hello world!") 接下来尝试使用一个变量。在代码中的开头添加一行代码&#xff0c;并对第二行代码进行修改&#xff0c;如下&#xff1a; message "…

Github 2024-07-07php开源项目日报 Top9

根据Github Trendings的统计,今日(2024-07-07统计)共有9个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目9Blade项目2JavaScript项目1Laravel:表达力和优雅的 Web 应用程序框架 创建周期:4631 天开发语言:PHP, BladeStar数量:75969 个Fork数…