flink SQL实现mysql source sink

news2024/12/22 3:02:25

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.代码实现

1.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description:
 */
public class EnvUtil {
    /**
     * 设置flink执行环境
     * @param parallelism 并行度
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism >0 ){
            //设置并行度
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1);// 默认1
        }

        // 添加重启机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
        env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);
        //rocksdb状态后端,启用增量checkpoint
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //设置checkpoint路径
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // 同一时间只允许一个 checkpoint 进行(默认)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //最小间隔,10*60*1000=60000
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // 取消任务后,checkpoint仍然保存
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpoint容忍失败的次数
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        //checkpoint超时时间 默认10分钟
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        //禁用operator chain(方便排查反压)
        env.disableOperatorChaining();
        return env;
    }

    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        //设置时区 东八
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
        Configuration configuration = tenv.getConfig().getConfiguration();
        // 开启miniBatch
        configuration.setString("table.exec.mini-batch.enabled", "true");
        // 批量输出的间隔时间
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
        configuration.setString("table.exec.mini-batch.size", "20000");
        // 开启LocalGlobal
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        //设置TTL API指定
        tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));

        return tenv;
    }

}

1.2 核心代码

package com.zl;

import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

public class MysqlExampleSQL {
    public static void main(String[] args) throws Exception {

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/MysqlExampleSQL");

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);


        /**
         * 数据库版本:8.0.27
         * mysql sink
         */
        tenv.executeSql(
                " CREATE TABLE `sink_products` (" +
                        "id INT," +
                        "name STRING," +
                        "description STRING," +
                        "PRIMARY KEY (`id`) NOT ENFORCED" +
                        ")with (" +
                        "'connector' = 'jdbc'," +
                        "'url' = 'jdbc:mysql://10.86.45.12:30105/flinktest2'," +
                        "'driver' = 'com.mysql.cj.jdbc.Driver'," +//com.mysql.jdbc.Driver,com.mysql.cj.jdbc.Driver
                        "'username' = '" + "root" + "'," +
                        "'password' = '" + "pwd" + "'," +// 记得修改为实际密码
                        "'table-name' = 'products'" +
                        ")"
        );

        /**
         * 数据库版本:5.7.20
         * cdc方式同步业务库数据
         */
        tenv.executeSql("CREATE TABLE `src_products`( " +
                "id INT," +
                "name STRING," +
                "description STRING," +
                "PRIMARY KEY (`id`)  NOT ENFORCED" +
                ") with (" +
                "'connector' = 'mysql-cdc', " +
                "'hostname' = '" + "10.86.37.169" + "', " +
                "'port' = '" + "3306" + "', " +
                "'username' = '" + "root" + "', " +
                "'password' = '" + "pwd" + "', " +// 记得修改为实际密码
                "'database-name' = '" + "flinktest1" + "', " +
                "'table-name' = 'products'," +
                "'debezium.snapshot.mode' = 'initial'" +
                ")"
        );

        /**
         * 数据同步
         */
        TableResult tableResult1 = tenv.executeSql("INSERT INTO sink_products " +
                "SELECT " +
                "id, " +
                "name, " +
                "description " +
                "FROM src_products");

        // 通过 TableResult 来获取作业状态
        tableResult1.print();
    }
}

1.3 pom.xml

注意修改此处:
在这里插入图片描述

2.web UI

在这里插入图片描述
在这里插入图片描述

3.数据库

flinktest1.products
在这里插入图片描述
flinktest2.products
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExampleSQL /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

部署日志:
在这里插入图片描述
yarn:
在这里插入图片描述

5.常见问题

5.1 错误1

开发环境,错误日志:
“Caused by: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;”

解决:去掉pom.xml中“mysql-connector-java”相关依赖。

5.2 错误2

部署后,错误日志:
①“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: ‘connector’=‘mysql-cdc’”。
②“Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘jdbc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath”。

解决:
“flink-connector-jdbc_2.11-1.14.0.jar”、“flink-connector-mysql-cdc-2.4.0.jar”放到服务器flink的lib目录,如下图所示:

在这里插入图片描述

5.3 错误3

部署后,错误日志:
“ Exception java.lang.NoClassDefFoundError: com/mysql/cj/jdbc/Driver”。

解决:
“mysql-connector-java-8.0.27.jar”放到服务器flink的lib目录,如下图所示:
在这里插入图片描述

6.代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

知乎 PB 级别 TiDB 数据库集群管控实践

以下文章来源于知乎技术专栏 ,作者代晓磊 导读 在现代企业中,数据库的运维管理至关重要,特别是面对分布式数据库的复杂性和大规模集群的挑战。作为一款兼容 MySQL 协议的分布式关系型数据库,TiDB 在高可用、高扩展性和强一致性方…

SpringBoot+Vue3实现阿里云视频点播 实现教育网站 在上面上传对应的视频,用户开会员以后才能查看视频

要使用阿里云视频点播(VOD)实现一个教育网站,其中用户需要成为会员后才能查看视频,这个过程包括上传视频、设置权限控制、构建前端播放页面以及确保只有付费会员可以访问视频内容。 1. 视频上传与管理 创建阿里云账号&#xff…

【前端】 async 和 await 以及 generator生成器函数

一、背景 这一篇随机主要是想记录一下自己学习js中有关异步内容的东西。然后发现有人拿异步跟生成器函数进行比较了一下,因此一起学习了一下。 二、知识点相关内容及实验test 2.1 generator 生成器函数 generator函数的作用:每次访问返回函数中yield…

智能座舱进阶-应用框架层-Handler分析

