60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版

news2024/12/28 3:49:35

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文详细的介绍了Flink CDC的应用,并且提供三个示例进行说明如何使用,即使用Flink sql client的观察数据同步的情况、通过DataStream API 捕获数据变化情况以及通过完整示例应用Flink CDC的ELT操作步骤及验证。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,本文依赖Flink 集群环境、可选的docker环境、Elasticsearch、Kibana、mysql。

本专题分为以下几篇文章:
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-Streaming ELT介绍及示例(2)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版

一、Flink CDC Connectors介绍

本文介绍的CDC是基于2.4版本,当前版本已经发布至3.0,本Flink 专栏介绍是基于Flink 1.17版本,CDC 2.4版本支持到1.17版本。

1、CDC Connectors介绍及架构图

Apache Flink®的CDC连接器是用于Apache Flnk®的一组源连接器,使用更改数据捕获(CDC)接收来自不同数据库的更改。Apache Flink®的CDC连接器将Debezium集成为捕获数据更改的引擎。因此,它可以充分利用Debezium的能力。

了解更多关于Debezium的信息。

或者参考:37、Flink 的CDC 格式:debezium部署以及mysql示例

在这里插入图片描述

2、支持的连接

在这里插入图片描述

3、Flink CDC与 Flink 版本关系

在这里插入图片描述

4、特性介绍

  • 支持读取数据库快照,并在处理失败后立即继续读取binlog。
  • CDC连接器用于DataStream API,用户可以在一个作业中使用多个数据库和表的更改,而无需部署Debezium和Kafka。
  • 用于Table/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源以监视单个表上的更改。

下表显示了连接器的当前功能:
在这里插入图片描述

5、flink sql client集成flink cdc

1)、集成步骤

1、需要有一个flink的集群环境
具体搭建参考:2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤

2、下载flink cdc的jar并放在FLINK_HOME/lib/目录下面
下载地址:https://github.com/ververica/flink-cdc-connectors/releases

3、重启flink集群

2)、示例:捕获mysql的user表数据变化情况

本示例的前提是设置好了binlog,具体设置方式可以参考文章:
37、Flink 的CDC 格式:debezium部署以及mysql示例

Flink SQL> CREATE TABLE mysql_binlog_user (
>  id INT NOT NULL,
>  name STRING,
>  age INT,
>  PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = '192.168.10.44',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = '123456',
>  'database-name' = 'cdctest',
>  'table-name' = 'user'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from mysql_binlog_user;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           4 |                        test456 |        8888 |
| +I |           2 |                       alanchan |          20 |
| +I |           3 |                    alanchanchn |          33 |
| +I |           1 |                           alan |          18 |
| -U |           4 |                        test456 |        8888 |
| +U |           4 |                        test123 |        8888 |
| -U |           4 |                        test123 |        8888 |
| +U |           4 |                        test123 |       66666 |
| -D |           4 |                        test123 |       66666 |
| +I |           4 |                   alanchanchn2 |         100 |

Flink SQL> select name ,sum(age) from mysql_binlog_user group by name;
+----+--------------------------------+-------------+
| op |                           name |      EXPR$1 |
+----+--------------------------------+-------------+
| +I |                   alanchanchn2 |         100 |
| +I |                       alanchan |          20 |
| +I |                    alanchanchn |          33 |
| +I |                           alan |          18 |


6、flink datastream API集成flink cdc

本示例是捕获mysql cdctest库的user表数据变化情况。

1)、maven依赖

使用flink cdc添加如下依赖即可,但flink本身的运行环境相关依赖需要添加。

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-sql-connector-mysql-cdc</artifactId>
	<version>2.4.0</version>
	<scope>provided</scope>
</dependency>

2)、代码实现


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.extern.slf4j.Slf4j;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
@Slf4j
public class TestFlinkCDCFromMysqlDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(3000);

		MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
				.hostname("192.168.10.44")
				.port(3306)
				.databaseList("cdctest") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
				.tableList("cdctest.user") // 设置捕获的表
				.username("root")
				.password("123456")
				.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
				.build();

		DataStream<String> result = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
		log.info(result.toString());
		result.map(new MapFunction<String,String>() {

			@Override
			public String map(String value) throws Exception {
				log.info("value ======={}",value);
				return value;
			}
			
		});

		env.execute();
	}
}

