StarRocks Lakehouse 快速入门——Apache Paimon

news2024/9/20 16:47:59

StarRocks Lakehouse 快速入门指南为您提供了湖仓技术概览,旨在帮助您迅速掌握其核心特性、独特优势和应用场景。本指南将指导您如何高效地利用 StarRocks 构建解决方案。文章末尾,我们集合了来自阿里云、饿了么、喜马拉雅和同程旅行等行业领导者在 StarRocks x Paimon Streaming Lakehouse 活动中的实战经验分享。通过这些真实案例,您可以更直观地了解如何在实际应用中发挥 StarRocks Lakehouse 的最佳效用。希望这些实践能启发您在自己的项目中应用 StarRocks Lakehouse,发掘其潜力,实现数据价值最大化。

Apache Paimon 介绍

Apache Paimon (后简称 Paimon)起源于 Apache Flink (后简称 Flink)的一个子项目,起初它只是 Flink 内置的 Table Store 的一个格式,经过了几年的发展后,在 2024 年成功从 Apache 软件基金会(ASF)孵化器毕业,成为正式的顶级项目。Paimon 围绕具有 ACID 特性的数据湖存储构建,支持 DML 操作, 可以完整地支持批处理和流处理。它创新性地将 LSM Tree 与湖格式相结合,具有高效的实时更新能力与 compaction 效率。

Paimon 架构与关键特性

架构设计

关键特性

Apache Paimon 是一个高性能的数据湖存储系统,旨在支持批处理和流处理。其主要特点包括:

  1. 统一的批处理和流处理 :Paimon 提供了一个单一的数据存储格式,可用于批处理和流处理,确保在不同处理范式之间实现无缝的数据分析。

  2. Schema Evolution :允许在不需要完全重写数据的情况下进行数据模式的变化,这对于处理不断变化的数据需求至关重要。

  3. ACID 事务 :Paimon 通过提供 ACID(原子性、一致性、隔离性、持久性)事务,确保数据的一致性和可靠性,这对于在复杂数据处理流水线中维护数据完整性至关重要。

  4. Time Travel :允许用户访问数据的历史版本,方便进行数据审计、调试和历史分析。

  5. 与大数据生态系统的集成 :Paimon 无缝集成了流行的大数据处理框架,如 Apache Flink、Apache Spark 和 Apache Hive,便于采用和互操作性。

Apache Paimon 的优势

Paimon 的优势主要在以下四大方面:

  1. 高效的实时更新 :Paimon 提供了灵活的实时数据流更新能力,支持最低一分钟内的数据时效性。它能够处理部分列更新、聚合更新,并生成变更日志,为下游系统提供实时数据流。

  2. 优化的流写流读 :作为源自 Flink 内置格式的系统,Paimon 与 Flink 高度兼容,支持高效的流式读写操作。同时,Paimon 也与 Spark 紧密集成,成为 Spark 批处理计算的理想选择。

  3. 强大的查询性能 :Paimon 支持高效的 OLAP 查询,具备点查能力和丰富的索引功能。社区正积极推进索引技术的发展,如 bitmap 索引和布隆过滤器,以进一步提升查询效率。

  4. 大规模的离线处理 :Paimon 能够处理大规模的离线数据集,并对 Append 表提供全面支持,满足超大规模数据处理的需求。

Paimon 使用场景

  1. 数据湖存储:作为数据湖的存储底座,管理各类数据,包括结构化、半结构化和非结构化数据。

  2. 实时数据处理:将实时数据摄入到数据湖中,支持实时数据的写入、更新和查询,构建实时数据仓库,满足实时性业务需求,例如实时监控、实时报表等。

  3. 数据库入湖:提升 ods 层时效性,替代基于传统方式(如 Hive 实时同步、凌晨合并等)的数据入库。

  4. 构建下游数据层:利用湖的增量能力构建下游的 dwd 层,节省计算资源。

  5. 局部更新:支持局部数据的更新,适用于需要频繁更新部分数据的场景,例如构建实时统计视图和报表、宽表构建等。

  6. 流读/增量读:通过增量流读的方式读取数据,支持实时数仓的建设,极大地缩减数据可见时间,提升数据的实时性,同时降低底层数仓的压力。

Pamion 表模型

Primary Key

