类似flink CDC
databricks 官方文档:
How to Simplify CDC With Delta Lake's Change Data Feed - The Databricks Blog
delta.io 官方文档:
Change data feed — Delta Lake Documentation
概述
更改数据馈送 (CDF) 功能允许 Delta 表跟踪 Delta 表版本之间的行级更改
在 Delta 表上启用时,运行时会记录写入表中的所有数据的“更改事件”。这包括行数据以及指示指定行是被插入、删除还是更新的元数据。
CDF的使用方式
您可以使用 DataFrame API(即 df.read )在批查询中读取更改事件,也可以在使用 DataFrame API(即 df.readStream )的流式查询中读取更改事件。
delta表开启CDF
CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
已存在的表开启CDF:
ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
查看CDF变更
preimage 为更新前的值
postimage 为更新后的值
CREATE TABLE delta.`/tmp/delta/student` (id INT, name STRING, age INT) USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true) ; insert into table delta.`/tmp/delta/student` values (1,'name1',11),(2,'name2',12),(3,'name3',13); update delta.`/tmp/delta/student` set age = 33 where id = 1; delete from delta.`/tmp/delta/student` where id=2;
scala> spark.sql(""" SELECT * FROM table_changes_by_path('/tmp/delta/student', 0,3) """).show(false) +---+-----+---+----------------+---------------+-----------------------+ |id |name |age|_change_type |_commit_version|_commit_timestamp | +---+-----+---+----------------+---------------+-----------------------+ |1 |name1|11 |update_preimage |2 |2023-05-01 19:41:39.237| |1 |name1|33 |update_postimage|2 |2023-05-01 19:41:39.237| |2 |name2|12 |delete |3 |2023-05-01 19:42:18.601| |1 |name1|11 |insert |1 |2023-05-01 19:41:32.211| |2 |name2|12 |insert |1 |2023-05-01 19:41:32.211| |3 |name3|13 |insert |1 |2023-05-01 19:41:32.211| +---+-----+---+----------------+---------------+-----------------------+
注意: SELECT * FROM table_changes_by_path('/tmp/delta/student', 0) 等效于 SELECT * FROM table_changes_by_path('/tmp/delta/student', 0 , latest版本)
CDF配合Merge语句
How to Simplify CDC With Delta Lake's Change Data Feed - The Databricks Blog
__change_data 文件夹
Delta Lake 在Delta 表目录下的 _change_data 文件夹中记录了 UPDATE 、 DELETE 和 MERGE 操作的更改数据。当 Delta Lake 检测到它可以直接从事务日志中有效地计算更改数据馈送时,可能会跳过这些记录。特别是insert-only操作和full partition delete不会在 _change_data 目录下产生数据。
VACUUM会剔除CDF
_change_data 文件夹中的文件遵循表的保留策略。因此,如果您运行 VACUUM 命令,更改数据馈送数据也会被删除.
CDF支持速率控制
支持速率限制( maxFilesPerTrigger 、 maxBytesPerTrigger ).