新一代开源流数据湖平台Apache Paimon入门实操-下

news2025/1/13 19:43:01

文章目录

  • 实战
    • 写表
      • 插入和覆盖数据
      • 更新数据
      • 删除数据
      • Merge Into
    • 查询表
      • 批量查询
        • 时间旅行
        • 批量增量查询
      • 流式查询
        • 时间旅行
        • ConsumerID
      • 查询优化
    • 系统表
      • 表指定系统表
      • 分区表
      • 全局系统表
      • 维表
    • CDC集成
      • MySQL
      • Kafka
      • 支持schema变更

实战

写表

插入和覆盖数据

可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。语法格式如下,其与标准sql语法一致

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
  • part_spec:一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,日期’ 2023-01-02 ')。语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )
  • column_list:一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。所有指定的列都应该存在于表中,并且不能相互复制。它包括除静态分区列之外的所有列。列列表的大小应该与VALUES子句或查询中数据的大小完全相同。语法: (col_name1 [, column_name2, …])
  • value_expr:指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。目前,Flink不支持直接使用NULL,因此NULL应该通过’ cast (NULL AS data_type) '转换为实际数据类型。语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
CREATE TABLE demo1 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

insert into demo1 values(1,1,'order','2023-08-04','19'),(2,2,'pay','2023-08-04','20');
select * from demo1;

CREATE TABLE demo_p1 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
insert into demo_p1 partition(dt='2023-08-04',hh='21') values(3,3,'pv');
insert into demo_p1 select * from demo1;
select * from demo_p1;

image-20230804153022659

覆盖只支持batch模式。覆盖默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置流读覆盖。对于分区表,Paimon的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在覆盖数据中的分区)。可以配置动态分区覆盖来更改它。

RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'BATCH';
  • 覆盖未分区的表
insert overwrite demo1 values(3,3,'pv','2023-08-04','20');
  • 覆盖分区表
insert overwrite demo_p1 select * from demo1;
insert overwrite demo_p1 partition(dt='2023-08-04',hh='20') select user_id,item_id,behavior from demo1;

更新数据

目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。重要的表属性设置,只有主键表支持此特性。

要支持此特性,需要对MergeEngine进行重复数据删除或部分更新。不支持更新主键。语法:UPDATE table_identifier SET column1 = value1, column2 = value2, … WHERE condition;

update demo_p1 set item_id = 5,behavior='uv' where user_id = 1;

# 比如下面,merge-engine默认就是deduplicate
CREATE TABLE MyTable (
	a STRING,
	b INT,
	c INT,
	PRIMARY KEY (a) NOT ENFORCED
) WITH ( 
	'write-mode' = 'change-log',
	'merge-engine' = 'deduplicate' 
);

  • deduplicate:删除重复数据,保留最后一行。
  • Partial-update:部分更新非空字段。
  • aggregation:聚合具有相同主键的字段。
  • first-row:删除重复数据并保留第一行。

详细参数配置可以查看:https://paimon.apache.org/docs/master/maintenance/configurations/

删除数据

Flink1.17+以上SQL支持删除数据。且只有写模式设置为更改日志的表才支持此特性;如果表有主键,则需要对MergeEngine进行重复数据删除以支持此特性。

delete from demo_p1 where behavior = 'pv';
select * from demo_p1;

Merge Into

Paimon通过flink run提交“MERGE - INTO”作业来支持“MERGE INTO”。下载paimon-flink-action文件,无需放到lib目录,与普通开发jar包一样指定路径运行即可

wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-action/0.5-SNAPSHOT/paimon-flink-action-0.5-20230804.002229-95.jar

通过Merge Into实现行级别更新,只有主键表支持这个功能,该操作不会产生UPDATE_BEFORE,所以不建议设置’ changelog-producer ’ = ’ input '。合并操作使用“upsert”语义而不是“update”语义,这意味着如果行存在,则执行更新,否则执行插入。例如对于非主键表可以更新每一列,但对于主键表如果希望更新主键,则必须插入具有不同于表中行主键的新行。在这种情况下,“upsert”是有用的。

匹配解释如下:

  • 匹配:更改的行来自目标表,并且每个行都可以基于merge-condition和可选的Matched -condition (source∩target)匹配源表的行。
  • 不匹配:根据合并条件和可选的不匹配条件(source - target),更改的行来自源表,所有行不能匹配任何目标表行。
  • Not-matched-by-source:根据merge-condition和可选的Not-matched-by-source条件(target -source),更改的行来自目标表,并且所有行不能匹配任何源表行。

