Preface: I originally planned to cover how to build a customized compute engine on top of Flink, using FlinkCDC as the example, but it was hard to decide which of the two should lead the discussion. So this article first analyzes FlinkCDC's application scenarios and its technical implementation, and the next article will look at where Flink lends itself to customized engine development. We start with FlinkCDC's application scenarios, then walk through its Flink-based implementation and code usage, laying the groundwork for the follow-up article.
1. FlinkCDC Application Scenarios
Colleagues and friends often ask: what is the difference between Flink and FlinkCDC?
Flink is a stream-processing framework, while FlinkCDC is a data-collection tool:
Flink competes with the likes of Storm and Spark;
FlinkCDC competes with the likes of Sqoop, Canal, Maxwell, Kafka Connect source connectors, and Debezium;
FlinkCDC is an SDK built by Flink community contributors for data-collection needs, making Flink more convenient to use in change-capture scenarios.
1.1 CDC Application Scenarios
CDC stands for Change Data Capture. It addresses collecting data out of storage systems such as MySQL, Oracle, PostgreSQL, and MongoDB;
There are two collection approaches: query-based and binlog-based;
Taking MySQL as an example: you can collect in batches through JDBC queries, or capture incremental data by parsing the binlog;
Comparing the two approaches:
Query-based CDC fetches data directly with SQL and needs no extra database configuration, but it only sees the state at each poll, so deletes and intermediate updates are missed; binlog-based collection requires the binlog service to be enabled, but it streams every insert, update, and delete with low latency.
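To make the contrast concrete, below is a minimal sketch of the query-based approach: polling MySQL over JDBC with a watermark column. The `update_time` column is hypothetical (the demo table later in this article does not have one), so read it as the shape of the technique rather than something runnable against that table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class JdbcPollingCdc {
    public static void main(String[] args) throws Exception {
        // Start from the epoch; in practice the watermark would be persisted somewhere.
        Timestamp watermark = Timestamp.valueOf("1970-01-01 00:00:00");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://ip:3306/flink_test", "mysqlUser", "password")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, update_time FROM event_info "
                                + "WHERE update_time > ? ORDER BY update_time")) {
                    ps.setTimestamp(1, watermark);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            watermark = rs.getTimestamp("update_time"); // advance the watermark
                            System.out.println(rs.getInt("id") + "," + rs.getString("name"));
                        }
                    }
                }
                Thread.sleep(5_000); // poll interval: deletes and in-between updates are invisible
            }
        }
    }
}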
1.2 FlinkCDC Application Analysis
FlinkCDC and Canal serve roughly the same requirement: collecting incremental data through the binlog;
The difference lies in usability:
A Canal-style collection service takes three steps:
- 1. Enable the MySQL binlog
- 2. Write the captured data to Kafka
- 3. Subscribe to the Kafka data from Flink and apply the business logic
With FlinkCDC, once the binlog is enabled, the business logic can run directly inside a single Flink job (writing to Kafka first also works);
So a Canal-style collector has to be deployed and maintained as a separate service, which adds operational burden, whereas a FlinkCDC pipeline is just a job submitted to the cluster, greatly lowering the barrier to data collection;
A single job covers the entire workflow.
2. FlinkCDC Technical Analysis and Local Hands-On
2.1 FlinkCDC Architecture Analysis
Unlike Canal and similar standalone services, FlinkCDC is just a Flink job, so it is simple to develop and deploy.
Flink borrows Debezium's capture capability: Debezium is a lightweight service that can be embedded into application code. FlinkCDC wraps Debezium's collection logic in a Flink SourceFunction and ships it as an SDK for Flink development;
Using Flink's own operators and sink capabilities, the captured data can be transformed with all of Flink's features and written through Flink's built-in connectors to services such as Kafka, Pulsar, Elasticsearch, RabbitMQ, and MongoDB.
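As a sketch of that wiring, the following single job captures MySQL changes and forwards them straight to Kafka using the KafkaSink that ships with Flink 1.14. The broker address and topic name are placeholders; the source settings mirror the demo configuration used later in this article.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Debezium-backed CDC source, emitting change events as JSON strings
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("ip").port(3306)
                .databaseList("flink_test").tableList("flink_test.event_info")
                .username("mysqlUser").password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // Kafka sink shipped with flink-connector-kafka since 1.14
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")        // placeholder broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("cdc-events")               // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        // Capture and publish in one job: no separate collector service to operate
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .sinkTo(sink);
        env.execute("cdc-to-kafka");
    }
}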
2.2 Local Hands-On
2.2.1 Prepare the MySQL database table and data
use flink_test;
# Check whether binlog is enabled
show variables like '%log_bin%';
# Create the test table
CREATE TABLE `event_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `category` varchar(512) DEFAULT NULL,
  `pv` int DEFAULT 0,
  `uv` int DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
# Insert test data
insert into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3),
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
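If the check above reports log_bin = OFF, binary logging must be turned on before FlinkCDC can capture anything. A typical my.cnf fragment looks like the following (values are illustrative; MySQL 8.0 enables binlog by default, and Debezium requires row-based events):

[mysqld]
server-id        = 1           # any id unique within the replication topology
log-bin          = mysql-bin   # enable binary logging
binlog_format    = ROW         # row-based events, required by Debezium/FlinkCDC
binlog_row_image = FULL        # emit full before/after row images

Restart MySQL after changing these options, then re-run the SHOW VARIABLES check.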
2.2.2 pom文件
注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];
下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>changedateDoris</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12</scala.version>
        <java.version>1.8</java.version>
        <flink.version>1.14.5</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- mysql-connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <!-- FlinkCDC; exclude flink-shaded-guava, which conflicts with Flink 1.14.5 -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-shaded-guava</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                                <excludes>
                                    <exclude>org.slf4j:slf4j-api:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>utf-8</encoding>
                    <useDefaultDelimiters>true</useDefaultDelimiters>
                    <delimiters>
                        <delimiter>$[*]</delimiter>
                    </delimiters>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2.2.3 The Java code
package yto.com.net.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlBinlogRead {

    private static final Logger log = LoggerFactory.getLogger(MysqlBinlogRead.class);

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("flink_test")
                .tableList("flink_test.event_info")
                .username("mysqlUser")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // .startupOptions(StartupOptions.earliest())
                .build();

        // Local environment with the Flink web UI exposed on port 8083
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8083);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing so the binlog offset survives restarts
        env.enableCheckpointing(10000);

        DataStreamSource<String> cdcSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        // Parse each change event: op is the change type, source.table the originating table
        SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
                JSONObject rowJson = JSON.parseObject(row);
                String op = rowJson.getString("op");
                JSONObject source = rowJson.getJSONObject("source");
                String table = source.getString("table");
                // Per-op / per-table business logic goes here; forward the raw row for now
                collector.collect(row);
            }
        });

        cdcSource.print("message=:");
        env.execute("flinkCdc Read message");
    }
}
2.2.4 Run results
As the console output shows, rows already stored in the table are fetched through the initial snapshot (the connect phase), while incremental data is captured from the binlog;
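For reference, a change event emitted by JsonDebeziumDeserializationSchema looks roughly like this for an insert into event_info (fields abbreviated and values illustrative; exact contents vary by Debezium version):

{
  "before": null,
  "after": {"id": 11, "name": "fff", "category": "nfh", "pv": 70, "uv": 5},
  "source": {"connector": "mysql", "db": "flink_test", "table": "event_info", ...},
  "op": "c",
  "ts_ms": 1660000000000
}

The op field is "c" for inserts, "u" for updates, "d" for deletes, and "r" for snapshot reads; before/after carry the row images. These are the fields the ProcessFunction above starts to unpack.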
Note: if the table holds a large amount of historical data, the full snapshot read can run out of memory. FlinkCDC 2.x's incremental snapshot splits the table into chunks, which mitigates this; for very large tables, tune the source parallelism and chunk size accordingly.