3)、验证

在程序运行起来后,对cdctest.user表的数据进行添加、修改、删除操作,观察程序控制台日志输出情况

08:50:26.819 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":2,"name":"alanchan","age":20},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626222,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":3,"name":"alanchanchn","age":33},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":1,"name":"alan","age":18},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626221,"transaction":null}
08:50:26.822 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":4,"name":"test456","age":999000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
一月 22, 2024 8:50:27 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect信息: 
Connected to 192.168.10.44:3306 at alan_master_logbin.000004/10816 (sid:6116, cid:565)
08:50:56.030 [Source: MySQL Source -> Map (1/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":{"id":4,"name":"test456","age":999000},"after":{"id":4,"name":"test456","age":8888},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1705884032000,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":11010,"row":0,"thread":557,"query":null},"op":"u","ts_ms":1705884655747,"transaction":null}

4)、debezium数据格式介绍

关于debezium更多的信息可以参考:37、Flink 的CDC 格式:debezium部署以及mysql示例

在flink cdc的版本中,不需要特别对debezium数据格式进行处理,默认的形如下面的内容,也即不带schema的,解析方式参考上例。

{

	"before": {
		"name": "alan_test",
		"scores": 666.0
	},
	"after": {
		"name": "alan_test",
		"scores": 888.0
	},
	"source": {
		"version": "1.7.2.Final",
		"connector": "mysql",
		"name": "ALAN",
		"ts_ms": 1705717298000,
		"snapshot": "false",
		"db": "cdctest",
		"sequence": null,
		"table": "userscoressink",
		"server_id": 1,
		"gtid": null,
		"file": "alan_master_logbin.000004",
		"pos": 4931,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "u",
	"ts_ms": 1705717772785,
	"transaction": null

}

在某些情况下可能需要带schema的,形如下例,

如果需要解析则需要将JsonDebeziumDeserializationSchema()改成JsonDebeziumDeserializationSchema(true)

一般推荐使用系统默认的,不带schema的数据格式。


{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "sequence"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "ALAN.cdctest.userscoressink.Envelope"
	},
	"payload": {
		"before": {
			"name": "alan_test",
			"scores": 666.0
		},
		"after": {
			"name": "alan_test",
			"scores": 888.0
		},
		"source": {
			"version": "1.7.2.Final",
			"connector": "mysql",
			"name": "ALAN",
			"ts_ms": 1705717298000,
			"snapshot": "false",
			"db": "cdctest",
			"sequence": null,
			"table": "userscoressink",
			"server_id": 1,
			"gtid": null,
			"file": "alan_master_logbin.000004",
			"pos": 4931,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "u",
		"ts_ms": 1705717772785,
		"transaction": null
	}
}

二、Flink CDC Streaming ELT介绍

1、介绍及架构图

CDC流式ELT框架是一个流数据集成框架,旨在为用户提供更强大的API。它允许用户通过自定义的Flink操作符和作业提交工具来配置他们的数据同步逻辑。该框架优先优化任务提交过程,并提供增强的功能,如整个数据库同步、分片和模式更改同步。

在这里插入图片描述

✅端到端数据集成框架
✅ API,用于数据集成用户轻松构建作业
✅ 源/接收器中的多表支持
✅ 同步整个数据库
✅ 模式进化能力

2、核心概念及流程图

Flink CDC 3.0框架中流动的数据类型被称为Event,表示外部系统生成的更改事件。每个事件都标有发生更改的表ID。事件分为SchemaChangeEvent和DataChangeEvent,分别表示表结构和数据的变化。
在这里插入图片描述

1)、Data Source Connector

Data Source Connector捕获外部系统中的更改,并将其转换为事件作为同步任务的输出。它还为框架提供了一个MetadataAccessor ,用于读取外部系统的元数据。

2)、Data Sink connector

Data Sink connector连接器接收来自Data Source的更改事件,并将其应用于外部系统。此外,MetadataApplier用于将元数据更改从源系统应用到目标系统。

