揭秘“湖仓一体”——Flink+Paimon+StarRocks,打造实时分析新纪元

news2024/12/25 21:26:06

1.湖仓一体

数据湖仓是 Flink 流批一体发挥重要作用的场景,使用 Flink + Paimon + starRocks 来构建湖仓一体数据分析.
Apache Paimon 是一个专为实时数据处理而设计的湖表格式,它最大的亮点是使用了 LSM Tree 技术。与 Hudi 相比,Paimon 在更新插入(Upsert)操作上速度快了4倍,查询扫描(Scan)速度也提高了10倍。这意味着它能提供更快的响应速度,同时降低数据入湖的成本,并且让开发者用起来更高效。Paimon 社区十分活跃,很多产品都在迅速与其兼容,这让它的生态系统发展得比其他湖库表格式更快、更全面。
StarRocks 是一款高性能分析型数据仓库,使用向量化、MPP 架构、CBO、智能物化视图、可实时更新的列式存储引擎等技术实现多维、实时、高并发的数据分析。StarRocks 既支持从各类实时和离线的数据源高效导入数据,也支持直接分析数据湖上各种格式的数据。StarRocks 兼容 MySQL 协议,可使用 MySQL 客户端和常用 BI 工具对接。同时 StarRocks 具备水平扩展,高可用、高可靠、易运维等特性。广泛应用于实时数仓、OLAP 报表、数据湖分析等场景。
湖仓一体化.jpg

Flink + Paimon + StarRocks 流式湖仓方案将 3 个产品做了非常紧密的结合,首先使用 Flink 流批一体计算引擎将数仓以 Paimon 格式在湖上构建,使用 Flink 完成数仓 ODS 到 DWD 层,DWS 和 ADS 的计算。通过使用 StarRocks 对各层数仓做统一的 OLAP 查询和 ADS 层在线分析。基于 Paimon 可以实现高吞吐入湖;基于 Flink 可以实现全链路的流批一体计算,基于 StarRocks 可以实现高性能的 OLAP 查询,所以整个链路从实时性、时效性、成本几个方面都可以取得比较好的平衡。
使用 StarRocks 统一管理数据湖和数据仓库,将高并发和实时性要求很高的业务放在 StarRocks 中分析,也可以使用 External Catalog 和外部表进行数据湖上的分析。StarRocks 从 3.1 版本开始支持 Paimon Catalog。
Paimon Catalog 是一种 External Catalog。通过 Paimon Catalog,您不需要执行数据导入就可以直接查询 Apache Paimon 里的数据。
在数据湖仓场景下,使用 Flink 可以完成复杂的数据拼接以及聚合计算,并且达到很高的实时性的要求。另外,实时链路在使用的过程中不可避免的会因为一些数据延迟等问题导致会有数据修正和数据回溯的需求。Flink 流批一体的特性能够让用户方便的使用与实时链路一样的作业代码,高效地完成数据修正和数据回溯的需求。

2.演示架构

通过flink-cdc 监听MySQL Binlog数据同步到Paimon ODS层,然后进行DWD数据清洗宽表打通,再到DWS层进行多维度汇总聚合,最后同ADS层进行数据呈现.其中用到streamPark进行作业编排.
2.1 组件使用版本

  • flink1.18.1
  • paimon0.8
  • fink-cdc3.1
  • streamPark2.1.4
  • starRocks3.1
    安装方式请自行安装

2.2 场景说明
在mysql创建3张表:用户表users,订单表orders,商品表products,订单详情表order_details
分析3个指标: 用户的总购买金额 产品的销售数量 用户的平均订单金额
在执行下面操作前,确保mysql,flink,streamPark,starRocks已经启动.

步骤一:准备演示数据

  1. 在MySQL中执行以下命令,创建数据表。
use emp;

-- 用户表
CREATE TABLE users (
    user_id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '用户ID,主键',
    user_name VARCHAR(50) NOT NULL COMMENT '用户名',
    email VARCHAR(100) NOT NULL COMMENT '邮箱',
    registration_date DATE NOT NULL COMMENT '注册日期',
    PRIMARY KEY (`user_id`) USING BTREE
) COMMENT '用户表';

