Flink: Syncing Data from MySQL to StarRocks 2.5.0
Create the MySQL table:
CREATE TABLE `t_user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
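Flink CDC reads the MySQL binlog, so the binlog must be enabled in ROW format and the connection user needs replication privileges. A quick check (the GRANT is a sketch that reuses root, as the job below does; adjust the user and host to your setup):
-- binlog must be ON and binlog_format must be ROW for Flink CDC
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- privileges the CDC user needs (root normally has them already)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';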
Create the StarRocks table
The default StarRocks user is root, with no password:
mysql -P9030 -h127.0.0.1 -uroot --prompt="StarRocks > "
The password can also be changed:
SET PASSWORD FOR 'root' = PASSWORD('root');
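The session below uses a database named flink; if it does not exist yet, create it first:
CREATE DATABASE IF NOT EXISTS flink;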
Create the table t_user_sink:
StarRocks > use flink;
StarRocks > CREATE TABLE `t_user_sink` (
-> `id` bigint NOT NULL,
-> `user_name` varchar(255) DEFAULT NULL,
-> `age` int DEFAULT NULL
-> )
-> PRIMARY KEY(`id`)
-> DISTRIBUTED BY HASH(id) BUCKETS 3
-> PROPERTIES
-> (
-> "replication_num" = "1"
-> );
Query OK, 0 rows affected (0.08 sec)
StarRocks > select * from t_user_sink;
Empty set (0.02 sec)
StarRocks > SHOW PROC '/frontends'\G
*************************** 1. row ***************************
Name: 192.168.16.2_9010_1686905244720
IP: 192.168.16.2
EditLogPort: 9010
HttpPort: 8030
QueryPort: 9030
RpcPort: 9020
Role: LEADER
ClusterId: 776853271
Join: true
Alive: true
ReplayedJournalId: 53824
LastHeartbeat: 2023-06-21 02:01:41
IsHelper: true
ErrMsg:
StartTime: 2023-06-20 11:50:07
Version: 2.5.0-0ee1b3b8c
1 row in set (0.02 sec)
# Check the BEs
StarRocks > SHOW PROC '/backends'\G
**Remember the HttpPort (8030) and QueryPort (9030); they are needed in the code below.**
The plain SQL:
CREATE TABLE `t_user_sink` (
`id` bigint NOT NULL,
`user_name` varchar(255) DEFAULT NULL,
`age` int DEFAULT NULL
)
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES
(
"replication_num" = "1"
);
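Note that t_user_sink uses StarRocks' Primary Key table model. That is what lets the Flink connector apply updates and deletes from the CDC stream (verified at the end of this post); an append-only Duplicate Key table would not reflect deletions.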
The program
Dependencies (pom.xml):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example.db</groupId>
<artifactId>flink-cdc-starrocks</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink-cdc-starrocks</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.4</flink.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<log4j.version>2.20.0</log4j.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.7_flink-1.15</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>30.1.1-jre-15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>compile</scope>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>compile</scope>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
</dependencies>
<build>
<finalName>flink-cdc-starrocks</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<!-- bind to Maven's compile phase -->
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Maven Assembly Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- copy dependencies into a lib directory next to the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<!-- copy project dependency jars into lib/ -->
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
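Build with `mvn clean package`. Given the plugins above, this produces the fat jar `target/flink-cdc-starrocks-jar-with-dependencies.jar` (assembly plugin) and also copies the individual dependency jars into `target/lib` (dependency plugin).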
MysqlDbCdc
This uses the Flink Table API:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlDbCdc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /* env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // 2. Flink CDC stores the binlog reading position in checkpointed state; to
        //    resume from where it left off, start the job from a checkpoint or savepoint.
        // 2.1 Enable checkpointing, taking a checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Source table (MySQL CDC)
        String sourceDDL =
                "CREATE TABLE `t_user` (\n" +
                "  `id` bigint,\n" +
                "  `user_name` varchar(255),\n" +
                "  `age` int,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '192.168.x.xx',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'flink-db',\n" +
                "  'server-time-zone' = 'Asia/Shanghai',\n" +
                "  'table-name' = 't_user'\n" +
                ")";
        // Sink table (StarRocks); load-url uses the FE HttpPort,
        // jdbc-url uses the FE QueryPort noted earlier
        String sinkDDL =
                "CREATE TABLE `t_user_sink` (\n" +
                "  `id` bigint,\n" +
                "  `user_name` varchar(255),\n" +
                "  `age` int,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'sink.properties.format' = 'json',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '',\n" +
                "  'sink.max-retries' = '10',\n" +
                "  'sink.buffer-flush.max-rows' = '1000000',\n" +
                "  'sink.buffer-flush.max-bytes' = '300000000',\n" +
                "  'sink.properties.strip_outer_array' = 'true',\n" +
                "  'sink.buffer-flush.interval-ms' = '15000',\n" +
                "  'load-url' = '192.168.x.xx:8030',\n" +
                "  'database-name' = 'flink',\n" +
                "  'jdbc-url' = 'jdbc:mysql://192.168.x.xx:9030/flink?useUnicode=true" +
                "&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
                "  'connector' = 'starrocks',\n" +
                "  'table-name' = 't_user_sink'\n" +
                ")";
        String transformSQL =
                "INSERT INTO t_user_sink SELECT * FROM t_user";
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql submits the streaming INSERT job itself; a trailing
        // env.execute() is unnecessary and would fail (no DataStream operators)
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
    }
}
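One note on the ending: tableEnv.executeSql(transformSQL) already submits the streaming INSERT job, so no env.execute() call is needed; with only Table API operations defined, env.execute() would fail with "No operators defined in streaming topology". For a sync that survives restarts, uncomment the checkpoint block at the top of main so the binlog offset is saved.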
Logging config, log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
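With the root logger at error, a healthy job prints almost nothing; temporarily raising the level to info is an easy way to watch the CDC source and the StarRocks sink report progress.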
Upload the Flink job
Upload the jar to the Flink dashboard. The dependency jars (copied to target/lib by the build) must be made available to Flink as well, otherwise the job fails at runtime because of missing classes.
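If you prefer the command line, submission could look like the sketch below ($FLINK_HOME and the paths are assumptions for your installation; the main class matches MysqlDbCdc above):
# Option A: run the fat jar built by the assembly plugin
$FLINK_HOME/bin/flink run -c MysqlDbCdc target/flink-cdc-starrocks-jar-with-dependencies.jar
# Option B: run the thin jar, with the copied dependencies on Flink's classpath
cp target/lib/*.jar $FLINK_HOME/lib/    # restart the cluster to pick them up
$FLINK_HOME/bin/flink run -c MysqlDbCdc target/flink-cdc-starrocks.jar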
Insert data into MySQL:
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (1, 'hello3', 12);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (2, 'abc', 1);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (3, 'dsd', 23);
Check the Flink dashboard logs; if they show no errors, the sync should have succeeded.
Check the StarRocks database:
StarRocks > select * from t_user_sink;
+------+-----------+------+
| id | user_name | age |
+------+-----------+------+
| 2 | abc | 1 |
| 3 | dsd | 23 |
| 1 | hello3 | 12 |
+------+-----------+------+
3 rows in set (0.01 sec)
After deleting rows in MySQL, the deletes are synced to StarRocks as well.
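For example (the id is just an illustration):
-- in MySQL
DELETE FROM `flink-db`.`t_user` WHERE `id` = 2;
-- shortly afterwards the row is gone in StarRocks too
SELECT * FROM t_user_sink;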
good luck!
References
- flink-cdc-connectors