## flink- mysql同步数据至starrocks-2.5.0之数据同步

news2024/12/23 22:15:26

flink- mysql同步数据至starrocks-2.5.0之数据同步

mysql 创建 表

CREATE TABLE `t_user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;

starrocks 创建 表

starrocks 默认用户是 root ,不需要密码

mysql -P9030 -h127.0.0.1 -uroot --prompt="StarRocks > "

也可以修改密码

SET PASSWORD FOR 'root' = PASSWORD('root');

创建表t_user_sink:


StarRocks > use flink;
StarRocks > CREATE TABLE `t_user_sink` (
    ->   `id` bigint NOT NULL,
    ->   `user_name` varchar(255) DEFAULT NULL,
    ->   `age` int DEFAULT NULL
    -> )
    -> PRIMARY KEY(`id`)
    -> DISTRIBUTED BY HASH(id) BUCKETS 3
    -> PROPERTIES
    -> (
    ->     "replication_num" = "1" 
    -> );
Query OK, 0 rows affected (0.08 sec)

StarRocks > select * from t_user_sink;
Empty set (0.02 sec)

StarRocks > SHOW PROC '/frontends'\G
*************************** 1. row ***************************
             Name: 192.168.16.2_9010_1686905244720
               IP: 192.168.16.2
      EditLogPort: 9010
         HttpPort: 8030
        QueryPort: 9030
          RpcPort: 9020
             Role: LEADER
        ClusterId: 776853271
             Join: true
            Alive: true
ReplayedJournalId: 53824
    LastHeartbeat: 2023-06-21 02:01:41
         IsHelper: true
           ErrMsg: 
        StartTime: 2023-06-20 11:50:07
          Version: 2.5.0-0ee1b3b8c
1 row in set (0.02 sec)

# 查看be
StarRocks > SHOW PROC '/backends'\G

**记住 HttpPort, QueryPort, 代码中要用到 **

sql:

CREATE TABLE `t_user_sink` (
  `id` bigint NOT NULL,
  `user_name` varchar(255) DEFAULT NULL,
  `age` int DEFAULT NULL
)
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES
(
    "replication_num" = "1" 
);

程序

依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.db</groupId>
    <artifactId>flink-cdc-starrocks</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink-cdc-starrocks</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <log4j.version>2.20.0</log4j.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!--        cdc-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>2.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.7_flink-1.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-api-java-bridge</artifactId>
              <version>${flink.version}</version>
          </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>flink-cdc-starrocks</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!--声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!--拷贝依赖到jar外面的lib目录-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- 拷贝项目依赖包到lib/目录下 -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

MysqlDbCdc

采用 flink table api 方式

public class MysqlDbCdc {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      /*  env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //2.Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序
        //2.1 开启Checkpoint,每隔5秒钟做一次CK
        env.enableCheckpointing(5000L);
        //2.2 指定CK的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次CK数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));*/
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 数据源表
        String sourceDDL =
               "CREATE TABLE `t_user` (\n" +
                "  `id` bigint,\n" +
                "  `user_name` varchar(255),\n" +
                "  `age` int,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = '192.168.x.xx',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = 'root',\n" +
                        " 'database-name' = 'flink-db',\n" +
                       " 'server-time-zone' = 'Asia/Shanghai',\n" +
                        " 'table-name' = 't_user'\n" +
                        ")";
        // 输出目标表
        String sinkDDL =
                "CREATE TABLE `t_user_sink` (\n" +
                        "  `id` bigint,\n" +
                        "  `user_name` varchar(255),\n" +
                        "  `age` int,\n" +
                        "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        "  'sink.properties.format' = 'json',\n" +
                        "  'username' = 'root',\n" +
                        "  'password' = '',\n" +
                        "  'sink.max-retries' = '10',\n" +
                        "  'sink.buffer-flush.max-rows' = '1000000',\n" +
                        "  'sink.buffer-flush.max-bytes' = '300000000',\n" +
                        "  'sink.properties.strip_outer_array' = 'true',\n" +
                        "  'sink.buffer-flush.interval-ms' = '15000',\n" +
                        "  'load-url' = '192.168.x.xx:8030',\n" +
                        "  'database-name' = 'flink',\n" +
                        "  'jdbc-url' = 'jdbc:mysql://192.168.x.xx:9030/flink?useUnicode=true" +
                        "&characterEncoding=UTF-8&userSSL=false&serverTimezone=Asia/Shanghai',\n" +
                        "  'connector' = 'starrocks',\n" +
                        "  'table-name' = 't_user_sink'" +
                        ")";

        String transformSQL =
                "INSERT INTO t_user_sink  SELECT * FROM t_user";
        tableEnv.executeSql(sourceDDL);
       tableEnv.executeSql(sinkDDL);
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
        env.execute("abc");
    }

}

日志 log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="error">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

上传flink job

将jar 包上传到 flink dashboard, 且需要将依赖包一并上传,不然 flink 缺少运行 jar包

在 mysql 中插入数据

INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (1, 'hello3', 12);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (2, 'abc', 1);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (3, 'dsd', 23);

查看 flink dashboard 日志:

在这里插入图片描述

这样应该就是同步成功了

查看 starrocks 数据库:

StarRocks > select * from t_user_sink;
+------+-----------+------+
| id   | user_name | age  |
+------+-----------+------+
|    2 | abc       |    1 |
|    3 | dsd       |   23 |
|    1 | hello3    |   12 |
+------+-----------+------+
3 rows in set (0.01 sec)

进行 删除后,发现 starrocks 也同步进行了删除

good luck!

参考

  • flink-cdc-connectors

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/670105.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机app测试杂谈

手机上的 app 分为基于 HTML5 的 app(类似于 pc 上的 b/S 应用)和本地 app(类似于 C/S 结构)。 所以测试上我们也可以充分吸收 web 的 b/s 和 c/s 测试经验。但是不同于 pc 上的应用 测试&#xff0c;手机上的测试有其独特性 测试前的思考:我们这个产品主要是做什么的?为什么我…

03. 青龙面板配置B站快速升级任务天选时刻脚本(保姆级图文)

目录 功能介绍与环境要求1. 修改配置文件拉取.sh脚本2. 拉取库脚本3. 安装 dotnet 环境4.1 扫码登录方式4.2 b站cookie方式登录&#xff08;如果你扫码成功了就不用看这个了&#xff09;获取cookie新建cookie的环境变量 5. 配置任务设置变量6. 运行每日任务测试一下总结 欢迎关…

搭建cloud项目以及各个依赖和配置说明

文章目录 背景步骤配置父pom文件spring-cloud和spring-cloud-alibaba的区别 添加网关模块配置网关的application.yml文件网关入口 普通模块普通模块的配置文件&#xff1a;普通模块的pom文件启动类&#xff1a;application.yml文件和bootstrap.yml文件的区别 总结 背景 最近在…

chatgpt赋能python:Python描述符:更加灵活的属性管理方式

Python描述符&#xff1a;更加灵活的属性管理方式 Python是一种高级编程语言&#xff0c;它的简单易用、高效性和灵活性使得它成为了现代企业和开发者的首选开发语言之一。然而&#xff0c;在使用Python编写代码时&#xff0c;很多时候需要对属性进行访问和修改&#xff0c;而…

C++单目运算符和特殊运算符的重载(9)

运算符的重载 原理和机制 C中运算符只支持基本数据类型运算&#xff0c;如果需要运算符支持类类型的运算&#xff0c;需要使用C提供的新语法 ------- 运算符的重载 运算符的重载本质上是通过函数来实现的&#xff0c;将类类型数据的运算过程写成一个特殊的函数&#xff0c;当…

YOLOv8 图像分割

一、背景 二、环境配置 官网&#xff1a;Previous PyTorch Versions | PyTorch cuda 11.7 pytorch 1.13.0 torchvision 0.14.0 pytorch-cuda 11.7 三、安装yolov8 官网&#xff1a;GitHub - ultralytics/ultralytics: NEW - YOLOv8 &#x1f680; in PyTorch > ONNX &…

Mendix 10 树形组件分析及应用

一、前言 产品研发目标是服务于业务&#xff0c;解决具体业务问题&#xff0c;带来业务价值。 因此&#xff0c;任何产品功能的推出&#xff0c;都应该秉承“从实践中来&#xff0c;到实践中去”的原则&#xff0c;在实战中发现问题&#xff0c;通过新功能设计和功能改进解决…

chatgpt科普

引言 chatgpt没有国内开放&#xff0c;为什么如此重要。抛开技术细节&#xff0c;少用专业名词&#xff0c;在整体功能上讲解 ChatGPT 的工作原理、制造过程、涌现的能力、未来的影响以及如何应对。让大家明白&#xff1a; ChatGPT是如何回答问题的。 它是怎么被制造的&…

Go语言实现单链表

博主最近在学习Go语言&#xff0c;所以打算更新一期Go语言版本的数据结构。这篇文章将的是Go语言如何实现单链表。 文章目录 前言一、个人见解&#xff0c;为什么学GO&#xff1f;二、Go语言实现单链表1.创建节点2.通过数组创建一个单链表3.遍历单链表4.单链表插入操作4.1 伪代…

HOOPS Visualize SDK 2023 Crack

桌面和移动工程应用程序的图形引擎 HOOPS Visualize 是 3D 图形 SDK&#xff0c;支持来自市场领导者 Hexagon、Trimble、Ansys、SOLIDWORKS、™ Autodesk 等的数百个工程应用程序。 用于 3D CAD 渲染的图形 SDK HOOPS Visualize 是一个以工程为中心的高性能图形库&#xff0c…

安卓蓝牙ATT协议介绍

介绍 ATT&#xff0c;Attribute Protocol&#xff0c;用于发现、读、写对端设备的协议(针对BLE设备) ATT允许蓝牙远程设备&#xff08;比如遥控器&#xff09;作为服务端提供拥有关联值的属性集&#xff0c;让作为客户端的设备&#xff08;比如手机、电视&#xff09;来发现、…

通用能力及AI核心能力表现优异!合合信息智能文档处理系统(IDP)高评级通过中国信通院评估

数字经济快速发展的背后&#xff0c;全球数据总量呈现出爆发式增长趋势。智能文档处理&#xff08;IDP&#xff09;技术能够高效地从多格式文档中捕捉、提取和处理数据&#xff0c;帮助机构和企业大幅提升文档处理效率&#xff0c;节约时间和人力成本。近期&#xff0c;合合信息…

C语言进阶--字符函数与内存函数

目录 一.字符函数 1.strlen函数 模拟实现strlen 2.strcpy函数 模拟实现strcpy 3.strcat函数 模拟实现strcat strcat能否用于自己追加自己&#xff1f; 4.strcmp函数 模拟实现strcmp 5.strncpy函数 6.strncat函数 7.strncmp函数 模拟实现strncmp 8.strstr函数 模…

哨兵2号数据下载与利用Python处理(波段融合、降采样、可视化、裁剪等)

简单介绍 网址:https://scihub.copernicus.eu/dhus/#/home 哨兵2号(Sentinel-2)是欧洲空间局(European Space Agency,简称ESA)推出的一组遥感卫星,旨在为地球观测和环境监测提供高质量的光学图像数据。 S2MSI2A是哨兵2号卫星的一种传感器。 S2MSI2A是哨兵2号卫星搭载…

huggingface - PEFT.参数效率微调

GitHub - huggingface/peft: &#x1f917; PEFT: State-of-the-art Parameter-Efficient Fine-Tuning. 最先进的参数高效微调 (PEFT) 方法 Parameter-Efficient Fine-Tuning (PEFT) 方法可以使预训练语言模型 (PLM) 高效适应各种下游应用程序&#xff0c;而无需微调模型的所有…

记录--Vue3 封装 ECharts 通用组件

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 按需导入的配置文件 配置文件这里就不再赘述&#xff0c;内容都是一样的&#xff0c;主打一个随用随取&#xff0c;按需导入。 import * as echarts from "echarts/core"; // 引入用到的图表…

ctfshow web入门 php特性 web93-97

1.web93 intval($num,0),0代表根据变量类型进行使用哪一种进制进行取整 可以使用8进制&#xff0c;正负数&#xff0c;小数点 payload: 010574 4476.0 4476.0 2.web94 过滤了0&#xff0c;不能使用8进制了&#xff0c;还可以使用小数点&#xff0c;正负数等 payload&#xff1…

【Java算法题】剑指offer_算法之02动态规划

对于动态规划问题&#xff0c;我将拆解为如下五步曲&#xff0c;这五步都搞清楚了&#xff0c;才能说把动态规划真的掌握了&#xff01; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp数组如何初始化确定遍历顺序举例推导dp数组 JZ42 连续子数组的…

CTFshow-pwn入门-栈溢出pwn35-pwn36

pwn35 首先还是先下载pwn文件拖进虚拟机加上可执行权限&#xff0c;使用checksec命令查看文件的信息。 chmod x pwn checksec pwn32位的我们直接拖进ida中反编译&#xff1a; // main int __cdecl main(int argc, const char **argv, const char **envp) {FILE *stream; // […

阿里云 OSS介绍

1、什么是阿里云 OSS&#xff1f; OSS 为 Object Storage Service&#xff0c;即对象存储服务。是阿里云提供的海量、安全、低成本、高可靠的云存储服务。 OSS 具有与平台无关的 RESTful API 接口&#xff0c;可以在任意应用、任意时间、任意地点 存储与访问 任何类型的数据。…