INSERT INTO users (user_id, user_name, email, registration_date) VALUES (1, '张山', 'alice@example.com', '2023-01-15');
INSERT INTO users (user_id, user_name, email, registration_date) VALUES (2, '李四', 'bob@example.com', '2023-02-20');
INSERT INTO users (user_id, user_name, email, registration_date) VALUES (3, '刘博', 'charlie@example.com', '2023-03-10');

-- 订单表
CREATE TABLE orders (
    order_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '订单ID,主键',
    user_id INT NOT NULL COMMENT '用户ID,外键,关联到users表',
    order_date DATE NOT NULL COMMENT '订单日期',
    total_amount DECIMAL(10, 2) NOT NULL COMMENT '订单总金额',
	PRIMARY KEY (`order_id`) USING BTREE
) COMMENT '订单表';

-- 演示数据
INSERT INTO orders (user_id, order_date, total_amount) VALUES
(1, '2023-04-01', 150.00),
(2, '2023-04-05', 200.00),
(3, '2023-04-10', 250.00),
(1, '2023-04-15', 300.00);

-- 商品表
CREATE TABLE products (
    product_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '产品ID,主键',
    product_name VARCHAR(100) NOT NULL COMMENT '产品名',
    price DECIMAL(10, 2) NOT NULL COMMENT '产品价格',
    PRIMARY KEY (`product_id`) USING BTREE
) COMMENT '产品表';

-- 演示数据
INSERT INTO products (product_name, price) VALUES
('笔记本', 50.00),
('手表', 75.00),
('耳机', 100.00);

-- 订单详情表
CREATE TABLE order_details (
    order_detail_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '订单详情ID,主键',
    order_id INT NOT NULL COMMENT '订单ID,外键,关联到orders表',
    product_id INT NOT NULL COMMENT '产品ID,外键,关联到products表',
    quantity INT NOT NULL COMMENT '购买数量',
    subtotal DECIMAL(10, 2) NOT NULL COMMENT '小计金额(quantity * price)',
     PRIMARY KEY (`order_detail_id`) USING BTREE
) COMMENT '订单详情表';

-- 演示数据
INSERT INTO order_details (order_id, product_id, quantity, subtotal) VALUES
(1, 1, 2, 100.00),
(1, 2, 1, 50.00),
(2, 1, 1, 50.00),
(2, 3, 2, 150.00),
(3, 2, 2, 150.00),
(3, 3, 1, 100.00),
(4, 3, 3, 300.00);

步骤二:mysql数据同步paimon

确保mysql已经开启binlog

1.编写flink-cdc同步任务,在flink-cdc的创建job文件夹,然后在里面创建mysql-to-paimon.yml

source:
type: mysql
name: MySQL Source
hostname: 192.168.1.72
port: 3306
username: root
password: dory@2022
tables: emp.users,emp.products,emp.order_details,emp.orders
server-id: 5401-5404

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /opt/software/paimon

pipeline:
name: MySQL to Paimon Pipeline
parallelism: 1

  1. 在 flink-cdc的lib文件夹下添加:
    flink-cdc-pipeline-connector-mysql-3.1.0.jar
    flink-cdc-pipeline-connector-paimon-3.1.0.jar
    mysql-connector-java-8.0.27.jar
  2. 在保证flink集群启动的情况下,进行启动flink-cdc
 ./bin/flink-cdc.sh job/mysql-to-paimom.yaml --jar lib/mysql-connector-java-8.0.27.jar
# 执行成功会出现jobid
Pipeline has been submitted to cluster.
Job ID: b68bfad5753ae600eeb1efed17d957ff
Job Description: MySQL to Paimon Pipeline

4.来到flink工作台进行查询任务
image.png

5.在服务器上查看同步文件信息

 cd /opt/software/paimon/
 cd emp.db/
 ls
 #显示已经同步过来
order_details  orders  products  users

已经完成ODS层数据同步.

步骤三: DWD数据清洗宽表打通

1.打开streamPark,进行开始编写flink sql
image.png

SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 创建CATALOG
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'file:/opt/software/paimon'
);

-- 切换CATALOG
USE CATALOG paimon_catalog;
create DATABASE IF NOT EXISTS emp;
-- 切换database
use emp;