主键表,可以支持新增、更新和删除表中的数据。如果将多条具有相同主键的数据写入 Paimon 主键表,将根据数据合并机制对数据进行合并。主键表适用于需要进行数据更新和删除操作,并且对数据一致性要求较高的场景。

Append Table

如果表没有定义主键,默认情况下它就是一个Append Table,其实可以理解为StarRocks中的明细表,写入多条一样的数据不会覆盖,会保留多条。这种类型的表适用于不需要流式更新的用例(如日志数据同步)。

Append Queue

其实可以把Append Queue看作是一种特殊的Append Table。同一个桶中的每条记录都是严格排序的,流式读取会完全按照写入的顺序将记录传输到下游。有点类似kafka中的partition,单分区内严格有序。应用在数据管道场景、状态最综合监控场景、时间流处理场景和金融交易场景。

Time Travel

基于快照文件(snapshot)实现。消费者可以通过不同的快照文件,查询在该快照文件产生时刻的Paimon表中的具体数据。

Compaction 策略

Paimon 目前采用的 compaction 策略类似于 RocksDB 的 universal compaction。默认有两种策略:

  • leveled compaction,RocksDB 的默认 compaction 策略

  • Size tiered

这里跟 StarRocks 目前采用的 compaction 比较类似,都是 Size Tiered Compaction。基本的思路就是尽可能让数据量相近的 rowset 执行 compaction,从而避免 compaction 带来的写放大。

StarRocks x Paimon 极速湖仓分析

当前 StarRocks x Paimon 的能力主要包括:

  1. 支持各类存储系统,包括 HDFS 以及对象存储 S3/OSS/OSS-HDFS

  2. 支持 HMS 以及阿里云 DLF 元数据管理系统

  3. 支持 Paimon 的 Primary Key 和 Append Only 表类型查询

  4. 支持 Paimon 系统表的查询,常见例如 Read Optimized 表,snapshots 表等

  5. 支持 Paimon 表和其他类型数据湖格式的关联查询

  6. 支持 Paimon 表和 StarRocks 内表的关联查询

  7. 支持 Data Cache 加速查询

  8. 支持基于 Paimon 表构建物化视图实现透明加速,查询改写等

  9. 支持Paimon表开启Delete Vector加速查询

对于 Primary Key 表类型,Paimon 社区对 Read Optimized 系统表做了完善的性能优化,可以与 Append Only 表一样充分利用 Native reader 的能力,得到直接查询 Paimon数据的最佳性能。直接查询 Primary Key 表的情况下,若 Primary Key 表里包含没有做 Compaction 的数据,StarRocks 里会通过 JNI 调用 Java 读取这部分内容,性能会有一定的损耗。即使是这种情况,在我们收到用户反馈里,平均还是会有相对 Trino 达到3倍以上的性能提升。

Quick Start

Pamion 部署

使用的组件版本

下载 Flink

以下链接是阿里云机器加速的域名,如果是非阿里云机器,可替换为https://mirrors.aliyun.com

wget "http://mirrors.cloud.aliyuncs.com/apache/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz"

解压

tar -xf flink-1.19.1-bin-scala_2.12.tgz

下载 Paimon 和相关依赖包

wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/0.8.2/paimon-flink-1.19-0.8.2.jar"

#如果使用对象存储,需要下载下面的包
wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.8.2/paimon-oss-0.8.2.jar"

下载 flink-hadoop 依赖包

wget "https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar"

如果没有如上 jar 包会报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

下载 flink-sql-connector-kafka 依赖包

wget "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar"

下载 flink-connector-starrocks 依赖包

wget "https://github.com/StarRocks/starrocks-connector-for-apache-flink/releases/download/v1.2.9/flink-connector-starrocks-1.2.9_flink-1.18.jar"

拷贝上面下载的包到 flink/lib 下

cp paimon-flink-1.19-0.8.2.jar paimon-oss-0.8.2.jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar  flink-connector-starrocks-1.2.9_flink-1.18.jar flink-sql-connector-kafka-3.2.0-1.18.jar flink-1.19.1/lib/

启动 flink 集群

cd flink-1.19.1

#修改flink-1.19.1/conf/config.yaml中numberOfTaskSlots为10,允许同时执行的任务
numberOfTaskSlots: 10