首先明确, handler是为了解决单进程内的线程之间的通信问题的。我也需要理解Android系统中进程和线程的概念, APP启动后,会有三四个线程启动起来,其中,有一条mainUITread的线程,专门用来处理UI事件&#xf…

windows openssl编译x64版libssl.lib,编译x64版本libcurl.lib,支持https,vs2015编译器

不要纠结,直接选择用perl编译! 告诫想要用弄成vs编译版的,暂时先别给自己增加麻烦 告诫,以下执行的每一步,都不要纠结 先安装环境 nasm 64位版本 https://www.nasm.us/pub/nasm/releasebuilds/2.16.01/win64/nasm-…

汽车供应链 “剧变”开始,“智能感知潜在龙头”诞生

智能汽车产业链“剧变”已经开启,智能感知软硬件能力的权重正在不断被放大。 比如满足高阶泊车的第二代AK2超声波传感器、满足人机共驾场景需求的电子外后视镜(CMS)、iTOF 3D成像视觉感知(用于舱内监控)等新产品&…

Python+OpenCV系列:AI看图识人、识车、识万物

在人工智能风靡全球的今天,用 Python 和 OpenCV 结合机器学习实现物体识别,不仅是酷炫技能,更是掌握未来的敲门砖。本篇博文手把手教你如何通过摄像头或图片输入,识别人、动物、车辆及其他物品,让你的程序瞬间具备 AI …

JVM 详解

一. JVM 内存区域的划分 1. 程序计数器 程序计数器是JVM中一块比较小的空间, 它保存下一条要执行的指令的地址. [注]: 与CPU的程序计数器不同, 这里的下一条指令不是二进制的机器语言, 而是Java字节码. 2. 栈 保存方法中的局部变量, 方法的形参, 方法之间的调用关系. 栈又…

C# opencvsharp 流程化-脚本化-(2)ROI

ROI ROI也是经常需要使用的方法。特别是在图像编辑中。ROI又称感兴趣的区域,但是图像是矩阵是矩形的,感兴趣的是乱八七糟的,所以还有一个Mask需要了解一下的。 public class RoiStep : IImageProcessingStep{public ImageProcessingStepType…

wazuh-modules-sca-scan

sca模块主函数wm_sca_main -> wm_sca_start 检查policy文件中的每一个项目wm_sca_check_policy static int wm_sca_check_policy(const cJSON * const policy, const cJSON * const checks, OSHash *global_check_list) {if(!policy) {return 1;}const cJSON * const id c…

uniapp 自定义图标03

插入工程,修改名称文件内容 编译运行

在Windows本地用网页查看编辑服务器上的 jupyter notebook

​ Motivation: jupyter notebook 可以存中间变量,方便我调整代码,但是怎么用服务器的GPU并在网页上查看编辑呢? 参考 https://zhuanlan.zhihu.com/p/440080687 服务端(Ubuntu): 激活环境 source activate my_env安装notebook …

【YOLO 项目实战】(11)YOLO8 数据集与模型训练

欢迎关注『youcans动手学模型』系列 本专栏内容和资源同步到 GitHub/youcans 【YOLO 项目实战】(1)YOLO5 环境配置与检测 【YOLO 项目实战】(10)YOLO8 环境配置与推理检测 【YOLO 项目实战】(11)YOLO8 数据…

Ubuntu22.04上安装esp-idf

一、安装准备# 建议使用Ubuntu 20.04 或 Ubuntu 22.04 操作系统 为了在 Ubuntu 22.04 中使用 esp-idf,需要安装一些依赖包 sudo apt-get install git wget flex bison gperf python3\python3-pip python3-venv cmake ninja-build ccache\libffi-dev libssl-dev dfu…

nginx-虚拟主机配置笔记

目录 nginx的安装可以查看nginx安装https://blog.csdn.net/m0_68472908/article/details/144609023?spm1001.2014.3001.5501 一、 基于域名 二、 基于IP 三、 基于端口 nginx的安装可以查看nginx安装https://blog.csdn.net/m0_68472908/article/details/144609023?spm100…

AlipayHK支付宝HK接入-商户收款(PHP)

一打开支付宝国际版 二、点开商户服务 三、下载源码

Soul Android端稳定性背后的那些事

前言:移动应用的稳定性对于用户体验和产品商业价值都有着至关重要的作用。应用崩溃会导致关键业务中断、用户留存率下降、品牌口碑变差、生命周期价值下降等影响,甚至会导致用户流失。因此,稳定性是APP质量构建体系中最基本和最关键的一环。当…

深度学习模型 DeepSeek-VL2 及其消费级显卡需求分析

DeepSeek-VL2 是由 DeepSeek 团队开发的一款先进的视觉语言模型,采用了混合专家(MoE)架构,旨在提升多模态理解能力。该模型包括三个版本:DeepSeek-VL2-Tiny、DeepSeek-VL2-Small 和 DeepSeek-VL2。每个版本具有不同的模…

首批|云轴科技ZStack成为开放智算产业联盟首批会员单位

近日 ,在Linux基金会AI & Data及中国开源软件推进联盟的指导之下,开放智算产业联盟成立大会在北京成功召开。在大会上,联盟首次公布了组织架构并颁发了首批会员单位证书。凭借ZStack AIOS平台智塔和在智算领域的技术创新,云轴…

word实现两栏格式公式居中,编号右对齐

1、确定分栏的宽度 选定一段文字 点击分栏:如本文的宽度为22.08字符 2、将公式设置为 两端对齐,首行无缩进。 将光标放在 公式前面 点击 格式-->段落-->制表位 在“制表位位置”输入-->11.04字符(22.08/211.04字符)&…