由于事件以流水线方式从上游流向下游,因此数据同步任务被称为数据流水线。数据管道由数据源、路由、转换和数据接收器组成。转换可以向事件添加额外的内容,路由器可以重新映射与事件相对应的表ID。

3)、Table ID

连接到外部系统时,需要与外部系统的存储对象建立映射关系。这就是Table ID所指的内容。
为了与大多数外部系统兼容,表ID由三元组表示:(namespace,schemaName,Table)。连接器需要在Table ID 和外部系统中的存储对象之间建立映射。

例如,MySQL/Doris中的表被映射到(null,database,table),而消息队列系统(如Kafka)中的主题被映射到了(null,null,topic)

4)、Data Source

Data Source用于访问元数据并从外部系统读取更改后的数据。数据源可以同时从多个表中读取数据。

Data Source属性:
类型:源的类型,例如MySQL、Postgres。
名称:源的名称,用户定义(可选,提供默认值)。
源的其他自定义配置。

例如,使用yaml文件来定义mysql源


source:
  type: mysql
  name: mysql-source   #optional,description information
  host: localhost
  port: 3306
  username: admin
  password: pass
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_\.*
  

5)、Data Sink

Data Sink用于应用架构更改并将更改数据写入外部系统。一个数据接收器可以同时写入多个表。

Data Sink的属性:
类型:接收器的类型,例如MySQL或PostgreSQL。
名称:接收器的名称,用户定义(可选,提供默认值)。

接收器的其他自定义配置。

例如,使用这个yaml文件来定义kafka接收器:


sink:
  type: kafka
  name: mysink-queue           	# Optional parameter for description purpose
  bootstrap-servers: localhost:9092
  auto-create-table: true      	# Optional parameter for advanced functionalities
  

6)、Route

Route指定每个事件的table ID。最典型的场景是子数据库和子表的合并,将多个上游源表路由到同一个汇点表。

Route,需要以下内容:
source table:源表id,支持正则表达式
sink-table:sink-table id,支持正则表达式
说明:路由规则说明(可选,提供默认值)

例如,如果将数据库“mydb”中的表“web_order”同步到Kafka主题“ods_web.order”,使用此yaml文件来定义此路由:


route:
  source-table: mydb.default.web_order
  sink-table: ods_web_order
  description: sync table to one destination table with given prefix ods_
  

7)、Data Pipeline

由于事件以流水线方式从上游流向下游,因此数据同步任务也称为数据流水线。

Data Pipeline 属性:
名称:管道的名称,将作为作业名称提交到Flink集群。
将实现其他高级功能,如自动表创建、模式演化等。

例如,使用这个yaml文件来定义管道:


pipeline:
  name: mysql-to-kafka-pipeline
  parallelism: 1
  

3、示例:将MySQL的user表数据同步至Elasticsearch表

本示例使用的是Flink 1.18版本。

其安装及验证参考文章:1、Flink1.12.7或1.13.5详细介绍及本地安装部署、验证

基于 Flink CDC 快速构建 MySQL 流式 ETL。本示例演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

本示例的实现内容是将mysql中cdctest的user表数据不变化的同步至Elasticsearch的e_f_user表中

验证user表的历史数据、新增、修改和删除数据后Elasticsearch的e_f_user表的变化情况

如果需要进行计算则在提交flink任务的时候修改其sql即可,该部分在提交任务处会有说明。

1)、整体架构

本图为盗图,并且本示例不包含postgres数据库。
在这里插入图片描述

以下为实现本示例的具体步骤。

2)、环境准备

本文只是为了简单起见,尽可能的减少外部环境的依赖,所以使用了docker的环境,在实际工作中以实际的环境而定。

在使用flink cdc时可以不需要使用docker和docker-compose。

需要具备的环境是Flink 集群、flink cdc相应的jar包、数据源和数据目标相关的集群或单机。

如果不是用docker环境,本示例的步骤可以跳过环境准备、创建 docker-compose.yml即可。

需要安装docker、docker-compose,不再赘述。
验证环境是否安装成功通过查看其版本号。


