paimon的四种changelog模式(1)-input模式

news2025/1/15 10:31:16

环境创建

 CREATE CATALOG fs_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/data/soft/paimon/catalog'
);

USE CATALOG fs_catalog;

drop table if exists t_changelog_input;
 
 CREATE TABLE t_changelog_input (
      age BIGINT,
      money BIGINT,
      hh STRING,
      PRIMARY KEY (hh) NOT ENFORCED
)WITH (
    'merge-engine' = 'deduplicate',
    'changelog-producer' = 'lookup'
);

paimon的snapshot和checkpoint之间的关系

  • 一次snapshot会产生一个data文件
  • 一次checkpoint会产生1-2个snapshot文件,要看这次checkpoint是否触发compaction,触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据),否则只有一个(本次checkpoint写入数据)
  • 流式写入根据checkpoint间隔,定期进行checkpoint
  • 批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果

执行一次插入操作

insert into t_changelog_input values(10,1000,'1');

root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_input/bucket-0# ll
total 8
-rw-r–r-- 1 root root 1217 Nov 27 13:40 changelog-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:40 data-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-1.parquet

changelog文件
在这里插入图片描述

data文件
在这里插入图片描述

相同主键,再执行一次插入操作

insert into t_changelog_input values(10,2000,'1');

root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_input/bucket-0# ll
total 16
-rw-r–r-- 1 root root 1217 Nov 27 13:40 changelog-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:43 changelog-abee540c-f29d-4d18-8578-2d173d43452b-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:40 data-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-1.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:43 data-abee540c-f29d-4d18-8578-2d173d43452b-1.parquet

changelog文件
在这里插入图片描述
data文件
在这里插入图片描述

sqlclient 流式查询

-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';

--设置检查点的间隔为10秒
SET 'execution.checkpointing.interval'='10 s';
set parallelism.default=1;

-- 使用changelog,展示op,操作类型
SET 'sql-client.execution.result-mode'='changelog';

-- 从当前快照开始,读取变化情况 
SELECT * FROM t_changelog_input;

在这里插入图片描述

-- 从ID=1的快照开始,读取变化情况 
SELECT * FROM t_changelog_input  /*+ OPTIONS('scan.snapshot-id' = '1') */;

在这里插入图片描述

sqlclient 批查询

Flink SQL> SET 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.

Flink SQL> SELECT * FROM t_changelog_input;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  2000 |  1 |
+-----+-------+----+
1 row in set

