流数据湖平台Apache Paimon(二)集成 Flink 引擎

news2025/1/20 13:30:32

文章目录

  • 第2章 集成 Flink 引擎
    • 2.1 环境准备
      • 2.1.1 安装 Flink
      • 2.1.2 上传 jar 包
      • 2.1.3 启动 Hadoop
      • 2.1.4 启动 sql-client
    • 2.2 Catalog
      • 2.2.1 文件系统
      • 2.2.2 Hive Catalog
      • 2.2.3 sql 初始化文件
    • 2.3 DDL
      • 2.3.1 建表
      • 2.3.2 修改表
    • 2.4 DML
      • 2.4.1 插入数据
      • 2.4.2 覆盖数据
      • 2.4.3 更新数据
      • 2.4.4 删除数据
      • 2.4.5 Merge Into
    • 2.5 DQL查询表
      • 2.5.1 批量查询
      • 2.5.2 流式查询
      • 2.5.3 查询优化
    • 2.6 系统表
      • 2.6.1 快照表 Snapshots Table
      • 2.6.2 模式表 Schemas Table
      • 2.6.3 选项表 Options Table
      • 2.6.4 审计日志表 Audit log Table
      • 2.6.5 文件表 Files Table
      • 2.6.6 标签表 Tags Table
    • 2.7 维表Join
    • 2.8 CDC集成
      • 2.8.1 MySQL
      • 2.8.2 Kafka
      • 2.8.3 支持的schema变更

第2章 集成 Flink 引擎

Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。

2.1 环境准备

环境准备

2.1.1 安装 Flink

1)上传并解压Flink安装包

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/

2)配置环境变量

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=hadoop classpath

source /etc/profile.d/my_env.sh

2.1.2 上传 jar 包

1)下载并上传Paimon的jar包

jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/

2)拷贝paimon的jar包到flink的lib目录下

cp paimon-flink-1.17-0.5-20230703.002437-67.jar /opt/module/flink-1.17.0/lib

2.1.3 启动 Hadoop

(略)

2.1.4 启动 sql-client

1)修改flink-conf.yaml配置

vim /opt/module/flink-1.16.0/conf/flink-conf.yaml

#解决中文乱码,1.17之前参数是env.java.opts

env.java.opts.all: -Dfile.encoding=UTF-8

classloader.check-leaked-classloader: false

taskmanager.numberOfTaskSlots: 4

execution.checkpointing.interval: 10s

state.backend: rocksdb

state.checkpoints.dir: hdfs://hadoop102:8020/ckps

state.backend.incremental: true

2)启动 Flink集群

(1)解决依赖问题

cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/

(2)这里以 Yarn-Session模式为例

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

3)启动Flink的sql-client

/opt/module/flink-1.17.0/bin/sql-client.sh -s yarn-session

img

4)设置结果显示模式

SET ‘sql-client.execution.result-mode’ = ‘tableau’;

2.2 Catalog

Paimon Catalog可以持久化元数据,当前支持两种类型的metastore:

文件系统(默认):将元数据和表文件存储在文件系统中。

hive:在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。

2.2.1 文件系统

CREATE CATALOG fs_catalog WITH (

‘type’ = ‘paimon’,

‘warehouse’ = ‘hdfs://hadoop102:8020/paimon/fs’

);

USE CATALOG fs_catalog;

2.2.2 Hive Catalog

通过使用Hive Catalog,对Catalog的更改将直接影响相应的hive metastore。在此类Catalog中创建的表也可以直接从 Hive 访问。

要使用 Hive Catalog,数据库名称、表名称和字段名称应小写。

1)上传 hive-connector

将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上川到Flink的lib目录下

2)重启yarn-session集群

3)启动hive的metastore服务

nohup hive --service metastore &

4)创建Hive Catalog

CREATE CATALOG hive_catalog WITH (

  'type' = 'paimon',

  'metastore' = 'hive',

'uri' = 'thrift://hadoop102:9083',

'hive-conf-dir' = '/opt/module/hive/conf',

  'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'

);


USE CATALOG hive_catalog;

5)注意事项

使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置

vim /opt/module/hive/conf/hive-site.xml;

  <property>

​    <name>hive.metastore.disallow.incompatible.col.type.changes</name>

​    <value>false</value>

  </property>

上述配置需要在hive-site.xml中配置,且hive metastore服务需要重启。

如果使用的是 Hive3,请禁用 Hive ACID:

hive.strict.managed.tables=false

hive.create.as.insert.only=false

metastore.create.as.acid=false

2.2.3 sql 初始化文件

1)创建初始化sql文件

vim conf/sql-client-init.sql

CREATE CATALOG fs_catalog WITH (

  'type' = 'paimon',

  'warehouse' = 'hdfs://hadoop102:8020/paimon/fs'

);

 

CREATE CATALOG hive_catalog WITH (

  'type' = 'paimon',

  'metastore' = 'hive',

'uri' = 'thrift://hadoop102:9083',

'hive-conf-dir' = '/opt/module/hive/conf',

  'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'

);

 

 