-- 创建dwd_user_orders表
CREATE TABLE IF NOT EXISTS dwd_user_orders (
order_id bigint,
user_id bigint,
user_name STRING,
order_date date,
total_amount decimal,
PRIMARY KEY (order_id) NOT ENFORCED
);

-- 创建dwd_orders_products_details表
CREATE TABLE IF NOT EXISTS dwd_orders_products_details (
order_detail_id bigint,
order_id bigint,
product_id bigint,
product_name STRING,
price decimal,
quantity bigint,
subtotal decimal,
PRIMARY KEY (order_detail_id) NOT ENFORCED
);

  

INSERT INTO
dwd_user_orders
SELECT
o.order_id,o.user_id,u.user_name,o.order_date,o.total_amount
FROM orders o join users u ON o.user_id=u.user_id;
INSERT INTO
dwd_orders_products_details
SELECT
d.order_detail_id,d.order_id,d.product_id,p.product_name,p.price,d.quantity,d.subtotal
FROM order_details d join products p ON p.product_id=d.product_id;

发布启动任务
image.png

flink-web-ui查看任务
image.png

步骤三:进行维度分析

创建DWS层进行多维度汇总聚合,还是在streamPark编写DWS层任务

统计维度指标:

  • 用户的总购买金额
  • 产品的销售数量
  • 订单的平均金额
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 创建CATALOG
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'file:/opt/software/paimon'
);

  

-- 切换CATALOG
USE CATALOG paimon_catalog;
create DATABASE IF NOT EXISTS emp;
-- 切换database
use emp;
-- 创建用户的总购买金额表
CREATE TABLE IF NOT EXISTS dws_user_total_amount (
user_id bigint,
user_name STRING,
total_spent decimal,
PRIMARY KEY (user_id) NOT ENFORCED
);

-- 创建产品的销售数量
CREATE TABLE IF NOT EXISTS dws_product_sales_quantity (
product_id bigint,
product_name STRING,
total_quantity BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
);

-- 创建订单的平均金额
CREATE TABLE IF NOT EXISTS dws_order_average_amount (
order_id bigint,
average_order_amount decimal,
PRIMARY KEY (order_id) NOT ENFORCED
);

-- 用户的总购买金额
INSERT INTO
dws_user_total_amount
SELECT user_id,user_name, sum(total_amount) AS total_spent
FROM dwd_user_orders
group by user_id,user_name;
-- 产品的销售数量
INSERT INTO
dws_product_sales_quantity
SELECT product_id,product_name,SUM(quantity) AS total_quantity
FROM dwd_orders_products_details
group by product_id,product_name;
-- 订单的平均金额
INSERT INTO
dws_order_average_amount
SELECT order_id,AVG(total_amount) AS average_order_amount
FROM dwd_user_orders
group by order_id;

发布启动任务
image.png

flink-web-ui查看任务
image.png

步骤四:ADS查看维度结果数据

这里要使用starRocks进行查询paimon catalog数据表.在starRock 中paimon catalog是一种外部catalog.可以直接进行查询数据.
保证starRock正常启动.安装方式参考:https://www.cnblogs.com/freeweb/p/18137023
DBeaver连接上starRocks
image.png

查询对应维度数据
image.png

-- 查询用户的总购买金额
SELECT * FROM paimon_catalog.emp.dws_user_total_amount;
-- 结果
user_id|user_name|total_spent|
-------+---------+-----------+
      1|张山       |        450|
      2|李四       |        200|
      3|刘博       |        250|
  
-- 查询产品的销售数量
SELECT * FROM paimon_catalog.emp.dws_product_sales_quantity;
-- 结果
product_id|product_name|total_quantity|
----------+------------+--------------+
         1|笔记本         |             3|
         2|手表          |             3|
         3|耳机          |             6|
-- 查询订单的平均金额
SELECT * FROM paimon_catalog.emp.dws_order_average_amount;
-- 结果
order_id|average_order_amount|
--------+--------------------+
       1|                 150|
       2|                 200|
       3|                 250|
       4|                 300|

步骤五: 演示数据实时更新

在mysql表进行修改数据查询维度表数据是否发生计算结果变更
添加一条人员信息,产品信息,订单信息,订单详情信息,看维度表数据是否发生变化

  1. 在mysql中添加下面数据