Flink SQL> SELECT * FROM t_changelog_input  /*+ OPTIONS('scan.snapshot-id' = '1') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  1000 |  1 |
+-----+-------+----+
1 row in set

Flink SQL> SELECT * FROM t_changelog_input  /*+ OPTIONS('scan.snapshot-id' = '2') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  2000 |  1 |
+-----+-------+----+
1 row in set

执行cdc采集

使用方法
https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/mysql-cdc/

适配情况
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/

MYSQL数据源
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/

下载地址
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.0/flink-sql-connector-mysql-cdc-3.1.0.jar

bin/flink run \
lib/paimon-flink-action-0.9.0.jar \
mysql_sync_table \
--warehouse 'file:/data/soft/paimon/catalog' \
--database 'default' \
--table 't_changelog_input' \
--mysql_conf hostname=wsl \
--mysql_conf username=root \
--mysql_conf password=root \
--mysql_conf database-name='test' \
--mysql_conf table-name='t_changelog_input' \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1 

mysql新增一条数据,然后再修改

changelog文件,新增数据时,和data数据一样。
修改mysql数据时,paimon同步到数据后发现,changelog和data不一样了,这次是两条,而data只有一条

data文件
在这里插入图片描述
changelog文件
在这里插入图片描述
流式消费的结果也是正确的
在这里插入图片描述

结论

  • changelog=input的情况下,一次checkpoint产生一个data文件的同时,也会产生一个changelog文件
  • changelog文件内容和data文件内容完全一致
  • input 情况下,如果你的操作不完整,那么流式读取的结果也是不对的
    • 上述操作insert2次相同主键,按照主键表的逻辑,应该是会出现-D +I 或者 -U +U 的场景,但是由于input模式,不会额外的处理changelog,你insert两次,我的changelog就写两次insert,你流式读取,那我就重放我的changelog,你写的changelog不对,流处理结果也就不对了,要想对,那就需要先insert,然后delete,再insert,或者执行update操作,总之不能利用主键表根据主键更新的特性来更新数据
    • 但是批处理的结果是正确的,因为批处理不管changelog
  • changelog=input就是为流处理设计的,但是数据处理操作必须要标准,必须要有-U +U -D +I的操作,而CDC任务接到的数据就是标准的,因此他两个一般是绑定的,cdc采集任务的表,如果需要流处理消费,最好使用changelog=input

应用场景

  • 用于cdc采集的表
  • and
  • 后期要进行流式处理的表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【趣味】斗破苍穹修炼文字游戏HTML,CSS,JS

目录 图片展示 游戏功能 扩展功能 完整代码 实现一个简单的斗破苍穹修炼文字游戏,你可以使用HTML、CSS和JavaScript结合来构建游戏的界面和逻辑。以下是一个简化版的游戏框架示例,其中包含玩家修炼的过程、增加修炼进度和显示经验值的基本功能。 图片…

一款现代化的轻量级跨平台Redis桌面客户端

Tiny RDM‌ 是一款现代化的轻量级跨平台Redis桌面客户端,专为开发和运维人员设计,旨在提供便捷、高效的Redis操作体验。它支持macOS、Windows和Linux操作系统,安装包大小约为10MB,具有广泛的兼容性和便携性‌。 功能特性 ‌轻量级…

【大数据学习 | Spark调优篇】Spark之JVM调优

1. Java虚拟机垃圾回收调优的背景 如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找…

《使用Python进行数据挖掘:理论、应用与案例研究》

嘿,今天我要给你们介绍一本使用Python进行数据挖掘的好书。这本书是由吴迪博士撰写的,他是雷曼学院商学院的助理教授,也是数据科学的实战派。 在这个时代,数据多得让人眼花缭乱,要从中找出有用的信息,那可不…

C++之C++11新特性(三)--- 智能指针

目录 一、智能指针 1.1 为什么需要智能指针 1.2 内存泄漏 1.2.1 内存泄漏的基本概念 1.2.2 内存泄漏的分类 1.2.3 如何避免内存泄漏 1.3 智能指针的使用及其原理 1.3.1 RAII 1.3.2 智能指针的基本原理 1.3.3 auto_ptr 1.3.4 unique_ptr 1.3.5 shared_ptr 1.3.6 sha…

Flink学习连载文章8--时间语义

Time的分类 (时间语义) EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间 IngestionTime:摄入时间,是事件/数据到达流处理系统的时间 ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间 EventTime的重要性 假设,你正在去往地下停…

【Docker】部署nginx

docker部署nginx docker部署nginx镜像加速器1、拉取nginx镜像2、创建nginx容器3、浏览器访问 docker部署nginx 镜像加速器 备注:阿里云镜像加速地址 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors可用的镜像源: https://https://reg…

github webhooks 实现网站自动更新

本文目录 Github Webhooks 介绍Webhooks 工作原理配置与验证应用云服务器通过 Webhook 自动部署网站实现复制私钥编写 webhook 接口Github 仓库配置 webhook以服务的形式运行 app.py Github Webhooks 介绍 Webhooks是GitHub提供的一种通知方式,当GitHub上发生特定事…

Rust SQLx CLI 同步迁移数据库

上文我们介绍了SQLx及SQLite,并介绍了如何使用代码同步迁移数据库。本文介绍Sqlx cli 命令行工具,介绍如何安装、使用,利用其提供的命令实现数据表同步迁移。Java生态中有flyway, sqlx cli 功能类似,利用命令行工具可以和其他语言…

论文笔记-WWW2024-ClickPrompt

论文笔记-WWW2024-ClickPrompt: CTR Models are Strong Prompt Generators for Adapting Language Models to CTR Prediction ClickPrompt: CTR模型是大模型适配CTR预测任务的强大提示生成器摘要1.引言2.预备知识2.1传统CTR预测2.2基于PLM的CTR预测 3.方法3.1概述3.2模态转换3.…

解析客服知识库搭建的五个必要性

在当今竞争激烈的商业环境中,客服知识库的搭建已成为企业提升服务质量、优化客户体验的重要手段。一个完善的客服知识库不仅能帮助企业高效管理客户服务流程,还能显著提升客户满意度和忠诚度。以下是搭建客服知识库的五个必要性: 1. 提升服务…

css—轮播图实现

一、背景 最近和朋友在一起讨论的时候,我们提出了这样的一个提问,难道轮播图的效果只能通过js来实现吗?经过我们的一系列的争论,发现了这是可以通过纯css来实现这一效果的,CSS轮播图也是一种常见的网页展示方式&#x…

40分钟学 Go 语言高并发:【实战课程】工作池(Worker Pool)实现

工作池(Worker Pool)实战实现 一、知识要点概述 模块核心功能实现难点重要程度池化设计管理协程生命周期并发安全、资源控制⭐⭐⭐⭐⭐动态扩缩容根据负载调整池大小平滑扩缩、性能优化⭐⭐⭐⭐任务分发合理分配任务到worker负载均衡、任务优先级⭐⭐⭐…

深度学习3:数据预处理使用Pandas与PyTorch的实践

文章目录 导读一、主题与提纲1.1. 读取数据集1.2. 处理缺失值1.3. 转换为张量格式二、结论本文是经过严格查阅相关权威文献和资料,形成的专业的可靠的内容。全文数据都有据可依,可回溯。特别申明:数据和资料已获得授权。本文内容,不涉及任何偏颇观点,用中立态度客观事实描…

004 MATLAB数值微积分

01 函数的极值点 求解一元函数在区间(x1,x2)中极小值点: xfminbnd(fun,x1,x2)求解初始向量为x0的多元函数极小值点x和对应的极值y [x,y]fminsearch(fun,x0)02 微积分 1.数值微分: 一次微分: diff(x) 若x是一个向量,则返回[x(…

重塑用户体验!快手电商智能巡检平台的实践与探索

导读:随着科技的飞速发展,人工智能(AI)已经成为推动各行各业创新的重要力量。特别是在用户体验方面,AI 技术的应用不仅解决了许多传统问题,还带来了全新的交互方式和更高的用户满意度。本文将从快手电商B端…

C# 结构体

文章目录 前言一、结构体的定义与基本使用(一)定义结构体(二)结构体的使用示例 二、C# 结构的特点(一)丰富的成员类型(二)构造函数相关限制与特性(三)继承方面…

实现Linux平台自定义协议族

一 简介 我们常常在Linux系统中编写socket接收TCP/UDP协议数据,大家有没有想过它怎么实现的,如果我们要实现socket接收自定义的协议数据又该怎么做呢?带着这个疑问,我们一起往下看吧~~ 二 Linux内核函数简介 在Linux系统中要想…

Asp.net core Autofac 案例 注入、AOP 启用接口代理拦截 启用 类代理拦截=== 只会拦截虚方法

资料 core 实现autofac 》》》 安装 如下工具包 安装之后 如出现 这种 》》》编写 AOP类 using Castle.DynamicProxy; using System.Diagnostics;namespace Web01.AOP {/// <summary>/// 日志记录/// </summary>public class LoggingInterceptor : IInterc…

【深度学习】各种卷积—卷积、反卷积、空洞卷积、可分离卷积、分组卷积

在全连接神经网络中&#xff0c;每个神经元都和上一层的所有神经元彼此连接&#xff0c;这会导致网络的参数量非常大&#xff0c;难以实现复杂数据的处理。为了改善这种情况&#xff0c;卷积神经网络应运而生。 一、卷积 在信号处理中&#xff0c;卷积被定义为一个函数经过翻转…