[root@server5 ~]# docker --version
Docker version 25.0.0, build e758fe5

[root@server5 ~]# docker-compose --version
docker-compose version 1.29.2, build unknown

3)、创建 docker-compose.yml


version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=root
      - MYSQL_PASSWORD=123456
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"


该 Docker Compose 中包含的容器有:

  • MySQL: user表 将存储在该数据库中
  • Elasticsearch: 将表user数据 写到 Elasticsearch
  • Kibana: 用来可视化 ElasticSearch 的数据

在 docker-compose.yml 所在目录下执行下面的命令来启动本示例需要的组件:


docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。
可以通过 docker ps 来观察上述的容器是否正常启动了,


[root@server5 docker-compose]# docker ps
CONTAINER ID   IMAGE                         COMMAND                   CREATED          STATUS          PORTS                                                                                  NAMES
161f678695ee   elastic/elasticsearch:7.6.0   "/usr/local/bin/dock…"   10 minutes ago   Up 10 minutes   0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp   docker-compose_elasticsearch_1
49ceac9a6237   elastic/kibana:7.6.0          "/usr/local/bin/dumb…"   10 minutes ago   Up 10 minutes   0.0.0.0:5601->5601/tcp, :::5601->5601/tcp

也可以通过访问 http://server5:5601/ 来查看 Kibana 是否运行正常。
在这里插入图片描述
以上,则完成了docker组件的启动。

4)、下载 Flink 和所需要的依赖包

1、部署flink 1.18版本

不再赘述

2、下载本示例需要用到的connector

本示例需要用到2个jar包,具体如下

  • flink-cdc-pipeline-connector-mysql-3.0.0.jar
  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
[root@server5 bin]# pwd
/usr/local/bigdata/flink-1.18.0/lib

[root@server5 lib]# ll
总用量 254792
-rw-r--r-- 1 root root   23763584 124 16:11 flink-cdc-pipeline-connector-mysql-3.0.0.jar
-rw-r--r-- 1  501 games    196577 1019 07:34 flink-cep-1.18.0.jar
-rw-r--r-- 1  501 games    554410 1019 07:36 flink-connector-files-1.18.0.jar
-rw-r--r-- 1  501 games    102375 1019 07:39 flink-csv-1.18.0.jar
-rw-r--r-- 1  501 games 127071526 1019 07:44 flink-dist-1.18.0.jar
-rw-r--r-- 1  501 games    202901 1019 07:38 flink-json-1.18.0.jar
-rw-r--r-- 1  501 games  21058483 1019 07:43 flink-scala_2.12-1.18.0.jar
-rw-r--r-- 1 root root   28440546 124 16:11 flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
-rw-r--r-- 1  501 games  15527413 1019 07:44 flink-table-api-java-uber-1.18.0.jar
-rw-r--r-- 1  501 games  38202299 1019 07:43 flink-table-planner-loader-1.18.0.jar
-rw-r--r-- 1  501 games   3437154 1019 07:34 flink-table-runtime-1.18.0.jar
-rw-r--r-- 1  501 games    208006 923 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1  501 games    301872 923 2022 log4j-api-2.17.1.jar
-rw-r--r-- 1  501 games   1790452 923 2022 log4j-core-2.17.1.jar
-rw-r--r-- 1  501 games     24279 923 2022 log4j-slf4j-impl-2.17.1.jar

5)、启动flink

[root@server5 bin]# pwd
/usr/local/bigdata/flink-1.18.0/bin