参数格式:

  • Matched-upsert-changes: col = .col | expression [, …] (Means setting .col with given value. Do not add ‘.’ before ‘col’.),可以使用’ * '来设置具有所有源列的列(要求目标表的模式等于源表的模式)。
  • Not-matched-upsert-changes:类似于matched-upsert-changes,但不能引用源表的列或使用’ * '。
  • insert-values:col1, col2, …, col_end。Must specify values of all columns. For each column, you can reference .col or use an expression。可以使用’ * '插入所有源列(要求目标表的模式等于源表的模式)。
  • 不匹配条件不能使用目标表的列来构造条件表达式。
  • 不匹配源条件不能使用源表的列来构造条件表达式。
create database test;use test;CREATE TABLE wstest1 (	id INT,	ts BIGINT,    vc INT,	PRIMARY KEY (id) NOT ENFORCED)insert into wstest1 values(1,1,1),(2,2,2),(3,3,3);select * from wstest1;

image-20230804180704978

CREATE TABLE wstest2 (	id INT,	ts BIGINT,    vc INT,	PRIMARY KEY (id) NOT ENFORCED)insert into wstest2 values(2,2,2),(3,3,3),(4,4,4),(5,5,5);select * from wstest2;

image-20230804180742128

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    merge-into \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table wstest2 \    --source-table test.wstest1 \    --on "wstest2.id = wstest1.id" \    --merge-actions matched-upsert,matched-delete \    --matched-upsert-condition "wstest2.ts > 2" \    --matched-upsert-set "vc = 100" \    --matched-delete-condition "wstest2.ts <= 2"
select * from wstest2;

image-20230804181219648

查询表

批量查询

Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。在sql-client中,设置执行模式为

RESET 'execution.checkpointing.interval';SET 'execution.runtime-mode' = 'batch';

时间旅行

带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。通过查看文件系统存储元数据和数据可以看下上面test库中wstest2表的快照目录可以查看目前有1和2两个快照版本

image-20230807094224697

-- 读取id为1的快照SELECT * FROM wstest2 /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 以Unix毫秒为单位从指定的时间戳读取快照SELECT * FROM wstest2 /*+ OPTIONS('scan.timestamp-millis' = '1691143583490') */;-- 读取标签“my-tag”SELECT * FROM wstest2 /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

读取id和时间戳读取数据如下

image-20230807093656105

批量增量查询

读取开始快照(不包含)和结束快照之间的增量变化。例如:

  • “5,10”表示快照5和快照10之间的变化。
  • ’ TAG1,TAG3 '表示TAG1和TAG3之间的变化。
SELECT * FROM wstest2 /*+ OPTIONS('incremental-between' = '1,2') */;

image-20230807094720754

流式查询

默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。

-- Flink SQLSET 'execution.checkpointing.interval'='30s';SET 'execution.runtime-mode' = 'streaming';

可以做流式读取没有快照数据,可以使用最新的扫描模式;连续读取最新更改,而不在开始时生成快照。

SELECT * FROM wstest2 /*+ OPTIONS('scan.mode' = 'latest') */;

如果只想处理今天及以后的数据,可以使用分区过滤器:

SELECT * FROM wstest2 WHERE dt > '2023-08-04'

时间旅行

如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。

-- 从id为1的快照读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 从指定时间戳的快照读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.timestamp-millis' = '1691143583490') */;-- 在第一次启动时读取快照id 1,并继续读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

读取数据如下

image-20230807095452502

ConsumerID

这是一个实验性的功能。可以在流式读表时指定消费者id:

SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'myid') */;

当流读取Paimon表时,要记录到文件系统中的下一个快照id。这有几个好处:

  • 当前一个作业停止时,新启动的作业可以继续使用前一个进度,而无需从状态恢复。新的读取将从消费者文件中找到的下一个快照id开始读取。
  • 在确定快照是否过期时,Paimon查看文件系统中表的所有消费者,如果仍然有消费者依赖于该快照,则该快照将不会在到期时被删除。
  • 当没有水印定义时,Paimon表会将快照中的水印传递给下游的Paimon表,这意味着您可以跟踪整个管道的水印进度。

注意:消费者将阻止快照过期,可以指定consumer.expiration-time来管理消费者的生命周期。可以使用给定的消费者ID和下一个快照ID重置消费者。

SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'itxiaoshen') */;insert into wstest2 values(6,6,6),(7,7,7);SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'itxiaoshen') */;

image-20230807101227738

查询优化

强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。可以加速数据跳转的过滤函数有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。

假设一个表具有以下表结构:

CREATE TABLE orders (    catalog_id BIGINT,    order_id BIGINT,    .....,    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key)

通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;SELECT * FROM orders  WHERE catalog_id=1025  AND order_id>2035 AND order_id<6000;

但是下面的过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

系统表

表指定系统表

表指定的系统表包含每个表的元数据和信息,例如创建的快照和正在使用的选项。用户可以通过批量查询访问系统表。

目前,Flink、Spark和Trino都支持查询系统表。在某些情况下,表名需要用反引号括起来以避免语法解析冲突,例如三重访问模式:

SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
  • 快照表
# 通过快照表可以查询该表的快照历史信息,包括快照中发生的记录计数。select * from wstest2$snapshots;

image-20230807094350201

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

  • 表模式

可以通过schemas表查询该表的历史模式。

SELECT * FROM wstest2$schemas;

image-20230807103013073

可以连接快照表和模式表以获得给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields     FROM wstest2$snapshots s JOIN MyTable$schemas t     ON s.schema_id=t.schema_id where s.snapshot_id=1;
  • 表选项

可以查询表的选项信息,这些信息是通过选项表从DDL指定的。未显示的选项将是默认值。可以参考[Configuration]。

SELECT * FROM wstest2$options;
  • 审计日志表

如果需要审计表的变更日志,可以使用audit_log系统表。通过audit_log表,可以在获取表的增量数据时获取rowkind列。您可以使用该列进行过滤和其他操作,以完成审计。

SELECT * FROM wstest2$audit_log;
> +I:插入操作,新增数据。> -U:使用更新行之前的内容进行更新操作,一条数据的修改会产生两个U 标识符数据。其中-U 含义为修改前数据。> +U:使用更新行的新内容进行更新操作,修改之后的数据。> -D:删除操作,删除的数据。
  • 表文件

可以查询指定快照表的文件。

-- 查询最新快照的文件SELECT * FROM wstest2$files;

image-20230807103636295

-- 还可以查询指定快照的文件SELECT * FROM wstest2$files /*+ OPTIONS('scan.snapshot-id'='1') */;

image-20230807103757946

  • 表标签

通过标签表可以查询该表的标签历史信息,包括标签基于哪些快照,以及快照的一些历史信息。还可以获得所有标签名称和时间旅行到特定的标签数据名称。

SELECT * FROM wstest2$tags;
  • 表消费者

可以查询包含下一个快照的所有消费者。

SELECT * FROM wstest2$consumers;

image-20230807103944792

  • 表清单文件

可以查询当前表的最新快照或指定快照中包含的所有清单文件。

-- 查询最新快照的清单信息SELECT * FROM wstest2$manifests;

image-20230807104649956

-- 也可以查询带有指定快照的清单SELECT * FROM wstest2$manifests /*+ OPTIONS('scan.snapshot-id'='1') */;

image-20230807104709319

分区表

可以查询表的分区文件。

SELECT * FROM demo_p1$partitions;

image-20230807105913198

全局系统表

全局系统表包含当前存在的所有表的统计信息;为了方便检索,创建了一个参考系统数据库sys,可以用sql在flink中显示所有全局系统表:

所有选项表,这个表类似于Options table,但是它显示所有的表选项都是all database。

SELECT * FROM sys.all_table_options;

维表

Paimon支持Lookup Join,它用于从Paimon查询数据来补充维度字段,是流查询中的一种连接。连接要求一个表具有处理时间属性,另一个表由查找源连接器提供支持。

在Flink中,Paimon支持对带有主键的表和仅追加表进行查找连接。下面的示例说明了这个特性,创建一个Paimon表并实时更新它。

USE CATALOG fs_catalog;CREATE TABLE customers (    id INT PRIMARY KEY NOT ENFORCED,    name STRING,    country STRING,    zip STRING);-- 启动一个流作业来更新客户表INSERT INTO customers values(1,'zhangsan','china','aaa'),(2,'lisi','china','bbb'),(3,'wangwu','china','ccc');select * from customers;-- 创建一个临时左表,就像从kafkaCREATE TEMPORARY TABLE Orders (    order_id INT,    total INT,    customer_id INT,    proc_time AS PROCTIME()) WITH (  'connector' = 'datagen',   'rows-per-second'='1',   'fields.order_id.kind'='random',   'fields.order_id.min'='1',   'fields.order_id.max'='1000000',   'fields.total.kind'='sequence',   'fields.total.start'='1',   'fields.total.end'='1000',   'fields.customer_id.kind'='random',   'fields.customer_id.min'='1',   'fields.customer_id.max'='3');select * from Orders;