USE CATALOG hive_catalog;

 

SET 'sql-client.execution.result-mode' = 'tableau';

2)启动sql-client时,指定该sql初始化文件

bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql

3)查看catalog

show catalogs;

show current catalog;

2.3 DDL

2.3.1 建表

2.3.1.1 管理表

在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。

1)创建表

CREATE TABLE test (

  user_id BIGINT,

  item_id BIGINT,

  behavior STRING,

  dt STRING,

  hh STRING,

  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED

);

2)创建分区表

CREATE TABLE test_p (

  user_id BIGINT,

  item_id BIGINT,

  behavior STRING,

  dt STRING,

  hh STRING,

  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED

) PARTITIONED BY (dt, hh);

通过配置partition.expiration-time,可以自动删除过期的分区。

如果定义了主键,则分区字段必须是主键的子集。

可以定义以下三类字段为分区字段:

创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。

事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。

CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。

3)Create Table As

表可以通过查询的结果创建和填充,例如,我们有一个这样的sql: CREATE TABLE table_b AS SELECT id, name FORM table_a, 生成的表table_b将相当于创建表并插入数据以下语句:CREATE TABLE table_b(id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

使用CREATE TABLE AS SELECT时我们可以指定主键或分区。

CREATE TABLE test1(

user_id BIGINT,

item_id BIGINT

);

CREATE TABLE test2 AS SELECT * FROM test1;

– 指定分区

CREATE TABLE test2_p WITH (‘partition’ = ‘dt’) AS SELECT * FROM test_p;

– 指定配置

CREATE TABLE test3(

​ user_id BIGINT,

​ item_id BIGINT

) WITH (‘file.format’ = ‘orc’);

CREATE TABLE test3_op WITH (‘file.format’ = ‘parquet’) AS SELECT * FROM test3;

– 指定主键

CREATE TABLE test_pk WITH (‘primary-key’ = ‘dt,hh’) AS SELECT * FROM test;

– 指定主键和分区

CREATE TABLE test_all WITH (‘primary-key’ = ‘dt,hh’, ‘partition’ = ‘dt’) AS SELECT * FROM test_p;

4)Create Table Like

创建与另一个表具有相同schema、分区和表属性的表。

CREATE TABLE test_ctl LIKE test;

5)表属性

用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。有关此类属性的完整列表,请参阅配置: https://paimon.apache.org/docs/master/maintenance/configurations/。

CREATE TABLE tbl(

  user_id BIGINT,

  item_id BIGINT,

  behavior STRING,

  dt STRING,

  hh STRING,

  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED

) PARTITIONED BY (dt, hh) 

WITH (

  'bucket' = '2',

  'bucket-key' = 'user_id'

);

2.3.1.2 外部表

外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。

Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。

CREATE TABLE ex (

  user_id BIGINT,

  item_id BIGINT,

  behavior STRING,

  dt STRING,

  hh STRING,

  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED

) WITH (

  'connector' = 'paimon',

  'path' = 'hdfs://hadoop102:8020/paimon/external/ex',

  'auto-create' = 'true' 

);

2.3.1.3 临时表

仅 Flink 支持临时表。与外部表一样,临时表只是记录,但不由当前 Flink SQL 会话管理。如果临时表被删除,其资源将不会被删除。当 Flink SQL 会话关闭时,临时表也会被删除。与外部表的区别在于,临时表在Paimon Catalog中创建。

如果想将Paimon Catalog与其他表一起使用,但不想将它们存储在其他Catalog中,可以创建临时表。

USE CATALOG hive_catalog;

 

CREATE TEMPORARY TABLE temp (

  k INT,

  v STRING

) WITH (

  'connector' = 'filesystem',

  'path' = 'hdfs://hadoop102:8020/temp.csv',

  'format' = 'csv'

);

2.3.2 修改表

2.3.2.1 修改表

1)更改/添加表属性

ALTER TABLE test SET (

‘write-buffer-size’ = ‘256 MB’

);

2)重命名表名称

ALTER TABLE test1 RENAME TO test_new;

3)删除表属性

ALTER TABLE test RESET (‘write-buffer-size’);

2.3.2.2 修改列

1)添加新列

ALTER TABLE test ADD (c1 INT, c2 STRING);

2)重命名列名称

ALTER TABLE test RENAME c1 TO c0;

3)删除列

ALTER TABLE test DROP (c0, c2);

4)更改列的可为空性

CREATE TABLE test_null(

id INT PRIMARY KEY NOT ENFORCED,

coupon_info FLOAT NOT NULL

);

– 列coupon_info修改成允许为null

ALTER TABLE test_null MODIFY coupon_info FLOAT;

– 列coupon_info修改成不允许为null

– 如果表中已经有null值, 修改之前先设置如下参数删除null值

SET ‘table.exec.sink.not-null-enforcer’ = ‘DROP’;

ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;

5)更改列注释

ALTER TABLE test MODIFY user_id BIGINT COMMENT ‘user id’;