[root@server5 bin]# ll
总用量 2356
-rw-r--r-- 1 501 games 2290658 1019 07:44 bash-java-utils.jar
-rwxr-xr-x 1 501 games   23051 1019 04:07 config.sh
-rwxr-xr-x 1 501 games    1318 1019 04:07 find-flink-home.sh
-rwxr-xr-x 1 501 games    2381 1019 04:07 flink
-rwxr-xr-x 1 501 games    4722 1019 04:07 flink-console.sh
-rwxr-xr-x 1 501 games    6783 1019 04:07 flink-daemon.sh
-rwxr-xr-x 1 501 games    1564 1019 04:07 historyserver.sh
-rwxr-xr-x 1 501 games    2498 1019 04:07 jobmanager.sh
-rwxr-xr-x 1 501 games    1650 1019 04:07 kubernetes-jobmanager.sh
-rwxr-xr-x 1 501 games    1717 1019 04:07 kubernetes-session.sh
-rwxr-xr-x 1 501 games    1770 1019 04:07 kubernetes-taskmanager.sh
-rwxr-xr-x 1 501 games    2994 1019 04:07 pyflink-shell.sh
-rwxr-xr-x 1 501 games    4166 1019 04:07 sql-client.sh
-rwxr-xr-x 1 501 games    3299 1019 04:07 sql-gateway.sh
-rwxr-xr-x 1 501 games    2006 1019 04:07 standalone-job.sh
-rwxr-xr-x 1 501 games    1837 1019 04:07 start-cluster.sh
-rwxr-xr-x 1 501 games    1854 1019 04:07 start-zookeeper-quorum.sh
-rwxr-xr-x 1 501 games    1617 1019 04:07 stop-cluster.sh
-rwxr-xr-x 1 501 games    1845 1019 04:07 stop-zookeeper-quorum.sh
-rwxr-xr-x 1 501 games    2960 1019 04:07 taskmanager.sh
-rwxr-xr-x 1 501 games    1725 1019 04:07 yarn-session.sh
-rwxr-xr-x 1 501 games    2405 1019 04:07 zookeeper.sh

[root@server5 bin]# ./start-cluster.sh 

[root@server5 bin]# jps
10130 Jps
26884 TaskManagerRunner
26537 StandaloneSessionClusterEntrypoint

6)、准备mysql数据

本示例使用的数据库是192.168.10.44上cdctest的user表,创建完成后添加几条数据,具体sql如下

SET NAMES utf8mb4;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (1, 'alan', 18);
INSERT INTO `user` VALUES (2, 'alanchan', 20);
INSERT INTO `user` VALUES (3, 'alanchanchn', 33);
INSERT INTO `user` VALUES (4, 'alanchanchn2', 100);

7)、在sql client中的操作

下面的步骤均是在Flink sql client中操作的。

  • 启动sql client
sql-client.sh
  • 设置sql client
SET sql-client.execution.result-mode = tableau;
SET execution.checkpointing.interval = 3s;

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.

  • 创建cdc表
Flink SQL> CREATE TABLE f_user (
>     id INT,
>     name STRING,
>     age INT,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '192.168.10.44',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = '123456',
>     'database-name' = 'cdctest',
>     'table-name' = 'user'
>   );
[INFO] Execute statement succeed.

Flink SQL> select * from f_user;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           2 |                       alanchan |          20 |
| +I |           1 |                           alan |          18 |
| +I |           4 |                   alanchanchn2 |         100 |
| +I |           3 |                    alanchanchn |          33 |

Flink SQL> CREATE TABLE e_f_user (
>     id INT,
>     name STRING,
>     age INT,
>     PRIMARY KEY (id) NOT ENFORCED
>  ) WITH (
>      'connector' = 'elasticsearch-7',
>      'hosts' = 'http://server5:9200',
>      'index' = 'e_f_user'
>  );
[INFO] Execute statement succeed.


说明:
本示例使用的是本机的elasticsearch,也可以使用外部的elasticsearch,操作方式一样,不再赘述。
示例如下

# 创建外部elasticsearch的cdc表
Flink SQL> CREATE TABLE e_f_user2 (
>     id INT,
>     name STRING,
>     age INT,
>     PRIMARY KEY (id) NOT ENFORCED
>  ) WITH (
>      'connector' = 'elasticsearch-7',
>      'hosts' = 'http://server1:9200',
>      'index' = 'e_f_user'
>  );
[INFO] Execute statement succeed.

# 提交flink 任务
Flink SQL> insert into e_f_user2 select * from f_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 95d884058ad293bb2c567619348c02cd

  • 将f_user数据写入到elasticsearch的e_f_user表中
    本处仅仅是简单的把数据写进去即可,实际上可以做一些复杂的计算操作后再写进去,不再赘述