现在可以在查找连接查询中使用客户。

-- 用客户信息填充每个订单SELECT o.order_id, o.total, c.country, c.zipFROM Orders AS oJOIN customersFOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

image-20230807124324530

Lookup Join将在本地维护一个RocksDB缓存,并实时提取表的最新更新。查找连接操作符将只提取必要的数据,因此筛选条件对性能非常重要。此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。

如果Orders(主表)join的记录缺失,因为客户(查找表)的相应数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup进行查找。以下选项允许用户微调RocksDB以获得更好的性能,可以在表属性或动态表提示中指定它们。

-- 动态表提示示例SELECT o.order_id, o.total, c.country, c.zipFROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

CDC集成

通过模式演化,Paimon支持多种方式将数据摄取到Paimon表中;这意味着添加的列将实时同步到Paimon表,并且不会为此重新启动同步作业。目前支持以下同步方式:

  • MySQL同步表:将MySQL中的一个或多个表同步到一个Paimon表中。
  • MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库。
  • API同步表:自定义数据流输入到一个Paimon表。
  • Kafka同步表:将一个Kafka主题的表同步到一个Paimon表。
  • Kafka同步数据库:同步一个包含多个表的Kafka主题或多个包含一个表的主题到一个Paimon数据库。

MySQL

Paimon支持使用更改数据捕获(CDC)同步来自不同数据库的更改,此功能需要Flink及其CDC连接器,准备CDC Bundled Jar。在上一篇我们已经将flink-sql-connector-mysql-cdc-2.4.1.jar拷贝到Flink的Lib目录。

  • 同步表:通过在Flink数据流作业中使用mysql-sync-table动作或直接通过Flink运行,用户可以将一个或多个MySQL表同步到一个Paimon表中。

如果指定的Paimon表不存在,则此操作将自动创建表。它的模式将从所有指定的MySQL表派生。如果Paimon表已经存在,它的模式将与所有指定的MySQL表的模式进行比较。

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-table \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table my_users_cdc \    --primary-keys id \    --mysql-conf hostname=192.168.50.95 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name='test' \    --mysql-conf table-name='my_users' \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

启动后查看下表的信息已经同步

image-20230807140531281

修改MySQL数据库my_users表中id为4的age字段的值从40改为55

image-20230807140750362

可以看到已经获取变更的数据

image-20230807140826925

还可以使用正则表达式设置’ database-name ‘来捕获多个数据库。通过对–mysql-conf database-name=‘source_db.+’ ;一个典型的场景是,一个表’ source_table ‘被分成数据库’ source_db1 ', ’ source_db2 ‘…,然后可以同步所有’ source_table '的数据到一个Paimon表。

  • 同步数据库:通过在Flink数据流作业中使用mysql-sync-database动作或直接通过Flink运行,用户可以将整个MySQL数据库同步到一个Paimon数据库。

只有具有主键的表才会被同步。对于每个要同步的MySQL表,如果对应的Paimon表不存在,该操作将自动创建表。它的模式将从所有指定的MySQL表派生。如果Paimon表已经存在,它的模式将与所有指定的MySQL表的模式进行比较。

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-database \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table-prefix "ods_" \    --table-suffix "_cdc" \    --mysql-conf hostname=192.168.50.95 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name=test \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后查看已经有对应整库的表了

image-20230807143057749

修改MySQL数据库new_users表中id为1的age字段的值从当前33改为43

image-20230807143418520

可以看到已经获取变更的数据

image-20230807143431515

希望该作业同步包含历史数据的表[order, custom];可以通过从作业的前一个快照中恢复,从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后继续自动从以前的位置读取变更日志。

./bin/flink run \   --fromSavepoint savepointPath \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-database \    --warehouse hdfs:///path/to/warehouse \    --database test_db \    --mysql-conf hostname=127.0.0.1 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name=source_db \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hive-metastore:9083 \    --table-conf bucket=4 \    --including-tables 'product|user|address|order|custom'