6)添加列位置

ALTER TABLE test ADD a INT FIRST;

ALTER TABLE test ADD b INT AFTER a;

7)更改列位置

ALTER TABLE test MODIFY b INT FIRST;

ALTER TABLE test MODIFY a INT AFTER user_id;

8)更改列类型

ALTER TABLE test MODIFY a DOUBLE;

2.3.2.3 修改水印

1)添加水印

CREATE TABLE test_wm (

id INT,

name STRING,

ts BIGINT

);

ALTER TABLE test_wm ADD(

et AS to_timestamp_ltz(ts,3),

WATERMARK FOR et AS et - INTERVAL ‘1’ SECOND

);

2)更改水印

ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL ‘2’ SECOND;

3)去掉水印

ALTER TABLE test_wm DROP WATERMARK;

2.4 DML

2.4.1 插入数据

INSERT 语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }

part_spec

可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date’2019-01-02’)。

语法: PARTITION (分区列名称 = 分区列值 [ , … ] )

column_list

可选,指定以逗号分隔的字段列表。

语法:(col_name1 [,column_name2, …])

所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与 VALUES 子句或查询中的数据大小完全相同。

value_expr

指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。

语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]

目前,Flink 不支持直接使用 NULL,因此需要将 NULL 转换为实际数据类型值,比如“CAST (NULL AS STRING)”

注意:将 Nullable 字段写入 Not-null 字段

不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理,比如A表的key1是not null,B表的key2是nullable:

INSERT INTO A key1 SELECT COALESCE(key2, ) FROM B

案例:

INSERT INTO test VALUES(1,1,‘order’,‘2023-07-01’,‘1’), (2,2,‘pay’,‘2023-07-01’,‘2’);

INSERT INTO test_p PARTITION(dt=‘2023-07-01’,hh=‘1’) VALUES(3,3, ‘pv’);

– 执行模式区分流、批

INSERT INTO test_p SELECT * from test;

Paimon支持在sink阶段通过partition和bucket对数据进行shuffle。

2.4.2 覆盖数据

覆盖数据只支持batch模式。默认情况下,流式读取将忽略 INSERT OVERWRITE 生成的提交。如果你想读取OVERWRITE的提交,你可以配置streaming-read-overwrite。

RESET ‘execution.checkpointing.interval’;

SET ‘execution.runtime-mode’ = ‘batch’;

1)覆盖未分区的表

INSERT OVERWRITE test VALUES(3,3,‘pay’,‘2023-07-01’,‘2’);

2)覆盖分区表

对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)。您可以配置动态分区覆盖来更改它。

INSERT OVERWRITE test_p SELECT * from test;

覆盖指定分区:

INSERT OVERWRITE test_p PARTITION (dt = ‘2023-07-01’, hh = ‘2’) SELECT user_id,item_id,behavior from test;

3)清空表

可以使用 INSERT OVERWRITE 通过插入空值来清除表(关闭动态分区覆盖)。

INSERT OVERWRITE test_p/*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */ SELECT * FROM test_p WHERE false;

2.4.3 更新数据

目前,Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新记录。您可以在Flink的批处理模式下执行UPDATE。

只有主键表支持此功能。不支持更新主键。

MergeEngine 需要deduplicate或partial-update才能支持此功能。(默认deduplicate)

UPDATE test SET item_id = 4, behavior = ‘pv’ WHERE user_id = 3;

2.4.4 删除数据

从表中删除(Flink 1.17):

只有写入模式设置为change-log的表支持此功能。(有主键默认就是change-log)

如果表有主键,MergeEngine需要为deduplicate。(默认deduplicate)

DELETE FROM test WHERE user_id = 3;

2.4.5 Merge Into

通过merge into实现行级更新,只有主键表支持此功能。该操作不会产生 UPDATE_BEFORE,因此不建议设置 ‘changelog-producer’ = ‘input’。

merge-into 操作使用“upsert”语义而不是“update”,这意味着如果该行存在,则执行更新,否则执行插入。

1)语法说明:

<FLINK_HOME>/bin/flink run \

  /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

  merge-into \

  --warehouse <warehouse-path> \

  --database <database-name> \

  --table <target-table> \

  [--target-as <target-table-alias>] \

  --source-table <source-table-name> \

  [--source-sql <sql> ...]\

  --on <merge-condition> \

  --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \

  --matched-upsert-condition <matched-condition> \

  --matched-upsert-set <upsert-changes> \

  --matched-delete-condition <matched-condition> \

  --not-matched-insert-condition <not-matched-condition> \

  --not-matched-insert-values <insert-values> \

  --not-matched-by-source-upsert-condition <not-matched-by-source-condition> \

  --not-matched-by-source-upsert-set <not-matched-upsert-changes> \

  --not-matched-by-source-delete-condition <not-matched-by-source-condition> \

  [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]

--source-sql <sql> 可以传递sql来配置环境并在运行时创建源表。

“match”的说明:

(1)matched:更改的行来自目标表,每个行都可以根据条件匹配源表行(source ∩ target):

合并条件(–on)

匹配条件(–matched-xxx-condition)

(2)not-matched:更改的行来自源表,并且根据条件所有行都不能与任何目标表的行匹配(source – target):

合并条件(–on)

不匹配条件(–not-matched-xxx-condition):不能使用目标表的列来构造条件表达式。

(3)not-matched-by-source:更改的行来自目标表,并且基于条件所有行都不能与任何源表的行匹配(target – source):

合并条件(–on)

源不匹配条件(–not-matched-by-source-xxx-condition):不能使用源表的列来构造条件表达式。

2)案例实操

需要用到paimon-flink-action-xxxx.jar,上传:

cp paimon-flink-action-0.5-20230703.002437-53.jar /opt/module/flink-1.17.0/opt

下载地址:

https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/0.5-SNAPSHOT/

(1)准备测试表:

use catalog hive_catalog;

create database test;

use test;

 

CREATE TABLE ws1 (

  id INT,

  ts BIGINT,

  vc INT,

  PRIMARY KEY (id) NOT ENFORCED

);

 

INSERT INTO ws1 VALUES(1,1,1),(2,2,2),(3,3,3);

 

 

CREATE TABLE ws_t (

  id INT,

  ts BIGINT,

  vc INT,

  PRIMARY KEY (id) NOT ENFORCED

);

INSERT INTO ws_t VALUES(2,2,2),(3,3,3),(4,4,4),(5,5,5);

(2)案例一: ws_t与ws1匹配id,将ws_t中ts>2的vc改为10,ts<=2的删除

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions matched-upsert,matched-delete \

–matched-upsert-condition “ws_t.ts > 2” \

–matched-upsert-set “vc = 10” \

–matched-delete-condition “ws_t.ts <= 2”

(3)案例二: ws_t与ws1匹配id,匹配上的将ws_t中vc加10,ws1中没匹配上的插入ws_t中

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions matched-upsert,not-matched-insert \

–matched-upsert-set “vc = ws_t.vc + 10” \

–not-matched-insert-values “*”

(4)案例三: ws_t与ws1匹配id,ws_t中没匹配上的,ts大于4则vc加20,ts=4则删除

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions not-matched-by-source-upsert,not-matched-by-source-delete \

–not-matched-by-source-upsert-condition “ws_t.ts > 4” \

–not-matched-by-source-upsert-set “vc = ws_t.vc + 20” \

–not-matched-by-source-delete-condition " ws_t.ts = 4"

(5)案例四: 使用–source-sql创建新catalog下的源表,匹配ws_t的id,没匹配上的插入ws_t

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-sql “CREATE CATALOG fs2 WITH (‘type’ = ‘paimon’,‘warehouse’ = ‘hdfs://hadoop102:8020/paimon/fs2’)” \

–source-sql “CREATE DATABASE IF NOT EXISTS fs2.test” \

–source-sql “CREATE TEMPORARY VIEW fs2.test.ws2 AS SELECT id+10 as id,ts,vc FROM test.ws1” \

–source-table fs2.test.ws2 \

–on “ws_t.id = ws2. id” \

–merge-actions not-matched-insert\

–not-matched-insert-values “*”

2.5 DQL查询表

2.5.1 批量查询

就像所有其他表一样,Paimon 表可以使用 SELECT 语句进行查询。

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。

在sql-client中,设置执行模式为批即可:

RESET ‘execution.checkpointing.interval’;

SET ‘execution.runtime-mode’ = ‘batch’;

2.5.1.1 时间旅行

1)读取指定id的快照

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘2’) */;

2)读取指定时间戳的快照

– 查看快照信息

SELECT * FROM ws_t&snapshots;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)读取指定标签

SELECT * FROM ws_t /*+ OPTIONS(‘scan.tag-name’ = ‘my-tag’) */;

2.5.1.2 增量查询

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:

SELECT * FROM ws_t /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:

SELECT * FROM ws_t$audit_log /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

2.5.2 流式查询

默认情况下,Streaming read 在第一次启动时会生成表上的最新快照,并继续读取最新的更改。

SET ‘execution.checkpointing.interval’=‘30s’;

SET ‘execution.runtime-mode’ = ‘streaming’;

也可以从最新读取,设置扫描模式:

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’ = ‘latest’) */

2.5.2.1 时间旅行

如果只想处理今天及以后的数据,则可以使用分区过滤器来实现:

SELECT * FROM test_p WHERE dt > ‘2023-07-01’

如果不是分区表,或者无法按分区筛选,可以使用时间旅行的流读取。

1)从指定快照id开始读取变更数据

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

2)从指定时间戳开始读取

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)第一次启动时读取指定快照数据,并继续读取变化

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’=‘from-snapshot-full’,‘scan.snapshot-id’ = ‘3’) */;

2.5.2.2 Consumer ID

1)优点

在流式读取表时指定consumer-id,这是一个实验性功能。

当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:

当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。