Flink SQL> insert into e_f_user select * from f_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: fadea2443e580767903428d061db955d

此时通过flink的web 界面可以看到Flink 已经有任务在运行了,如下图
在这里插入图片描述

8)、Elasticsearch中查看同步的数据情况

下面是通过kibana中查看写入elasticsearch中的数据,具体操作详细内容可以参看文章:
6、Elasticsearch7.6.1、logstash、kibana介绍及综合示例(ELK、grok插件)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

9)、CUD(create、update和delete)操作演示

下面是演示在mysql中操作数据,在elasticsearch中的变化情况。

  • 新增一条数据
    mysql中插入一条数据
INSERT INTO `user` VALUES (5000, 'testname', 8888888);

Elasticsearch的kibana刷新后的变化,红框内是刷新后的数据
在这里插入图片描述

  • 修改一条数据
    mysql中针对ID=5000的数据,将name修改成testname5000,age修改成6666
update user set name = 'testname5000',age=6666 where id =5000

Elasticsearch的kibana刷新后的变化,红框内是刷新后的数据
在这里插入图片描述

  • 删除数据
    在mysql中将id=5000的数据删除掉
    Elasticsearch的kibana刷新后的变化
    在这里插入图片描述

10)、环境清理

在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:


docker-compose down

以上,本文详细的介绍了Flink CDC的应用,并且提供三个示例进行说明如何使用,即使用Flink sql client的观察数据同步的情况、通过DataStream API 捕获数据变化情况以及通过完整示例应用Flink CDC的ELT操作步骤及验证。

本专题分为以下几篇文章:
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-Streaming ELT介绍及示例(2)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1421999.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring MVC-01

关于Spring MVC Spring MVC是基于Spring框架基础之上的&#xff0c;主要解决了后端服务器接收客户端提交的请求&#xff0c;并给予响应的相关问题。 MVC Model View Controller&#xff0c;它们分别是&#xff1a; - Model&#xff1a;数据模型&#xff0c;通常由业务逻辑层…

TypeScript(七) 函数

1. TypeScript 函数 1.1. 函数的定义 函数就是包裹在花括号中的代码块&#xff0c;前面使用关键字function。 语法&#xff1a; // An highlighted block function function_name() {// 执行代码 }实例&#xff1a; function test() { // 函数定义console.log("我就是…

Linux系统——文本三剑客

目录 一、grep 1.格式 2.选项 2.1 grep重定向 2.2grep -m 匹配到几次停止 2.3grep -i 忽略大小写 2.4grep -n 显示行号 2.5grep -c 统计匹配行数 2.6grep -A 后几行 2.7grep -C 前后三行 2.8grep -B 前三行 2.9grep -e 或 2.10grep -w 匹配整个单词 2.11grep -r…

聊聊百度造车

10月27日&#xff0c;极越-01上市&#xff0c;一个月后大幅降价&#xff0c;时至今日距离发布已经过去了两个月&#xff0c;官方迟迟不肯公布销量&#xff0c;实际情况大家也都心知肚明。 如今小米汽车技术发布会风头无两&#xff0c;而同一年宣布造车的极越却无人问津&#x…

ElementUI安装与使用指南

Element官网-安装指南 提醒一下&#xff1a;下面实例讲解是在Mac系统演示的&#xff1b; 一、开发环境配置 电脑需要先安装好node.js和vue2或者vue3 安装Node.js Node.js 中文网 安装node.js命令&#xff1a;brew install node node.js安装完后&#xff0c;输入&#xff1…

界面控件DevExtreme v23.2新版亮点 - 全新的Fluent主题

DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff08;包括React&#xff0c;Angular&#xff0c;ASP.NET Core&#xff0c;jQuery&#xff0c;Knockout等&#xff09;构建交互式的Web应用程序。从Angular和Reac&#xff0c…

[力扣 Hot100]Day18 矩阵置零

题目描述 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 出处 思路 在原数组上直接操作势必会出现“冗余”的0&#xff0c;即原本[i,j]处不是0&#xff0c;例如由于i行的其他位置有0导致[i,j]…

【深度学习每日小知识】Precision 精度