INSERT INTO users (user_id, user_name, email, registration_date) 
VALUES (4, '刘晓天', 'charlie@example.com', '2024-06-17');

INSERT INTO orders (order_id,user_id, order_date, total_amount) VALUES
(5,4, '2024-06-17', 1800000.00);

INSERT INTO products (product_id,product_name, price) VALUES
(4,'天启坦克', 1800000.00);

INSERT INTO order_details (order_id, product_id, quantity, subtotal) VALUES
(5, 4, 1, 1800000.00);

sleep 5s

  1. 查看维度分析结果,已经发生结果变化.

删除,修改mysql表同样会触发维度结果变化
image.png

image.png
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1831568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

日本新入管法通过:2027年起实施[育成就劳]制度,新制度更适合外国劳工在日本工作和生活!

最近,日本新入管法:新的育成就业制度预计将在2027年开始实施,而1993年开始的旧的技能实习制度将被废除。 新制度的主要内容 新制度的目的是解决日本国内的劳动力不足问题,确保有足够的劳动者。表示:“为了让日本成为…

jrt从量变到质变

又是一个加班的周末,上周把台式机代码和数据库环境弄好了,这周进行大数据测试,直接把标本、标本医嘱、报告、报告结果、药敏结果等数据插入到1亿的规模,跑了一天一夜插入了5000多万个标本,后面接着补剩下的到一亿。 演…

使用 Cheerio 和 Node.js 进行网络搜刮 2024

Web scraping 是一种强大的技术,用于从网站提取数据,广泛应用于数据分析、市场研究和内容聚合。截至2024年,利用 Cheerio 和 Node.js 进行 web scraping 仍然是一种流行且高效的方法。本文将深入探讨使用 Cheerio 和 Node.js 进行 web scrapi…

交易方法论

如何复盘,复盘哪些内容: 1复盘指数 2复盘板块 3复盘个股 4复盘涨停板 5跌停板 6自选股 1复盘新闻 2国家大势 3行业大势 4公司大事 5资金流向 6龙虎榜 板块强度标准 板块内至少有5只涨停板 板块连续资金流入超过3天 板块有5只以上走漂亮上升趋势 一次性关注方向不…

LangChain-ChatGLM本地搭建|报错合集(win10)

安装过程 1. 创建虚拟环境 conda create -n langchain-chatglm python3.10 conda activate langchain-chatglm2. 部署 langchain-ChatGLM git clone https://github.com/imClumsyPanda/langchain-ChatGLMpip3 install -r requirements.txt pip3 install -U gradio pip3 inst…

太速科技-FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡

FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡 一、板卡概述 该板卡为了考虑兼容1.8V电平IO,适配Virtex7,Kintex Ultrascale,Virtex ultrasacle FPGA而特制,如果要兼容原来的3.3V 也可以修改硬件参数。板卡支持1路…

【自动驾驶】ROS小车系统介绍

文章目录 小车组成轮式运动底盘的组成轮式运动底盘的分类轮式机器人的控制方式感知传感器ROS决策主控ROS介绍ROS的坐标系ROS的单位机器人电气连接变压模块运动底盘的电气连接ROS主控与传感器的电气连接运动底盘基本组成电池电机控制器与驱动器控制器与运动底盘状态数据&#xf…

记录第一次突发情况

项目场景: 这台云服务器主要是我学习在用,也不是很大,2核2g3M40G硬盘。 在这台服务器上,我主要使用了docker并且把所有的东西,都通过docker安装,比如MySQL,redis, elasticsearch。 …

视频合成渲染服务解决方案,数字人+PPT+视频云剪辑

在金融理财领域,一个生动、直观、专业的视频,往往能够在海量信息中脱颖而出,帮助客户更好地理解产品、把握市场动态。然而,传统的视频制作方式往往周期长、成本高、难以适应快速变化的市场需求。 美摄科技,作为行业领…

CANape使用问题记录

CANape使用问题记录 1、添加变量后无法开启测量 1、添加变量后无法开启测量 点击开启测量后,出现以下对话框: 解决方法: 添加新变量后,修改变量测量配置; 改为polling, 1000,即采用轮训的方法…

