Table of Contents
- Preface
- 1. Environment Setup
- 2. Preparing the Data
- 2.1 MySQL
- 2.2 Postgres
- 3. Starting Flink and the Flink SQL Client
- 3.1 Start Flink
- 3.2 Start the Flink SQL Client
- 4. Creating Tables with Flink DDL in the Flink SQL CLI
- 4.1 Enable Checkpointing
- 4.2 For the database tables products, orders, and shipments, create corresponding tables in the Flink SQL CLI to sync the data of these underlying tables
- 4.3 Join orders, products, and shipments with Flink SQL and write the enriched orders to Elasticsearch
- 5. Modify data in MySQL and Postgres and verify that the data in Elasticsearch updates in real time
- 5.1 Insert a row into the MySQL orders table
- 5.2 Insert a row into the Postgres shipments table
- 5.3 Update an order's status in the MySQL orders table
- 5.4 Update a shipment's status in the Postgres shipments table
- 5.5 Delete a row from the MySQL orders table
Preface
I have recently been learning about Flink CDC. As a beginner, I started with the official teaching examples, and this post shares a hands-on run through the official tutorial: Streaming ETL for MySQL and Postgres with Flink CDC.
1. Environment Setup
(1) Deploy MySQL, Postgres, Elasticsearch, and Kibana with docker-compose
Run the following command in the directory containing the YAML file:
docker-compose up -d
The deployment YAML file looks like this:
version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
(2) Deploy Flink
- Download the Flink distribution and extract it.
Flink download link: https://archive.apache.org/dist/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz
- Download the following dependency JARs and put them in the lib directory of the Flink installation (see the sketch after the list):
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/flink-sql-connector-mysql-cdc-2.1.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1.0/flink-sql-connector-postgres-cdc-2.1.0.jar
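A minimal sketch of those download steps, assuming the distribution was extracted to flink-1.13.2/ in the current directory:
# Download the three connector JARs straight into Flink's lib directory
wget -P flink-1.13.2/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
wget -P flink-1.13.2/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/flink-sql-connector-mysql-cdc-2.1.0.jar
wget -P flink-1.13.2/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1.0/flink-sql-connector-postgres-cdc-2.1.0.jar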
Note: the official documentation uses SNAPSHOT JARs. If you build the master branch from GitHub and put the resulting SNAPSHOT JARs into Flink's lib directory, the subsequent tests fail with the error below: the SQL statement cannot be executed, an error then occurs while waiting for job initialization, and once the retries are exhausted Flink itself goes down. Use the released JARs listed above instead.
The log contains the following error:
2022-10-25 15:17:29,917 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2022-10-25 15:18:32,472 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:228) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:518) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:507) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:409) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_281]
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client_2.11-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:777) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'insert-into_default_catalog.default_database.enriched_orders'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:759) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226) ~[flink-sql-client_2.11-1.13.2.jar:1.13.2]
... 12 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_281]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_281]
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) ~[?:1.8.0_281]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_281]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_281]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_281]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_281]
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) ~[?:1.8.0_281]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) ~[?:1.8.0_281]
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
2. Preparing the Data
2.1 MySQL
Connect to the MySQL container:
docker-compose exec mysql mysql -uroot -p123456
mysql> CREATE DATABASE mydb;
Query OK, 1 row affected (0.01 sec)
mysql> USE mydb;
Database changed
mysql> CREATE TABLE products (
-> id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-> name VARCHAR(255) NOT NULL,
-> description VARCHAR(512)
-> );
Query OK, 0 rows affected (0.05 sec)
mysql> ALTER TABLE products AUTO_INCREMENT = 101;
Query OK, 0 rows affected (0.02 sec)
Records: 0 Duplicates: 0 Warnings: 0
mysql> INSERT INTO products
-> VALUES (default,"scooter","Small 2-wheel scooter"),
-> (default,"car battery","12V car battery"),
-> (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
-> (default,"hammer","12oz carpenter's hammer"),
-> (default,"hammer","14oz carpenter's hammer"),
-> (default,"hammer","16oz carpenter's hammer"),
-> (default,"rocks","box of assorted rocks"),
-> (default,"jacket","water resistent black wind breaker"),
-> (default,"spare tire","24 inch spare tire");
Query OK, 9 rows affected (0.01 sec)
Records: 9 Duplicates: 0 Warnings: 0
mysql> select * from products;
+-----+--------------------+---------------------------------------------------------+
| id | name | description |
+-----+--------------------+---------------------------------------------------------+
| 101 | scooter | Small 2-wheel scooter |
| 102 | car battery | 12V car battery |
| 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |
| 104 | hammer | 12oz carpenter's hammer |
| 105 | hammer | 14oz carpenter's hammer |
| 106 | hammer | 16oz carpenter's hammer |
| 107 | rocks | box of assorted rocks |
| 108 | jacket | water resistent black wind breaker |
| 109 | spare tire | 24 inch spare tire |
+-----+--------------------+---------------------------------------------------------+
9 rows in set (0.00 sec)
mysql> CREATE TABLE orders (
-> order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-> order_date DATETIME NOT NULL,
-> customer_name VARCHAR(255) NOT NULL,
-> price DECIMAL(10, 5) NOT NULL,
-> product_id INTEGER NOT NULL,
-> order_status BOOLEAN NOT NULL -- Whether order has been placed
-> ) AUTO_INCREMENT = 10001;
Query OK, 0 rows affected (0.04 sec)
mysql> show tables;
+----------------+
| Tables_in_mydb |
+----------------+
| orders |
| products |
+----------------+
2 rows in set (0.00 sec)
mysql> INSERT INTO orders
-> VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
-> (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
-> (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0
mysql> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
| 10001 | 2020-07-30 10:08:22 | Jark | 50.50000 | 102 | 0 |
| 10002 | 2020-07-30 10:11:09 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2020-07-30 12:00:30 | Edward | 25.25000 | 106 | 0 |
+----------+---------------------+---------------+----------+------------+--------------+
3 rows in set (0.01 sec)
2.2 Postgres
Connect to the Postgres container:
docker-compose exec postgres psql -h localhost -U postgres
postgres=# CREATE TABLE shipments (
postgres(# shipment_id SERIAL NOT NULL PRIMARY KEY,
postgres(# order_id SERIAL NOT NULL,
postgres(# origin VARCHAR(255) NOT NULL,
postgres(# destination VARCHAR(255) NOT NULL,
postgres(# is_arrived BOOLEAN NOT NULL
postgres(# );
CREATE TABLE
postgres=# ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER SEQUENCE
postgres=# ALTER TABLE public.shipments REPLICA IDENTITY FULL;
ALTER TABLE
postgres=# INSERT INTO shipments
postgres-# VALUES (default,10001,'Beijing','Shanghai',false),
postgres-# (default,10002,'Hangzhou','Shanghai',false),
postgres-# (default,10003,'Shanghai','Hangzhou',false);
INSERT 0 3
postgres=# select * from shipments;
shipment_id | order_id | origin | destination | is_arrived
-------------+----------+----------+-------------+------------
1001 | 10001 | Beijing | Shanghai | f
1002 | 10002 | Hangzhou | Shanghai | f
1003 | 10003 | Shanghai | Hangzhou | f
(3 rows)
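ALTER TABLE ... REPLICA IDENTITY FULL makes Postgres write the complete old row image to the WAL on UPDATE and DELETE, which the postgres-cdc connector needs in order to emit full change events. You can verify the setting took effect with a one-liner ('f' means FULL):
docker-compose exec postgres psql -U postgres -c "SELECT relname, relreplident FROM pg_class WHERE relname = 'shipments';"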
3. Starting Flink and the Flink SQL Client
3.1 Start Flink
sunxi@bogon flink-1.13.2 % ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bogon.
Starting taskexecutor daemon on host bogon.
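You can check that both daemons are actually running with jps; in a standard Flink 1.13 standalone setup, the output should include a StandaloneSessionClusterEntrypoint (the JobManager) and a TaskManagerRunner (the TaskManager):
jps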
Visit the Flink web UI at http://localhost:8081/#/overview
3.2 Start the Flink SQL Client
./bin/sql-client.sh
4. Creating Tables with Flink DDL in the Flink SQL CLI
4.1 Enable Checkpointing
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
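This SET command only affects the current SQL Client session. To my understanding, the same key can also be set cluster-wide in conf/flink-conf.yaml, for example (a sketch; adjust the path to your installation):
# Append the checkpoint interval to the cluster configuration
echo "execution.checkpointing.interval: 3s" >> flink-1.13.2/conf/flink-conf.yaml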
4.2 For the database tables products, orders, and shipments, create corresponding tables in the Flink SQL CLI to sync the data of these underlying tables
- MySQL
Flink SQL> CREATE TABLE products (
> id INT,
> name STRING,
> description STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'mydb',
> 'table-name' = 'products'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'mydb',
> 'table-name' = 'orders'
> );
[INFO] Execute statement succeed.
- Postgres
Flink SQL> CREATE TABLE shipments (
> shipment_id INT,
> order_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN,
> PRIMARY KEY (shipment_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'postgres-cdc',
> 'hostname' = 'localhost',
> 'port' = '5432',
> 'username' = 'postgres',
> 'password' = 'postgres',
> 'database-name' = 'postgres',
> 'schema-name' = 'public',
> 'table-name' = 'shipments'
> );
[INFO] Execute statement succeed.
- Elasticsearch
Flink SQL> CREATE TABLE enriched_orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> product_name STRING,
> product_description STRING,
> shipment_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://localhost:9200',
> 'index' = 'enriched_orders'
> );
[INFO] Execute statement succeed.
4.3 Join orders, products, and shipments with Flink SQL and write the enriched orders to Elasticsearch
Flink SQL> INSERT INTO enriched_orders
> SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
> FROM orders AS o
> LEFT JOIN products AS p ON o.product_id = p.id
> LEFT JOIN shipments AS s ON o.order_id = s.order_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 6c670cd6c73be12d35a7796adb0ec8b4
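With the job running, you can check from the command line that documents are flowing into Elasticsearch (the index name matches the 'index' option in the DDL above); three enriched orders should come back:
curl 'http://localhost:9200/enriched_orders/_search?pretty'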
The Flink web UI now shows the running job (screenshot omitted).
Kibana shows the enriched order data (screenshot omitted).
5. Modify data in MySQL and Postgres and verify that the data in Elasticsearch updates in real time
5.1 Insert a row into the MySQL orders table
mysql> INSERT INTO orders
-> VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
Query OK, 1 row affected (0.01 sec)
5.2 Insert a row into the Postgres shipments table
postgres=# INSERT INTO shipments
postgres-# VALUES (default,10004,'Shanghai','Beijing',false);
INSERT 0 1
5.3 Update an order's status in the MySQL orders table
mysql> UPDATE orders SET order_status = true WHERE order_id = 10004;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
5.4 Update a shipment's status in the Postgres shipments table
postgres=# UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
UPDATE 1
5.5 Delete a row from the MySQL orders table
mysql> DELETE FROM orders WHERE order_id = 10004;
Query OK, 1 row affected (0.00 sec)
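Each of the changes above should be reflected in Elasticsearch within a few seconds. Because order_id is the declared primary key of enriched_orders, the Flink Elasticsearch connector uses it as the document id, so after the delete you can confirm the document for order 10004 is gone ("found" : false):
curl 'http://localhost:9200/enriched_orders/_doc/10004?pretty'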