Flink: using MySQL CDC to capture MySQL changes in real time and sync them to StarRocks (SR)
Note: pay close attention to matching the CDC connector version with your Flink version. As of this writing there is no well-supported CDC release for Flink 1.15.2; if you are able to, you can build one yourself. See the connector's current version-compatibility matrix.
StarRocks officially recommends the Flink SQL approach (it supports real-time sync of inserts, updates, and deletes). If updates or deletes are not being applied, check your Flink version, your CDC connector version, and your SR sink connector version.
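For reference, this is roughly what the Flink SQL route looks like when driven from a Java main(). It is only a minimal sketch, not the code from the SR tutorial: the hosts, credentials, and database/table names are taken from the DataStream example later in this post, and the id/name columns are placeholder assumptions.

package *;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlMysqlToStarRocks {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC source table: reads the MySQL snapshot + binlog as a changelog stream
        tEnv.executeSql("CREATE TABLE mysql_src (" +
                "  id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = '192.168.10.14', 'port' = '3306'," +
                "  'username' = 'root', 'password' = '123456'," +
                "  'database-name' = 'xcode', 'table-name' = 'temp_flink')");

        // StarRocks sink table: the connector turns the changelog into Stream Load upserts/deletes
        tEnv.executeSql("CREATE TABLE sr_sink (" +
                "  id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'starrocks'," +
                "  'jdbc-url' = 'jdbc:mysql://192.168.10.245:9030'," +
                "  'load-url' = '192.168.10.11:8030'," +
                "  'database-name' = 'data_center', 'table-name' = 'temp_flink'," +
                "  'username' = 'root', 'password' = '123456'," +
                "  'sink.properties.format' = 'json'," +
                "  'sink.properties.strip_outer_array' = 'true')");

        // Continuous sync: every change captured on the source is applied to StarRocks
        tEnv.executeSql("INSERT INTO sr_sink SELECT id, name FROM mysql_src");
    }
}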
The above covers the Flink SQL route, for which the SR official site has a tutorial. The rest of this post demonstrates a Flink DataStream API program that syncs MySQL to SR via MySQL CDC (inserts, updates, and deletes).
1、pom.xml
Note: this example is based on Flink 1.13; as of this writing the 1.15.x line is not recommended.
1、Pay attention to the comments about the Scala-version suffix (_2.12) on the jar artifacts.
2、Set <scope>provided</scope> on the Flink dependencies when you package the job to be submitted to a Flink cluster.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.txlc</groupId>
    <artifactId>dwh-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
             For Flink 1.15+ the artifact is flink-streaming-java (no Scala suffix);
             below 1.15 the Scala suffix _2.12 is required -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <!-- For Flink 1.14.x:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        For Flink 1.15.x:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <!-- <version>30.1.1-jre-15.0</version> -->
            <version>18.0-13.0</version>
            <!-- <version>30.1.1-jre-14.0</version> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
            <!-- <version>5.1.49</version> -->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.4</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.3.4</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.20.graal</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- The flink-connector-mysql-cdc version recommended in the StarRocks official docs -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <!-- <version>1.2.4_flink-1.15</version> -->
            <!-- <version>1.2.4_flink-1.13_2.12</version> -->
            <version>1.2.3_flink-1.13_2.11</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.10</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- The services transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                                    <projectName>Apache Flink</projectName>
                                    <encoding>UTF-8</encoding>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>-->
        </plugins>
    </build>
</project>
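A quick note on flink-runtime-web above: it is only needed when you want the Flink web UI while running the job locally (for example from the IDE). A minimal sketch of how you might create such an environment in that case, swapping it in for getExecutionEnvironment() in the job below (port 8081 is an assumption, not part of the original project):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Local debugging only: starts an embedded mini cluster with the web UI at http://localhost:8081
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);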
2、A custom deserialization schema is required (this is the key to making deletes work)
Note:
1、The record format emitted by MySQL CDC cannot be consumed by the SR sink directly; you have to extract the JSON data from the before or after image, and if you want updates or deletes to be applied you must add an __op field.
2、One small caveat: date/datetime values need extra handling before they sync cleanly into SR (see the helper sketch after the class below).
package *;

import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Objects;

/**
 * Custom deserialization schema: flattens the Debezium change record into the plain JSON
 * the StarRocks sink expects, adding the __op flag so deletes can be applied.
 *
 * @author JGMa
 */
public class TxlcCustomerSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Topic has the form <server>.<database>.<table> if you need the source identifiers
        String topic = sourceRecord.topic();
        String[] strings = topic.split("\\.");
        // String database = strings[1];
        // String table = strings[2];

        Struct value = (Struct) sourceRecord.value();
        // JSONObject data = new JSONObject();

        // Row image before the change (null for inserts)
        Struct before = value.getStruct("before");
        JSONObject beforeData = new JSONObject();
        if (before != null) {
            for (Field field : before.schema().fields()) {
                Object o = before.get(field);
                beforeData.put(field.name(), o);
            }
        }

        // Row image after the change (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterData = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                Object o = after.get(field);
                afterData.put(field.name(), o);
            }
        }

        Envelope.Operation op = Envelope.operationFor(sourceRecord);
        // Debug output; remove or switch to a logger in production
        System.out.println("->" + value.toString());
        System.out.println("===" + beforeData.toJSONString());
        System.out.println(">>>" + afterData.toJSONString());
        // JSONObject object = new JSONObject();
        // object.put("database", database);
        // object.put("table", table);

        if (Objects.equals(op, Envelope.Operation.DELETE)) {
            // The StarRocks table must use the Primary Key model; {"__op":1} marks a delete, {"__op":0} an upsert
            beforeData.put("__op", 1);
            collector.collect(beforeData.toJSONString());
            // Return here so the empty "after" image is not emitted for deletes
            return;
        }
        if (Objects.equals(op, Envelope.Operation.UPDATE)) {
            afterData.put("__op", 0);
        }
        collector.collect(afterData.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
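About the date caveat mentioned in the note above: Debezium hands MySQL DATE/DATETIME columns over as epoch-based numbers, so they land in StarRocks as integers unless they are converted first. Below is a minimal helper sketch; the class name is hypothetical, the assumption is that you call DebeziumTemporalConverter.convert(field, o) inside the before/after loops of TxlcCustomerSchema instead of putting the raw value, and only the two most common Debezium logical types are handled.

package *;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.kafka.connect.data.Field;

/**
 * Converts Debezium's epoch-based DATE/DATETIME values into strings StarRocks can parse.
 * Extend as needed for other Debezium temporal types.
 */
public final class DebeziumTemporalConverter {

    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DebeziumTemporalConverter() {
    }

    public static Object convert(Field field, Object value) {
        if (value == null) {
            return null;
        }
        String schemaName = field.schema().name();
        if ("io.debezium.time.Date".equals(schemaName)) {
            // MySQL DATE arrives as days since 1970-01-01 -> "yyyy-MM-dd"
            return LocalDate.ofEpochDay(((Number) value).longValue()).toString();
        }
        if ("io.debezium.time.Timestamp".equals(schemaName)) {
            // MySQL DATETIME arrives as epoch milliseconds (no timezone) -> "yyyy-MM-dd HH:mm:ss"
            return LocalDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), ZoneOffset.UTC)
                    .format(DATETIME_FORMAT);
        }
        return value;
    }
}

With this in place, beforeData.put(field.name(), o) becomes beforeData.put(field.name(), DebeziumTemporalConverter.convert(field, o)), and likewise for the after image.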
3、Writing the source & sink
package *;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.txlc.cdc.execute.core.TxlcCustomerSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listens to MySQL and performs a full sync into StarRocks.
 * Prints MySQL Snapshot + Binlog.
 * <p>
 * Warning: the SR columns must be large enough for the source values, otherwise NULL is inserted.
 *
 * @author JGMa
 */
public class FlinkMysqlCDCStarrocks {

    private static final Logger log = LoggerFactory.getLogger(FlinkMysqlCDCStarrocks.class);

    public static void main(String[] args) throws Exception {
        ParameterTool paramTool = ParameterTool.fromArgs(args);
        // String tableName = paramTool.get("table");
        // String srcHost = paramTool.get("srcHost");
        // String srcDatabase = paramTool.get("srcDatabase");
        // String srcUsername = paramTool.get("srcUsername");
        // String srcPassword = paramTool.get("srcPassword");
        String tableName = "temp_flink";
        String srcHost = "192.168.10.14";
        String srcDatabase = "xcode";
        String srcUsername = "root";
        String srcPassword = "123456";

        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. For resumable sync, Flink CDC stores the binlog position as state in checkpoints,
        //    so enable checkpointing, e.g. every 5s with exactly-once semantics:
        // env.enableCheckpointing(5000);
        // CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // // 3. Decide whether the last checkpoint is kept when the job is cancelled
        // checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // // Timeout after which an in-flight checkpoint is discarded
        // checkpointConfig.setCheckpointTimeout(600000);
        // // Minimum pause between two checkpoints
        // checkpointConfig.setMinPauseBetweenCheckpoints(500);
        // // Number of concurrent checkpoints
        // checkpointConfig.setMaxConcurrentCheckpoints(1);

        // A bounded stream would otherwise be processed in batch mode; force streaming here
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Externalized checkpoints keep only one checkpoint by default; to retain e.g. 3,
        // configure state.checkpoints.num-retained: 3 in flink-conf.yaml
        // env.setStateBackend();

        // 5. Create the source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(srcHost)
                .port(3306)
                .databaseList(srcDatabase)
                .tableList(srcDatabase + "." + tableName)
                .username(srcUsername)
                .password(srcPassword)
                // converts SourceRecord to JSON String
                .deserializer(new TxlcCustomerSchema())
                .build();

        // 6. Wire the source into the job and attach the StarRocks sink
        DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "[MySQL Source]")
                .setParallelism(1);

        streamSource.addSink(StarRocksSink.sink(
                StarRocksSinkOptions.builder()
                        .withProperty("connector", "starrocks")
                        .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8&useSSL=false")
                        .withProperty("load-url", "192.168.10.11:8030")
                        .withProperty("username", "root")
                        .withProperty("password", "123456")
                        .withProperty("table-name", tableName)
                        .withProperty("database-name", "data_center")
                        .withProperty("sink.buffer-flush.interval-ms", "10000")
                        .withProperty("sink.properties.format", "json")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        // .withProperty("sink.properties.column_separator", "\\x01")
                        // .withProperty("sink.properties.row_delimiter", "\\x02")
                        .withProperty("sink.parallelism", "1")
                        .build()
        )).name(">>>StarRocks Sink<<<");

        env.execute("mysql sync StarRocks table: " + tableName);
    }
}
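The sink above assumes the target table already exists in StarRocks and uses the Primary Key model, which is required for the __op-based upsert/delete semantics. Below is a one-off sketch of creating it through the FE's MySQL-protocol query port; the column list, bucket count, and replication_num are illustrative assumptions for the temp_flink example and must be adjusted to match your real MySQL schema and cluster.

package *;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStarRocksTargetTable {
    public static void main(String[] args) throws Exception {
        // Hypothetical schema for temp_flink; key columns must come first and be NOT NULL
        String ddl = "CREATE TABLE IF NOT EXISTS data_center.temp_flink ("
                + "  id BIGINT NOT NULL,"
                + "  name VARCHAR(255),"
                + "  create_time DATETIME"
                + ") PRIMARY KEY (id)"
                + " DISTRIBUTED BY HASH (id) BUCKETS 8"
                + " PROPERTIES (\"replication_num\" = \"1\")";
        // The StarRocks FE speaks the MySQL protocol on the query port (9030)
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.10.245:9030?useSSL=false", "root", "123456");
             Statement stmt = conn.createStatement()) {
            stmt.execute(ddl);
        }
    }
}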
Finally: as long as the matching tables have already been created in both MySQL and SR, the job above gives you a one-to-one real-time sync.
Hope this solves your problem.