可以设置——mode组合,以启用同步新添加的表而无需重新启动作业。

--including-tables 'tbl.+'

通过将database-name设置为正则表达式,同步作业将捕获匹配数据库下的所有表,并将同名的表合并到一个表中。可以设置——merge-shards false来阻止合并碎片。同步表将被命名为’ databaseName_tableName ',以避免潜在的名称冲突。

Kafka

在上一篇我们已经将flink-sql-connector-kafka-1.17.1.jar拷贝到Flink的Lib目录。Flink提供了几种Kafka CDC格式:canal-json, debezium-json,ogg-json,maxwell-json。如果Kafka主题中的消息是使用更改数据捕获(CDC)工具从另一个数据库捕获的更改事件,那么您可以使用Paimon Kafka CDC。将解析后的INSERT、UPDATE、DELETE消息写入paimon表。也即是目前只支持CanalCDC,其他在未来应该会支持

image-20230807144558427

准备

[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

先安装canal

# 先下载canalwget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz# 解压tar -xvf canal.deployer-1.1.6.tar.gz

修改canal.properties和instance.properties两个配置文件,vim conf/canal.properties

canal.serverMode = kafkakafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

vim conf/example/instance.properties

canal.instance.master.address=mysqlserver:3306canal.instance.dbUsername=canalcanal.instance.dbPassword=canal# mq configcanal.mq.topic=cc_test2

启动canal

# 启动canal./bin/startup.sh

修改MySQL的account数据库的account_tbl表的数据,先通过

image-20230807173244489

./kafka-console-producer.sh --broker-list kafka1:9092 --topic cc_test2

消费kafka的主题cc_test2数据成功如下,验证canal的配置正确。

{"data":[{"id":"1","user_id":"6","money":"100"}],"database":"account","es":1691400547000,"id":2,"isDdl":false,"mysqlType":{"id":"int","user_id":"varchar(255)","money":"int"},"old":[{"user_id":"5"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"user_id":12,"money":4},"table":"account_tbl","ts":1691400547176,"type":"UPDATE"}{"data":[{"id":"1","user_id":"7","money":"100"}],"database":"account","es":1691400566000,"id":3,"isDdl":false,"mysqlType":{"id":"int","user_id":"varchar(255)","money":"int"},"old":[{"user_id":"6"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"user_id":12,"money":4},"table":"account_tbl","ts":1691400566482,"type":"UPDATE"}

image-20230807173110048

  • 同步表:通过在Flink数据流作业中使用kafka-sync-table动作或直接通过Flink运行,用户可以将Kafka的一个主题中的一个或多个表同步到一个Paimon表中。
./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    kafka-sync-table \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table kafka_account_tbl_cdc \    --primary-keys id \    --kafka-conf properties.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \    --kafka-conf topic=cc_test2 \    --kafka-conf properties.group.id=itxs \    --kafka-conf value.format=canal-json \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后可以到通过Kafka已经将对应表和数据都同步到

image-20230807173538278

如果指定的Paimon表不存在,则此操作将自动创建表。它的模式将从所有指定的Kafka主题的表中派生,它从主题中获得最早的非ddl数据解析模式。如果Paimon表已经存在,它的模式将与所有指定Kafka主题表的模式进行比较。

  • 同步库:通过在Flink数据流作业中使用KafkaSyncDatabaseAction或直接通过Flink运行,用户可以将多主题或一个主题同步到一个Paimon数据库。只有具有主键的表才会被同步。

此操作将为所有表构建单个合并接收器。对于每个要同步的Kafka主题的表,如果对应的Paimon表不存在,这个动作将自动创建表,并且它的模式将从所有指定的Kafka主题的表中派生。如果Paimon表已经存在,并且它的模式与从Kafka记录解析的模式不同,这个动作将尝试进行模式进化。

修改canal.mq.topic=cc_test3后重启启动canal

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    kafka-sync-database \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table-prefix "ods_" \    --table-suffix "_cdc" \    --kafka-conf properties.bootstrap.servers=192.168.5.120:9092 \    --kafka-conf topic=cc_test3 \    --kafka-conf properties.group.id=itxs \    --kafka-conf scan.startup.mode=earliest-offset \    --kafka-conf value.format=canal-json \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后,也可以看下后很多ods开发和cdc结尾的表,可以看到

image-20230807175320517

在MySQL的student表修改数据

image-20230807175508068

可以看到在sql-client中查询表已经捕获到最新变更的数据,至此基于Kafka通过MySQL多表已验证完毕。

select * from ods_student_cdc;

image-20230807175547388

支持schema变更

CDC摄取支持有限数量的模式更改,也即是可以自动同步表结构信息。目前,框架不能重命名表,删除列,所以rename table和drop COLUMN的行为将被忽略,rename COLUMN将添加一个新的列。当前支持的模式更改包括:

  • 添加列。

  • 修改列类型。更具体地说

    • 将字符串类型(char, varchar, text)转换为另一种长度更长的字符串类型
    • 从二进制类型(binary, varbinary, blob)转换为另一种长度更长的二进制类型
    • 从整数类型(tinyint, smallint, int, bigint)转换为另一个范围更大的整数类型
    • 从浮点类型(float, double)转换为另一种范围更大的浮点类型;
  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/844268.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python与深度学习(十六):CNN和宝可梦模型二

目录 1. 说明2. 宝可梦模型的CNN模型测试2.1 导入相关库2.2 加载模型2.3 设置保存图片的路径2.4 加载图片2.5 数据处理和归一化2.6 对图片进行预测2.7 显示图片 3. 完整代码和显示结果4. 多张图片进行测试的完整代码以及结果 1. 说明 本篇文章是对上篇文章宝可梦模型训练的模型…

yolo txt 转 labelme json 格式

talk is cheap show me the code! def convert_txt_to_labelme_json(txt_path, image_path, output_dir, image_fmt.jpg):# txt 转labelme json# 将yolo的txt转labelme jsontxts glob.glob(os.path.join(txt_path, "*.txt"))for txt in txts:labelme_json {versio…

【项目 计网3】Socket介绍 4.9字节序 4.10字节序转换函数

文章目录 4.8 Socket介绍4.9字节序简介字节序举例 4.10字节序转换函数 4.8 Socket介绍 所谓 socket&#xff08;套接字&#xff09;&#xff0c;就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。一个套接字就是网络上进程通信的一端&#xff0c;提供了应用层进…

培训报名小程序报名功能开发

目录 1 创建页面2 新建URL参数3 课程详细信息4 报名数据源创建5 报名信息功能开发6 设置页面跳转7 最终实现的效果总结 在培训报名小程序中&#xff0c;我们已经开发了首页和列表页。在列表页点击报名时就跳转到报名页面&#xff0c;先看我们的原型 报名页分为两个部分&#x…

反弹shell的N种姿势

预备知识1. 关于反弹shell 就是控制端监听在某TCP/UDP端口&#xff0c;被控端发起请求到该端口&#xff0c;并将其命令行的输入输出转到控制端。reverse shell与telnet&#xff0c;ssh等标准shell对应&#xff0c;本质上是网络概念的客户端与服务端的角色反转。2. 反弹shel…

【Linux命令详解 | cp命令】Linux系统中用于复制文件或目录的命令

文章标题 简介参数列表二&#xff0c;使用介绍1. 复制单个文件2. 复制多个文件3. 复制目录4. 保留文件属性5. 创建链接6. 强制覆盖7. 显示复制进度8. 创建备份9. 只有当源文件比目标文件新时才复制10. 复制链接文件 总结 简介 cp命令在Linux系统中用于复制文件或目录。其功能强…

刷题笔记 day9

1658 将 x 减到 0 的最小操作数 解析&#xff1a;1. 当数组的两端的数都大于x时&#xff0c;直接返回 -1。 2. 当数组所有数之和小于 x 时 &#xff0c;直接返回 -1。 3. 数组中可以将 x 消除为0&#xff0c;那么可以从左边减小为 0 &#xff1b;可以从右边减小为 0 &#xff1…

20230807通过ffmpeg将DTS编码的AUDIO音频转换为AAC编码

20230807通过ffmpeg将DTS编码的AUDIO音频转换为AAC编码 2023/8/7 20:04 ffmpeg dts 转AAC 缘起&#xff1a;由于网上找的电影没有中文字幕&#xff0c;有内置的英文字幕&#xff0c;但是还是通过剪映/RP2023识别一份英文字幕备用&#xff01; I:\Downloads\2005[红眼航班]Red E…

助力企业数字化转型,为何 PaaS 化是最佳方式?

在数字化的浪潮中&#xff0c;身份管理已经成为各个行业数字化转型的关键要素。建立一个高效的身份治理中台&#xff0c;不仅能有效保障企业信息安全和数据安全&#xff0c;还能提升企业的数字化程度。 然而&#xff0c;身份管理的构建并非一项简单的任务。当企业在构建身份治理…

llama2模型下载

介绍 LLaMA 2-CHAT与OpenAI ChatGPT效果一样好。LLaMA 2与LLaMA 1架构相同,LLaMA 2训练数据是2000000000000个tokens,还是用了1000000个人类新标注的数据。上下文长度由2048提升为4096。 本教程提供两种下载方式: 1官方下载脚本下载 2hugging face网站下载 官网资格申请 …

赴日IT培训 赴日程序员工作适合什么人?

日本作为全球第三大经济体&#xff0c;IT行业发展十分迅速&#xff0c;日本拥有世界领先的科技公司&#xff0c;如索尼、丰田、日立等&#xff0c;这为IT行业提供了广阔的发展平台和良好的职业前景。此外&#xff0c;日本政府对IT行业也给予了充分的政策支持&#xff0c;像是对…

[CKA]考试之查看Pod日志

由于最新的CKA考试改版&#xff0c;不允许存储书签&#xff0c;本博客致力怎么一步步从官网把答案找到&#xff0c;如何修改把题做对&#xff0c;下面开始我们的 CKA之旅 题目为&#xff1a; Task 监控名为foobar的Pod的日志&#xff0c;并过滤出具有unable-to-access-websi…

Windows系统下添加了新环境变量无需重启电脑激活新环境变量的方法

首先WinR&#xff0c;再输入cmd&#xff0c;进入终端&#xff0c;输入以下命令&#xff1a; set Pathc输入完以上命令回车&#xff0c;如下&#xff1a; 关闭终端后再次打开输入cl&#xff0c;如果输出以下类似信息说明新的环境变量已经添加成功&#xff0c;如下&#xff1a; …

前端实习day20

今天解决了不少bug&#xff0c;成就感满满&#xff0c;有几个问题困扰了我很久&#xff0c;我查阅了很多博客&#xff0c;终于找到解决思路&#xff0c;顺利解决&#xff0c;这里记录一下解决思路。 1、在通过this.$refs.layoutSide.style设置<a-layout-sider>的宽度时&…

kali常见字典目录

kali常见字典目录 Kali Linux 是一个流行的渗透测试和安全评估操作系统&#xff0c;它预装了许多用于渗透测试和安全研究的工具&#xff0c;包括各种字典文件。以下是一些 Kali Linux 中常见的字典目录&#xff1a; 当然可以&#xff01;以下是 Kali Linux 中常见的一些字典目…

小研究 - MySQL 分区分表的设计及实(二)

随着信息技术的快速发展&#xff0c;数据量越来越大&#xff0c;海量的表查询操作需要消耗大量的时间&#xff0c;成为影响数据库访问性能提高的主要因素。为了提升数据库操作的查询效率和用户体验&#xff0c;在关系型数据库管理系统(MySQL)中通过 range 分区和 Merge 存储&am…

重生学c++系列第三课类和对象(上)

好的我们重生c系列的前两期已经介绍完了c祖师爷针对C语言补充的几个新功能&#xff0c;现在我们进入c的真正课题学习——类与对象: C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 比如说我们洗菜做饭&am…

QT生成Debug和Release发布版后,运行exe缺少dll问题

在QT Creator生成debug和release的exe执行文件后&#xff0c;运行时&#xff0c;报错缺少*.dll.解决办法1&#xff1a; 在系统环境变量中添加D:\Qt\Qt5.13.2\Tools\mingw730_64\bin后&#xff0c;即可运行。 当使用此方法时&#xff0c;将exe拷贝到其他电脑中运行时&#xff0c…

DAY4,C高级(shell中的函数,循环,排序思想)

1.整理思维导图&#xff1b; 2.写一个函数&#xff0c;获取用户的uid和gid并使用变量接收&#xff1b; 1 #!/bin/bash 2 function get_id()3 {4 read -p "输入用户&#xff1a;…

海外社媒营销:如何树立品牌个性与目标受众共鸣?

随着全球化的不断深入&#xff0c;海外市场对于企业的重要性越来越凸显。在这个数字化时代&#xff0c;社交媒体已经成为品牌塑造和推广的重要渠道之一。然而&#xff0c;海外市场竞争激烈&#xff0c;想要在众多品牌中脱颖而出&#xff0c;就需要在社交媒体关注者的心中树立品…