./bin/start-cluster.sh
Kafka 部署

下载安装包

以下链接是阿里云机器加速的域名,如果是非阿里云机器,可替换为https://mirrors.aliyun.com

wget "http://mirrors.cloud.aliyuncs.com/apache/kafka/3.7.0/kafka_2.12-3.7.0.tgz"

解压

tar -xf kafka_2.12-3.7.0.tgz

启动 kafka

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

./bin/kafka-server-start.sh -daemon ./config/server.properties
测试 Demo

本文测试的场景,订单数据来源于 Kafka,用户数据来源于 MySQL,最终实现在 Paimon 中存储5分钟时间窗口的汇总结果。 这里为了简化测试demo,下文中用StarRocks替代了MySQL。

创建维度表并写入测试数据

CREATE TABLE `users` (
  `user_id` bigint(20) NOT NULL COMMENT "",
  `region` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`);

insert into users values (1,'BeiJing'),(2,'TianJin'),(3,'XiAn'),(4,'ShenZhen'),(5,'BeiJing'),(6,'BeiJing'),(7,'ShenZhen'),(8,'ShenZhen');

Kafka 中创建事实表并写入测试数据

./bin/kafka-topics.sh --create --topic order-details --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

测试数据生成

需要 pip install kafka

from kafka import KafkaProducer
import time
import json
import random
from datetime import datetime, timedelta

start_time = datetime(2024, 7, 24, 15, 0, 0)
end_time = datetime(2024, 7, 24, 18, 0, 0)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

while True:
    order_id = random.randint(1, 10000)
    user_id = random.randint(1, 8)
    order_amount = round(random.uniform(10.0, 1000.0), 2)
    random_time = start_time + timedelta(seconds=random.randint(0, 3600))

    data = {
        "order_id": order_id,
        "user_id": user_id,
        "order_amount": order_amount,
        "order_time": random_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    }

    producer.send('order-details', value=json.dumps(data).encode('utf-8'))
    time.sleep(3)

producer.close()

创建 paimon 表并写入测试数据

./bin/sql-client.sh

CREATE CATALOG my_catalog_oss WITH (
    'type' = 'paimon',
    'warehouse' = 'oss://starrocks-public/dba/jingdan/paimon',
    'fs.oss.endpoint' = 'oss-cn-zhangjiakou-internal.aliyuncs.com',
    'fs.oss.accessKeyId' = 'ak',
    'fs.oss.accessKeySecret' = 'sk'
);

use catalog my_catalog_oss;

CREATE TABLE hourly_regional_sales (
  event_time TIMESTAMP(3),
  region STRING,
  total_sales DECIMAL(10, 2)
);


use catalog default_catalog;


 CREATE TABLE orders_kafka (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-details',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'order-consumer ',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);


CREATE TABLE users_starrocks (
  user_id BIGINT,
  region STRING
) WITH (
  'connector'='starrocks',
  'scan-url'='172.26.92.154:8030',
  'jdbc-url'='jdbc:mysql://172.26.92.154:9030',
  'username'='root',
  'password'='xxx',
  'database-name'='jd',
  'table-name'='users'
);


SET 'execution.checkpointing.interval' = '10 s';


INSERT INTO my_catalog_oss.`default`.hourly_regional_sales
SELECT
  TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS event_time,
  u.region,
  CAST(SUM(o.order_amount) AS DECIMAL(10, 2)) AS total_sales
FROM default_catalog.`default_database`.orders_kafka AS o
JOIN default_catalog.`default_database`.users_starrocks AS u ON o.user_id = u.user_id
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE), u.region;

查询数据

select * from my_catalog_oss.`default`.hourly_regional_sale;

查询增量数据(Batch Time Travel)

SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('scan.snapshot-id' = '2') */;

SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('incremental-between' = '5,10') */;

创建 StarRocks Paimon Catalog
CREATE EXTERNAL CATALOG paimon_catalog_oss
PROPERTIES
(
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "oss://starrocks-public/dba/jingdan/paimon",
    "aliyun.oss.access_key" = "ak",
    "aliyun.oss.secret_key" = "sk",
    "aliyun.oss.endpoint" = "oss-cn-zhangjiakou-internal.aliyuncs.com"
);

set catalog paimon_catalog_oss;
use `default`;
select * from hourly_regional_sales;

StarRocks 侧可以实时的看到汇总表的变化

用户案例:

Paimon+StarRocks 在同程旅行的湖仓构建方案

饿了么基于 Paimon+StarRocks 的实时湖仓探索

喜马拉雅基于 Paimon+StarRocks 构建直播实时湖仓

使用 StarRocks x Paimon 创建 Streaming Lakehouse

延伸阅读:

StarRocks x Paimon 构建极速实时湖仓分析架构实践

Paimon+StarRocks 湖仓一体数据分析方案

参考:

https://paimon.apache.org/docs/0.8/flink/quick-start/

https://github.com/facebook/rocksdb/wiki/Universal-Compaction

https://mp.weixin.qq.com/s/7n8787v8oVyn5RHoGwgszQ

https://mp.weixin.qq.com/s/Gh5rrtU4BxsDYvgvbwrR5A

https://mp.weixin.qq.com/s/PiyZgI7DYgAtLh17xlbz8A

更多交流,联系我们:StarRocks

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2044590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【私有云场景案例分享①】高效的集群管理能力

一、前言 设备的管理对企业至关重要,会影响生产效率、成本控制和竞争力。然而,企业在设备管理上面临设备数量多、设备分布广、维护成本高等挑战。DeviceKeeper设备管理网站作为解决方案,可以通过远程设备监控、远程设备维护和包体共享等功能…

制造业MES系统源码,前端框架:vue.js,后端框架:springboot 功能模块包括:生产计划管理、物料管理、工艺管理、设备管理、

MES系统功能模块解析,MES系统源码 MES系统是一种用于协调和优化制造过程的信息管理系统,可以帮助企业实现生产计划的顺利执行,并提供全面的生产监控和数据分析功能。 MES系统常见的功能模块包括生产计划管理、物料管理、工艺管理、设备管理…

如何把Phalcon 集成到PhpStorm里面

一 背景 按照上一篇文章里面写的Phalcon 创建项目过程中的一些坑, 最终我们在终端可以基于Phalcon命令创建对应的开发项目。但在这个过程中,存在一个问题:那就是写代码的时候,发现Phalcon对应的依赖提示都没有,如下: 从上面这个截图来看,就能发现,Phalcon的啥…

音频剪辑在线工具哪个好?分享5款简单易上手的音频剪辑工具

暑期的泰山人山人海,游客们纷纷涌向这座名胜古迹。站在巍峨的泰山之巅,望着脚下绵延的群山和眼前无边的云海,人们不禁会想要记录下这一刻的声音。 但泰山的风声、游客的喧哗声、还有自然与人文的杂音交织在一起,要想将这声音中的…

【人工智能】 使用线性回归预测波士顿房价 paddlepaddle 框架 飞桨

一、简要介绍 经典的线性回归模型主要用来预测一些存在着线性关系的数据集。 回归模型可以理解为:存在一个点集,用一条曲线去拟合它分布的过程。如果拟合曲线是一条直线,则称为线性回归。 如果是一条二次曲线,则被称为二次回归。 线性回归是回归模型中最简单的一种。 本…

机房监控系统,全面监控机房动力环境实时报警@卓振思众

在现代企业运营中,机房作为计算机系统的核心支撑平台,承载着关键数据和应用的稳定运行。因此,保障机房环境的安全和设备的正常运行至关重要。【卓振思众】机房监控系统,作为一种先进的智能管理工具,正是为了实现这一目…

启发式算法之模拟退火算法

文章目录 1. 模拟退火算法概述1.1 算法起源与发展1.2 算法基本原理 2. 算法实现步骤2.1 初始化过程2.2 迭代与降温策略 3. 模拟退火算法的优化策略3.1 冷却进度表的设计3.2 参数调整与策略 4. 模拟退火算法的应用领域4.1 组合优化问题4.1.1 旅行商问题(TSP&#xff…

Halcon阈值处理的几种分割方法

Halcon阈值处理的几种分割方法 文章目录 Halcon阈值处理的几种分割方法1. 全局阈值2. 基于直方图的自动阈值分割方法3. 自动全局阈值分割方法4. 局部阈值分割方法5. var_threshold算子6 . char_threshold 算子7. dual_threshold算子 在场景中选择物体或特征是图像测量或识别的重…

982200419控制燃烧器可面价

982200419控制燃烧器可面价 982200419控制燃烧器可面价 982200419控制燃烧器可面价 982200419控制燃烧器接线图 982200419控制燃烧器说明书 982200419控制燃烧器线路图 982200419燃烧机也叫燃烧器,按照燃料可分为燃油燃烧机和燃气燃烧机、生物质燃烧机&#x…

胡玫导演《红楼梦之金玉良缘》今日公映 李越版“琏二爷”实力不凡

今日,由胡玫执导,何燕江编剧,林鹏、卢燕、边程、张淼怡、李越等主演的电影《红楼梦之金玉良缘》全国公映。电影改编自曹雪芹不朽名著《红楼梦》,以“宝、黛、钗”三人的情缘纠葛入手,从“木石前盟”看“金玉良缘”&…

YOLOv8更换主干网络成MobileNetV3

目录 1. 添加主干网络模块 ​编辑1.1 在init.py中添加模块名 1.2 主体代码中添加调用语句块 2. 配置yaml文件 3. 修改成功 1. 添加主干网络模块 1.1 在init.py中添加模块名 1.2 主体代码中添加调用语句块 2. 配置yaml文件 3. 修改成功 自己随便找一个程序跑一跑验证…

Idea使用Maven下载源码

如题,Idea中使用Maven下载源码提示Cannot download sources for xxx。原因是在对应的镜像站没有找到源码包。笔者尝试下载spring-web-4.3.0.RELEASE的源码包时提示如此,原因是配置的阿里云镜像站没有上传对应的源码包,配置了华为镜像站后就可…

Linux(离线)内网部署 thingsboard-gateway 网关实战modbus通讯

前面我们讲解了在内网上如何部署Thingsboard,部署之后领导又要求部署上网关,然后通过modbus来监听设备,废话不多说,直接上干活。 第一步:下载thingsboard-gateway安装包 在Thingsboard官网中给了我们一个在线安装的地…

考勤系统微信小程序的设计与实现---附源码29756

目 录 1 绪论 1.1研究背景 1.2研究意义 1.3微信小程序的介绍 2考勤系统微信小程序系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 法律可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分析 2.4本章小…

计算机的错误计算(六十)

摘要 用另一种方法计算计算机的错误计算(五十五)中案例:先使自变量与 取余,再计算取余后的余弦值,这时,得到了不同的输出。因此,即使不清楚正确结果,Python 与 Visual Studio 也各自…

TypeError when using openai-api

题意:使用 openai-api 时出现 TypeError(类型错误) 问题背景: Using the code below and openai version 0.28.0 i get an error which i cant resolve: 使用以下代码和 openai 版本 0.28.0 时,我遇到了一个无法解决…

软中断、Tasklet 与工作队列的机制分析

文章目录 1 软中断(Softirq)1.1 概念与背景1.2 特点与执行1.3 触发与执行方式1.4 实现与使用1.5 软中断的替代机制 2 Tasklet2.1 概念与特点 3 Bottom Halves(BH)3.1 概念与历史 4 工作队列(Workqueue)4.1 …

ElasticSearch集群搭建与安全认证

文章目录 核心概念集群节点分片CAT API查看集群信息 搭建三节点ES集群安装ES安装Cerebro客户端安装kibana ES集群安全认证ES敏感信息泄露的原因免费的方案集群内部安全通信开启并配置X-Pack的认证 核心概念 集群 一个集群可以有一个或者多个节点不同的集群通过不同的集群名来…

百度百科创建收费吗

百度百科作为一个开放的网络百科全书,旨在为用户提供丰富的知识信息。根据百度百科的官方声明,创建百度百科词条是完全免费的。任何人都可以自由编辑、修改和添加条目。百度百科的创建和编辑过程是由用户自发进行的,不存在官方收费的情况。 百…

【流程引擎】springboot完美集成activiti工作流方案

前言 activiti工作流引擎项目,企业erp、oa、hr、crm等企事业办公系统轻松落地,一套完整并且实际运用在多套项目中的案例,满足日常业务流程审批需求。 项目源码配套文档获取:本文末个人名片直接获取。 一、项目形式 springboot…