收入增长,再进一步丨用友BIP收入云大消费品行业收入管理联合解决方案正式发布

随着数智化时代的来临,消费品行业对于收款到收入侧的管理需求日益增强,对管理的精细度和时效性要求也越来越高。传统的收入管理模式已难以满足企业快速变化的市场需求。如何精准地预测收入、优化收入结构、提高收入管理质量,以及实现收入管理…

雪花算法和UUID

目录 雪花算法概念优点和不足优点:缺点:解决方案代码示例 UUID优点与不足优点不足 两种算法的比较应用场景区别 雪花算法 概念 雪花算法是一个分布式id生成算法,它生成的id一般情况下具有唯一性。由64位01数字组成,第一位是符号位,始终为0。…

Kubernetes集群中如何利用北极星因果指标设置正确的POD规格——CPU篇

在 Kubernetes 容量规划中,追求的是集群的稳定性和资源使用效率之间的平衡: 资源分配过多会造成浪费。 资源分配过少则会导致用户请求时延上升,影响集群的稳定性。 背景 公众号之前翻译了一篇 Sysdig 的文章,Kubernetes 容量规…

玩转nRF52840-DK开发套件(2)

介绍如何在Windows操作系统上使用Arm Keil MDK。Arm Keil MDK附带Arm C/C编译器和Vision集成开发环境(IDE),以及所有nRF5SDK的版本提供了现成的Keil项目。 1. 安装最新的 nRF5 SDK. 链接:nRF5 SDK - nordicsemi.com 点击Download&…

泰迪智能科技董事长张良均荣获“2024年广东软件风云榜新锐企业家”

6月13日,在广州举办2024年粤港澳软件产业高质量发展大会、第十二届粤港云计算大会暨第七届粤港澳ICT大会。大会以“培育信息技术新质生产力,打造粤港澳发展创新引擎”为主题,研讨基础软件、云计算、人工智能等新一代技术的新态势、新应用&…

14.编写自动化测试(上)

标题 一、如何编写测试1.1 一些概念1.2 测试函数剖析1.3 使用assert!宏检查结果1.4 使用assert_eq!和assert_ne!宏来测试相等1&#xff09; assert_eq!2&#xff09; assert_ne! 1.5 使用 should_panic 检查 panic 二、将 Result<T, E> 用于测试 一、如何编写测试 1.1 一…

解决外网404:清除DNS缓存并配置host主机使用知名公共DNS服务

在 Windows 上清除/刷新 DNS 缓存 对于所有Windows版本&#xff0c;清除DNS缓存的过程都是相同的。你需要使用管理员权限打开命令提示符并运行ipconfig /flushdns。 浏览器清除DNS缓存 大多数现代的Web浏览器都有一个内置的DNS客户端&#xff0c;以防止每次访问该网站时…

vscode字符多行自动增长插件。

多行字符自动增长插件CharAutoIncre 当你使用shiftalt选中了多行,并输入了’1’,这时这几行都变成了’1’. 这时你可以选中&#xff08;shift左键&#xff09;为’1’的这几行, 接下来按下shiftaltq此时’1’变为了’12345’自增长的样式。 同时本插件支持字符’a-z,A-Z’。 目…

高考没考好焦虑怎么选计算机专业!一篇告诉你,推荐三个风口专业!想学计算机怎么选大学专业

高考成绩揭晓&#xff0c;几家欢喜几家愁。对于那些未能如愿考取理想分数的同学来说&#xff0c;未来似乎蒙上了一层阴影。尤其是在计算机专业如此热门的今天&#xff0c;低分考生是否还有机会在这个领域找到一席之地&#xff1f;本文将为你揭秘&#xff0c;即使高考成绩不理想…

Nature Microbiology丨VITA单细菌转录组测序技术助力深入解析奶牛瘤胃微生物组功能异质性

瘤胃微生物组一直以来都是研究相对不足但又极其复杂的微生物生态系统之一。瘤胃微生物能够有效降解植物纤维&#xff0c;将其转化为高质量的蛋白质产品&#xff0c;在这一过程中&#xff0c;由于微生物强烈的发酵&#xff0c;还会产生大量气体&#xff0c;其成分主要包括二氧化…