在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。

当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着您可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。

2)案例演示

指定consumer-id开始流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

停掉原先的流式查询,插入数据:

insert into ws_t values(6,6,6);

再次指定consumer-id流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

2.5.3 查询优化

强烈建议在查询时指定分区和主键过滤器,这将加快查询的数据跳过速度。

可以加速数据跳跃的过滤函数有:

=

<

<=

=

IN (…)

LIKE ‘abc%’

IS NULL

Paimon会按主键对数据进行排序,从而加快点查询和范围查询的速度。使用复合主键时,查询过滤器最好形成主键的最左边前缀,以获得良好的加速效果。

CREATE TABLE orders (

catalog_id BIGINT,

order_id BIGINT,

…,

PRIMARY KEY (catalog_id, order_id) NOT ENFORCED – composite primary key

)

通过为主键最左边的前缀指定范围过滤器,查询获得了很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders

WHERE catalog_id=1025jkjkjk

AND order_id>2035 AND order_id<6000;

下面例子的过滤器不能很好地加速查询:

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

2.6 系统表

系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

2.6.1 快照表 Snapshots Table

通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。

SELECT * FROM ws_t$snapshots;

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

2.6.2 模式表 Schemas Table

通过schemas表可以查询该表的历史schema。

SELECT * FROM ws_t$schemas;

可以连接快照表和模式表以获取给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields

FROM ws_t s n a p s h o t s s J O I N w s t snapshots s JOIN ws_t snapshotssJOINwstschemas t

ON s.schema_id=t.schema_id where s.snapshot_id=3;

2.6.3 选项表 Options Table

可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值。

SELECT * FROM ws_t$options;

2.6.4 审计日志表 Audit log Table

如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。

rowkind 有四个值:

+I:插入操作。

-U:使用更新行的先前内容进行更新操作。

+U:使用更新行的新内容进行更新操作。

-D:删除操作。

SELECT * FROM ws_t$audit_log;

2.6.5 文件表 Files Table

可以查询特定快照表的文件。

– 查询最新快照的文件

SELECT * FROM ws_t$files;

– 查询指定快照的文件

SELECT * FROM ws_t$files /*+ OPTIONS(‘scan.snapshot-id’=‘1’) */;

2.6.6 标签表 Tags Table

通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。

SELECT * FROM ws_t$tags;

2.7 维表Join

Paimon支持Lookup Join语法,它用于从 Paimon 查询的数据来补充维度字段。要求一个表具有处理时间属性,而另一个表由查找源连接器支持。

Paimon 支持 Flink 中具有主键的表和append-only的表查找联接。以下示例说明了此功能。

USE CATALOG fs_catalog;

CREATE TABLE customers (

id INT PRIMARY KEY NOT ENFORCED,

name STRING,

country STRING,

zip STRING

);

INSERT INTO customers VALUES(1,‘zs’,‘ch’,‘123’),(2,‘ls’,‘ch’,‘456’), (3,‘ww’,‘ch’,‘789’);

CREATE TEMPORARY TABLE Orders (

order_id INT,

total INT,

customer_id INT,

proc_time AS PROCTIME()

) WITH (

‘connector’ = ‘datagen’,

‘rows-per-second’=‘1’,

‘fields.order_id.kind’=‘sequence’,

‘fields.order_id.start’=‘1’,

‘fields.order_id.end’=‘1000000’,

‘fields.total.kind’=‘random’,

‘fields.total.min’=‘1’,

‘fields.total.max’=‘1000’,

‘fields.customer_id.kind’=‘random’,

‘fields.customer_id.min’=‘1’,

‘fields.customer_id.max’=‘3’

);

SELECT o.order_id, o.total, c.country, c.zip

FROM Orders AS o

JOIN customers

FOR SYSTEM_TIME AS OF o.proc_time AS c

ON o.customer_id = c.id;

Lookup Join算子会在本地维护一个RocksDB缓存并实时拉取表的最新更新。查找连接运算符只会提取必要的数据,因此您的过滤条件对于性能非常重要。

如果Orders(主表)的记录Join缺失,因为customers(查找表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。

2.8 CDC集成

Paimon 支持多种通过模式演化将数据提取到 Paimon 表中的方法。这意味着添加的列会实时同步到Paimon表中,并且不会为此重新启动同步作业。

目前支持以下同步方式:

MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。

MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库中。

API同步表:将您的自定义DataStream输入同步到一张Paimon表中。

Kafka同步表:将一个Kafka topic的表同步到一张Paimon表中。

Kafka同步数据库:将一个包含多表的Kafka主题或多个各包含一表的主题同步到一个Paimon数据库中。

2.8.1 MySQL

添加Flink CDC 连接器。

cp flink-sql-connector-mysql-cdc-2.4.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.1.1 同步表

1)语法说明

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-table

–warehouse \

–database \

–table \

[–partition-keys ] \

[–primary-keys ] \