Precision 精度 精度是机器学习 &#xff08;ML&#xff09; 中分类器或预测器准确性的度量。它被定义为分类器做出的真正正预测数与分类器所做的正预测总数之比。换句话说&#xff0c;真正正确的是正面预测的比例。 精度是机器学习中的一个关键参数&#xff0c;因为它量化了…

Adobe Photoshop 2024 v25.4.0 - 专业的图片设计软件

Adobe Photoshop 2024 v25.4.0更新了&#xff0c;从照片编辑和合成到数字绘画、动画和图形设计&#xff0c;任何您能想象到的内容都能通过PS2024轻松实现。 利用人工智能技术进行快速编辑。学习新技能并与社区分享您的工作。借助我们的最新版本&#xff0c;做令人惊叹的事情从未…

Java中支持父类转子类,不支持子类转父类吗?

不&#xff0c;我的意思是正好相反。在 Java 中&#xff1a; 子类转父类&#xff08;向上转型&#xff09;&#xff1a;这是自动的且总是安全的。子类是父类的一个特化&#xff0c;因此子类的对象可以被视为是父类的一个实例。例如&#xff0c;如果 ExamineApproveNode 是 Base…

【Tomcat与网络3】Tomcat的整体架构

目录 1.演进1&#xff1a;将连接和处理服务分开 2演进2&#xff1a;Container的演进 3 再论Tomcat的容器结构 4 Tomcat处理请求的过程 5 请求的处理过程与Pipeline-Valve管道 在前面我们介绍了Servlet的基本原理&#xff0c;本文我们结合Tomcat来分析一下如何设计一个大型…

小程序真机调试websocket错误码1006

有心跳机制 模拟器上都没问题连接稳定 一到真机就30秒断连 怎么解决救救我T_T

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP&#xff1f; Flink CEP即 Flink Complex Event Processing&#xff0c;是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型&#xff0c;即对于无界流中的各种数据(称为事件)&#xff0c;提供一种组合匹配的…

C#,欧拉数(Eulerian Number)的算法与源代码

1 欧拉数 欧拉数特指 Eulerian Number&#xff0c;不同于 Euler numbers&#xff0c;Eulers number 哦。 组合数学中&#xff0c;欧拉数&#xff08;Eulerian Number&#xff09;是从1到n中正好满足m个元素大于前一个元素&#xff08;具有m个“上升”的排列&#xff09;条件的…

C++ 之LeetCode刷题记录(二十三)

&#x1f604;&#x1f60a;&#x1f606;&#x1f603;&#x1f604;&#x1f60a;&#x1f606;&#x1f603; 开始cpp刷题之旅。 目标&#xff1a;执行用时击败90%以上使用 C 的用户。 118. 杨辉三角 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows …

力扣hot100 括号生成 递归回溯 超简洁版

Problem: 22. 括号生成 Code 使用 static 会被复用 class Solution {List<String> ans new ArrayList<>();public List<String> generateParenthesis(int n){dfs(n, n, "");return ans;}/*** param l 左括号待补个数* param r 右括号待补个数*…

flask+django基于python的网上美食订餐系统_3lyq1

设计旨在提高顾客就餐效率、优化餐厅管理、提高订单准确性和客户的满意度。本系统采用 Python 语言作为开发语言&#xff0c;采用Django框架及其第三方库和第三方工具来进行开发。该方案分为管理员功能模块&#xff0c;商家功能模块以及用户前后功能模块三部分。开发前期根据用…

方法阻塞的解决方案之一

1、简单使用 一个h一个cpp文件 #pragma once #include <iostream> #include <thread> #include <atomic> #include <chrono> #include <string>class Person {public:struct dog {std::string name;int age;};public:void a(std::atomic<bo…

系统架构设计师-22年-下午题目

系统架构设计师-22年-下午题目 更多软考知识请访问 https://ruankao.blog.csdn.net/ 试题一必答&#xff0c;二、三、四、五题中任选两题作答 试题一 (25分) 说明 某电子商务公司拟升级其会员与促销管理系统&#xff0c;向用户提供个性化服务&#xff0c;提高用户的粘性。…