[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \

[–mysql-conf [–mysql-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明:

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–tablePaimon 表名称。
–partition-keysPaimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keysPaimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column计算列的定义。参数字段来自 MySQL 表字段名称。
–mysql-confFlink CDC MySQL 源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果指定的 Paimon 表不存在,此操作将自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

(1)MySQL一张表同步到Paimon一张表

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-table \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table order_info_cdc \

–primary-keys id \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–mysql-conf table-name=‘order_info’ \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

(2)MySQL多张表同步到Paimon一张表

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-table \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table order_cdc \

–primary-keys id \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–mysql-conf table-name=‘order_.*’ \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

2.8.1.2 同步数据库

1)语法说明

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database

–warehouse \

–database \

[–ignore-incompatible <true/false>] \

[–table-prefix ] \

[–table-suffix ] \

[–including-tables <mysql-table-name|name-regular-expr>] \

[–excluding-tables <mysql-table-name|name-regular-expr>] \

[–mysql-conf [–mysql-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明:

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–ignore-incompatible默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables用于指定要同步哪些源表。您必须使用“|”分隔多个表,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–mysql-confFlink CDC MySQL源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

只有具有主键的表才会被同步。

对于每个需要同步的MySQL表,如果对应的Paimon表不存在,该操作会自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “ods_” \

–table-suffix “_cdc” \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4 \

–including-tables ‘user_info|order_info|activity_rule’

3)同步数据库下新添加的表

首先假设 Flink 作业正在同步数据库 source_db 下的表 [product、user、address]。提交作业的命令如下所示:

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database \

–warehouse hdfs:///path/to/warehouse \

–database test_db \

–mysql-conf hostname=127.0.0.1 \

–mysql-conf username=root \

–mysql-conf password=123456 \

–mysql-conf database-name=source_db \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hive-metastore:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4 \

–including-tables ‘product|user|address’

稍后,我们希望作业也同步包含历史数据的表 [order, custom]。我们可以通过从作业的先前快照中恢复并从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后自动从之前的位置继续读取变更日志。

从以前的快照恢复并添加新表进行同步的命令如下所示:

<FLINK_HOME>/bin/flink run \

–fromSavepoint savepointPath \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database \

–warehouse hdfs:///path/to/warehouse \

–database test_db \

–mysql-conf hostname=127.0.0.1 \

–mysql-conf username=root \

–mysql-conf password=123456 \

–mysql-conf database-name=source_db \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hive-metastore:9083 \

–table-conf bucket=4 \

–including-tables ‘product|user|address|order|custom’

2.8.2 Kafka

Flink 提供了几种 Kafka CDC 格式:canal-json、debezium-json、ogg-json、maxwell-json。如果 Kafka 主题中的消息是使用更改数据捕获 (CDC) 工具从另一个数据库捕获的更改事件,则您可以使用 Paimon Kafka CDC。将解析后的INSERT、UPDATE、DELETE消息写入到paimon表中。Paimon官网列出支持的格式如下:

img

添加Kafka连接器:

cp flink-sql-connector-kafka-1.17.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.2.1 同步表

1)语法说明

将 Kafka 的一个主题中的一张或多张表同步到一张 Paimon 表中。

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

kafka-sync-table

–warehouse \

–database \

–table \

[–partition-keys ] \

[–primary-keys ] \

[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \

[–kafka-conf [–kafka-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–tablePaimon 表名称。
–partition-keysPaimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keysPaimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column计算列的定义。参数字段来自 Kafka 主题的表字段名称。
–kafka-confFlink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果您指定的 Paimon 表不存在,此操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info单表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal

#插入数据如下:

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}

{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}

{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}

{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}

{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含单表数据)同步到 Paimon表

bin/flink run \

  /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

  kafka-sync-table \

  --warehouse hdfs://hadoop102:8020/paimon/hive \

  --database test \

  --table kafka_user_info_cdc \

  --primary-keys id \

  --kafka-conf properties.bootstrap.servers=hadoop102:9092 \

  --kafka-conf topic=paimon_canal \

--kafka-conf properties.group.id=atguigu \

--kafka-conf scan.startup.mode=earliest-offset \

  --kafka-conf value.format=canal-json \

  --catalog-conf metastore=hive \

  --catalog-conf uri=thrift://hadoop102:9083 \

  --table-conf bucket=4 \

  --table-conf changelog-producer=input \

  --table-conf sink.parallelism=4

2.8.2.2 同步数据库

1)语法说明

将多个主题或一个主题同步到一个 Paimon 数据库中。

<FLINK_HOME>/bin/flink run \

  /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

  kafka-sync-database

  --warehouse <warehouse-path> \

  --database <database-name> \

  [--schema-init-max-read <int>] \

  [--ignore-incompatible <true/false>] \

  [--table-prefix <paimon-table-prefix>] \

  [--table-suffix <paimon-table-suffix>] \

  [--including-tables <table-name|name-regular-expr>] \

  [--excluding-tables <table-name|name-regular-expr>] \

  [--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \

  [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \

  [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

参数说明:

配置描述
–warehouseThe path to Paimon warehouse.通往派蒙仓库的道路。
–databasePaimon 目录中的数据库名称。
–schema-init-max-read如果您的表全部来自某个Topic,您可以设置该参数来初始化需要同步的表数量。默认值为 1000。
–ignore-incompatible默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables用于指定要同步哪些源表。您必须使用“|”分隔多个表。因为“|”为特殊字符,需要逗号,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–kafka-confFlink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。有关完整配置列表,请参阅其文档。
–catalog-confPaimon 目录的配置。每个配置都应以“key=value”的格式指定。请参阅此处以获取目录配置的完整列表。
–table-confPaimon 餐桌水槽的配置。每个配置都应以“key=value”的格式指定。请参阅此处了解表配置的完整列表。

只有具有主键的表才会被同步。

对于每个要同步的Kafka主题的表,如果对应的Paimon表不存在,该操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info和spu_info多表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_2

#插入数据如下(注意不要有空行):

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}

{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}

{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}

{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}

{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}

{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}

{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}

{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}

{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

再准备一个只包含spu_info单表数据的Topic:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_1

#插入数据如下:

{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}

{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}

{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}

{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含多表数据)同步到 Paimon 数据库

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

kafka-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “t1_” \

–table-suffix “_cdc” \

–schema-init-max-read 500 \

–kafka-conf properties.bootstrap.servers=hadoop102:9092 \

–kafka-conf topic=paimon_canal_2 \

–kafka-conf properties.group.id=atguigu \

–kafka-conf scan.startup.mode=earliest-offset \

–kafka-conf value.format=canal-json \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

从多个 Kafka 主题同步到 Paimon 数据库

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

kafka-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “t2_” \

–table-suffix “_cdc” \

–kafka-conf properties.bootstrap.servers=hadoop102:9092 \

–kafka-conf topic=“paimon_canal;paimon_canal_1” \

–kafka-conf properties.group.id=atguigu \

–kafka-conf scan.startup.mode=earliest-offset \

–kafka-conf value.format=canal-json \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

2.8.3 支持的schema变更

cdc 集成支持有限的schema变更。目前,框架无法删除列,因此 DROP 的行为将被忽略,RENAME 将添加新列。当前支持的架构更改包括:

(1)添加列。

(2)更改列类型:

从字符串类型(char、varchar、text)更改为长度更长的另一种字符串类型,

从二进制类型(binary、varbinary、blob)更改为长度更长的另一种二进制类型,

从整数类型(tinyint、smallint、int、bigint)更改为范围更广的另一种整数类型,

从浮点类型(float、double)更改为范围更宽的另一种浮点类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/808765.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

idea插件开发-自定义语言4-Syntax Highlighter

SyntaxHighlighter用于指定应如何突出显示特定范围的文本&#xff0c;ColorSettingPage可以定义颜色。 一、Syntax Highter 1、文本属性键&#xfeff; TextAttributesKey用于指定应如何突出显示特定范围的文本。不同类型的数据比如关键字、数字、字符串等如果要突出显示都需…

ReID网络(一):MGN网络

Start MGN 1. 序言 现代基于感知的信息中&#xff0c;视觉信息占了80~85%。基于视觉信息的处理和分析被应用到诸如安防、电力、汽车等领域。 以安防市场为例&#xff0c;早在2017年&#xff0c;行业咨询公司IHS Market&#xff0c;我国在公共和私人领域安装有摄像头约1.76亿…

《TCP IP网络编程》第十三章

第 13 章 多种 I/O 函数 13.1 send & recv 函数 Linux 中的 send & recv&#xff1a; send 函数定义&#xff1a; #include <sys/socket.h> ssize_t send(int sockfd, const void *buf, size_t nbytes, int flags); /* 成功时返回发送的字节数&#xff0c;失败…

36.悬浮板

悬浮板 html部分 <div class"container"><div class"square"></div> </div>css部分 *{margin: 0;padding: 0; } body{background-color: #111;height: 100vh;overflow: hidden;display: flex;justify-content: center;align-it…

layui框架学习(33:流加载模块)

Layui中的流加载模块flow主要支持信息流加载和图片懒加载两部分内容&#xff0c;前者是指动态加载后续内容&#xff0c;示例的话可以参考csdn个人博客主页&#xff0c;鼠标移动到页面底部时自动加载更多内容&#xff0c;而后者是指页面显示图片时才会延迟加载图片信息。   fl…

苍穹外卖-day08

苍穹外卖-day08 本项目学自黑马程序员的《苍穹外卖》项目&#xff0c;是瑞吉外卖的Plus版本 功能更多&#xff0c;更加丰富。 结合资料&#xff0c;和自己对学习过程中的一些看法和问题解决情况上传课件笔记 视频&#xff1a;https://www.bilibili.com/video/BV1TP411v7v6/?sp…

第17节 R语言分析:生物统计数据集 R 编码分析和绘图

生物统计数据集 R 编码分析和绘图 生物统计学,用于对给定文件 data.csv 中的医疗数据应用 R 编码,该文件是患者人口统计数据集,包含有关来自各种祖先谱系的个体的标准信息。 数据集特征解释 脚本 output= file("Output.txt") # File name of output log sink(o…

[数据集][目标检测]城市道路井盖破损丢失目标检测1377张

数据集制作单位&#xff1a;未来自主研究中心(FIRC) 数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件和yolo格式的txt文件&#xff0c;仅仅包含jpg图片和对应的xml) 图片数量(jpg文件个数)&#xff1a;1377 标注数量(xml文件个数)&#xff1a;1377 标注类别数&a…

Spring源码(五)— 解析XML配置文件(二) 定制化标签解析流程

上一篇以bean标签为例&#xff0c;介绍了属于defaultNamesapce标签的解析流程&#xff0c;但是defaultNamespace中默认的标签只有bean、beans、alias、import这四个&#xff0c;而我们平时在xml中配置的标签有很多。那其余的标签是如何解析&#xff1f; 在这篇文章会详细介绍定…

一个监控系统的典型架构

监控系统的典型架构图&#xff0c;从左往右看&#xff0c;采集器是负责采集监控数据的&#xff0c;采集到数据之后传输给服务端&#xff0c;通常是直接写入时序库。然后就是对时序库的数据进行分析和可视化&#xff0c;分析部分最典型的就是告警规则判断&#xff0c;即图上的告…

【李宏毅机器学习·学习笔记】Deep Learning General Guidance

本节课可视为机器学习系列课程的一个前期攻略&#xff0c;这节课主要对Machine Learning 的框架进行了简单的介绍&#xff1b;并以training data上的loss大小为切入点&#xff0c;介绍了几种常见的在模型训练的过程中容易出现的情况。 课程视频&#xff1a; Youtube&#xff1…

【Spring框架】SpringBoot配置文件

目录 配置文件作用application.properties中午乱码问题&#xff1a;配置文件里面的配置类型分类SpringBoot热部署properties基本语法properties配置文件的优缺点&#xff1a;yml配置文件说明yml基本语法配置对象properties VS yml 配置文件作用 整个项⽬中所有重要的数据都是在…

【MyBatis 学习二】增删改查 参数占位符 #{} 和 ${}的使用

目录 一、增删改查 &#x1f337;1、用户类 &#x1f337;2、UserMapper &#x1f337;3、UserMapper.xml &#x1f337;4、测试类Test &#x1f337;5、UserService类 &#x1f337;6、UserController类 &#x1f337;7、注意点总结 二、#{} 和${} 的使用区别 &…

一个 SpringBoot 项目能处理多少请求

首先&#xff0c;这个问题有坑&#xff0c;因为 spring boot 不处理请求&#xff0c;只是把现有的开源组件打包后进行了版本适配、预定义了一些开源组件的配置通过代码的方式进行自动装配进行简化开发。这是 spring boot 的价值。 如果我是面试官&#xff0c;我不会问这种问题。…

BLE基础理论/Android BLE开发示例

参考&#xff1a;https://blog.csdn.net/qq_36075612/article/details/127739150?spm1001.2014.3001.5502 参考&#xff1a; https://blog.csdn.net/qq_36075612/article/details/122772966?spm1001.2014.3001.5502 目录 蓝牙的分类传统蓝牙低功耗蓝牙 蓝牙专业词汇&#xff…

深度剖析C++ 异常机制

传统排错 我们早在 C 程序里面传统的错误处理手段有&#xff1a; 终止程序&#xff0c;如 assert&#xff1b;缺陷是用户难以接受&#xff0c;说白了就是一种及其粗暴的手法&#xff0c;比如发生内存错误&#xff0c;除0错误时就会终止程序。 返回错误码。缺陷是需要我们自己…

docker启动容器报错

报错信息 [rootDream soft]# docker run -it -d -p 8080:8080 tomcat eec9fab6b9ca06d2bbf1467aef05d8020ee60448978e10ac20c38888934f0a0b docker: Error response from daemon: driver failed programming external connectivity on endpoint hungry_euclid (163242f0079e72…

C语言之pthread_cond_t信号变化探究总结(八十)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

MySQL | 常用命令示例

MySQL | 常用命令示例 一、启停MySQL数据库服务二、连接MySQL数据库三、创建和管理数据库四、创建和管理数据表五、数据备份和恢复六、查询与优化 MySQL是一款常用的关系型数据库管理系统&#xff0c;广泛应用于各个领域。在使用MySQL时&#xff0c;我们经常需要编写一些常用脚…

M 芯片的 macos 系统安装虚拟机 centos7 网络配置

centos 安装之前把网络配置配好或者是把网线插好 第一步找到这个 第二步打开网络适配器 选择图中所指位置 设置好之后 开机启动 centos 第三步 开机以后 编写网卡文件保存 重启网卡就可以了&#xff0c;如果重启网卡不管用&#xff0c;则重启虚拟机即可 “ ifcfg-ens160 ” 这…