一文说清flink从编码到部署上线

news2024/12/24 2:14:49

引言:目前flink的文章比较多,但一般都关注某一特定方面,很少有一个文章,从一个简单的例子入手,说清楚从编码、构建、部署全流程是怎么样的。所以编写本文,自己做个记录备查同时跟大家分享一下。本文以简单的mysql cdc为例展开说明。
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.MySQL

1.1 创建数据库和测试数据

数据库脚本:

CREATE DATABASE `flinktest`;
USE `flinktest`;
CREATE TABLE `products` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `description` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;

insert  into `products`(`id`,`name`,`description`) values 
(1,'aaa','aaaa'),
(2,'ccc','ccc'),
(3,'dd','ddd'),
(4,'eeee','eee'),
(5,'ffff','ffff'),
(6,'hhhh','hhhh'),
(7,'iiii','iiii'),
(8,'jjjj','jjjj');

账号使用root就行。

1.2 开启binlog

参考:https://core815.blog.csdn.net/article/details/144233298
踩坑:测试过程中发现mysql 9.0一直无法获取更新的数据,最终使用的5.7。

2.编码

2.1 主要实现

package com.zl;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.mysql.cj.conf.PropertyKey.useSSL;

public class MysqlExample {
    public static void main(String[] args) throws Exception {

        List<String> SYNC_TABLES = Arrays.asList("flinktest.products");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.37.169")
                .port(3306)
                .databaseList("flinktest")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root")
                .password("pwd")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        /// 配置flink访问页面-开始
       /* Configuration config = new Configuration();
       // 启用 Web UI,访问地址【http://ip:port】
        config.setBoolean("web.ui.enabled", true); 
        config.setString(RestOptions.BIND_PORT,"8081");
//        这个使用jar直接运行可以,如果提交给yarn会报错,需要改为getExecutionEnvironment()
        StreamExecutionEnvironment env = StreamExecutionEnvironment
        .createLocalEnvironmentWithWebUI(config);*/
        ///配置flink访问页面-结束

        StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
        env.setParallelism(1);

        /// 设置CK存储-开始(不需要可注释掉)
        // hadoop部署见:https://core815.blog.csdn.net/article/details/144022938
        // hdfs访问地址见:/home/hadoop-3.3.3/etc/hadoop/core-site.xml
        env.getCheckpointConfig()
        .setCheckpointStorage("hdfs://10.86.97.191:9000"+"/flinktest/");
        env.getCheckpointConfig().setCheckpointInterval(3000);
        /// 设置CK存储-结束

        // 如果不能正常读取mysql的binlog:
        //①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);
        // ②可能是数据库ip、port、账号、密码错误。
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1).print();

        env.execute("Print MySQL Snapshot + Binlog");

    }

}

2.2 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zl.flinkcdc</groupId>
    <artifactId>FlickCDC</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>FlickCDC</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.0</flink-version>
        <flink-cdc-version>2.4.0</flink-cdc-version>
        <hadoop.version>3.0.0</hadoop.version>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-13.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>


        <!--        hadoop相关依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-compress</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-annotations</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>asm</artifactId>
                    <groupId>org.ow2.asm</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>avro</artifactId>
                    <groupId>org.apache.avro</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-compress</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-io</artifactId>
                    <groupId>commons-io</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-math3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jaxb-api</artifactId>
                    <groupId>javax.xml.bind</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>nimbus-jose-jwt</artifactId>
                    <groupId>com.nimbusds</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>zookeeper</artifactId>
                    <groupId>org.apache.zookeeper</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jsr305</artifactId>
                    <groupId>com.google.code.findbugs</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>gson</artifactId>
                    <groupId>com.google.code.gson</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.5.0</version>
        </dependency>

        <!--mvn install:install-file -Dfile
        =D:/maven/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
       -DgroupId=org.apache.flink -DartifactId
       =flink-shaded-hadoop-3 -Dversion=3.1.1.7.2.9.0-173-9.0 -Dpackaging=jar-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.9.0-173-9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.zl.MysqlExample</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

完整代码见:https://gitee.com/core815/flink-cdc-mysql

3.打包

mvn版本:3.5.4。
到pom.xml所在路径,执行“mvn package”
在这里插入图片描述
打包效果:
在这里插入图片描述

4.jar直接运行

java -jar FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5.flink yarn运行

hadoop、flink、yarn环境见:https://core815.blog.csdn.net/article/details/144022938

把FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar放到“/home”路径下。

执行下面命令:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

控制台看到如下打印:
在这里插入图片描述
yarn管理页面:
在这里插入图片描述
运行日志查看步骤:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下面即可看到完整日志:
在这里插入图片描述

6.常见问题

6.1 问题1

日志错误:
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决:
修改my.cnf文件。
[mysqld]
default-time-zone=‘Asia/Shanghai’
重启MySQL服务。

6.2 问题2:hdfs

日志错误:
Permission denied: user=PC2023, access=WRITE, inode=“/”:root:supergroup:drwxr-xr-x

解决:

临时解决
hadoop fs -chmod -R 777 /

6.3 问题3:guava30 guava18冲突

分析:
flink 1.13 cdc2.3的组合容易出这个问题。

解决:
参考:https://developer.aliyun.com/ask/574901
flink 使用1.14.0版本;cdc使用2.4.0版本。

6.4 问题4

日志错误:
/user/root/.flink/application_1733492706887_0002/log4j.properties could only be written to 0 of the 1 minReplication nodes

解决:
https://www.pianshen.com/article/1554707968/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2257755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IEEE T-RO 软体机器人手指状态估计实现两栖触觉传感

摘要&#xff1a;南方科技大学戴建生院士、林间院士、万芳老师、宋超阳老师团队近期在IEEE T-RO上发表了关于软体机器人手指在两栖环境中本体感知方法的论文。 近日&#xff0c;南方科技大学戴建生院士、林间院士、万芳老师、宋超阳老师团队在机器人顶刊IEEE T-RO上以《Propri…

java类静态初始化死锁问题

问题 前端时间帮同事分析了一个IO线程阻塞问题&#xff0c;该问题导致服务端无法处理任何请求&#xff0c;只能进行重启解决&#xff1b;事发时运维dump了下栈信息&#xff0c;堆栈信息如下图&#xff1a; 从上面可以看到io线程都阻塞于Object.wait()&#xff0c;具体是执行Cl…

厦门凯酷全科技有限公司怎么样?

随着短视频和直播带货的兴起&#xff0c;抖音电商平台迅速崛起&#xff0c;成为众多品牌和商家争夺的新战场。在这个竞争激烈的市场中&#xff0c;如何抓住机遇、实现销售增长&#xff0c;成为了每个企业面临的挑战。厦门凯酷全科技有限公司&#xff08;以下简称“凯酷全”&…

微信小程序uni-app+vue3实现局部上下拉刷新和scroll-view动态高度计算

微信小程序uni-appvue3实现局部上下拉刷新和scroll-view动态高度计算 前言 在uni-appvue3项目开发中,经常需要实现列表的局部上下拉刷新功能。由于网上相关教程较少且比较零散,本文将详细介绍如何使用scroll-view组件实现这一功能,包括动态高度计算、下拉刷新、上拉加载等完整…

在Windows和Ubuntu上安装SDKMAN

文章目录 1. SDKMAN概述2. 安装与使用SDKMAN2.1 在Windows上安装SDKMAN2.1.1 安装Git for Windows2.1.2 安装SDKMAN 2.2 利用SDKMAN管理Java2.2.1 查看所有可用的OpenJDK发行版2.2.2 安装Java2.2.3 查看Java版本2.2.4 shell指定使用某个Java版本 2.3 在Ubuntu上安装SDKMAN2.3.1…

1210 作业

思维导图 作业 使用read和write函数拷贝文件&#xff0c;一半拷进一个文件&#xff0c;另一半拷进另一个文件 #include <myhead.h> int main(int argc, const char *argv[]) {int fd1 open("./z1.txt",O_RDONLY);if(fd1-1){perror("open");return…

牛客网热门Java面试题及答案整理(2024最新版)

当今互联网行业中&#xff0c;Java 作为一种广泛应用的编程语言&#xff0c;对于求职者来说仍是一项受欢迎的技能。然而&#xff0c;随着市场上的开发人员数量越来越多&#xff0c;Java 面试的竞争也愈加激烈。 目前 Java 面试有着以下现状&#xff1a; 面试难度加大与过去相…

【SSH+X11】VsCode使用Remote-SSH在远程服务器的docker中打开Rviz

&#x1f680;今天来分享一下通过VsCode的Remote-SSH插件在远程服务器的docker中打开Rviz进行可视化的方法。 具体流程如下图所示&#xff0c;在操作开始前&#xff0c;请先重启设备&#xff0c;排除之前运行配置的影响&#xff1a; ⭐️ 我这里是使用主机连接服务器&#xff…

iPhone 17 Air基本确认,3个大动作

近段时间&#xff0c;果粉圈都在讨论一个尚未发布的新品&#xff1a;iPhone 17 Air&#xff0c;苹果又要来整新活了。 从供应链消息来看&#xff0c;iPhone 17 Air本质上是Plus的替代品&#xff0c;主要是在维持“大屏”这一卖点的同时&#xff0c;增加了“轻薄”属性&#xff…

浅析OCR技术与大模型的深度融合—中安未来OCR产品优势及前景探索

OCR&#xff08;光学字符识别&#xff09;技术作为一种文本识别工具&#xff0c;已在文档管理、自动化办公和图书数字化等领域发挥了重要作用。然而&#xff0c;随着深度学习和大语言模型&#xff08;LLM&#xff09;的迅猛发展&#xff0c;OCR技术迎来了新的机遇和挑战。如今&…

Android四大组件——Activity(二)

一、Activity之间传递消息 在&#xff08;一&#xff09;中&#xff0c;我们把数据作为独立的键值对进行传递&#xff0c;那么现在把多条数据打包成一个对象进行传递&#xff1a; 1.假设有一个User类的对象&#xff0c;我们先使用putExtra进行传递 activity_demo06.xml <…

STM32G4系列MCU双ADC多通道数据转换的应用

目录 概述 1 STM32Cube配置项目 1.1 基本参数配置 1.1.1 ADC1参数配置 1.1.2 ADC2参数配置 1.2 项目软件架构 2 功能实现 2.1 ADC转换初始化 2.2 ADC数据组包 3 测试函数 3.1 Vofa数据接口 3.2 输入数据 4 测试 4.1 ADC1 通道测试 4.2 ADC2 通道测试 概述 本文…

STM32串口接收与发送(关于为什么接收不需要中断而发生需要以及HAL_UART_Transmit和HAL_UART_Transmit_IT的区别)

一、HAL_UART_Transmit和HAL_UART_Transmit_IT的区别 1. HAL_UART_Transmit_IT&#xff08;非阻塞模式&#xff09;&#xff1a; HAL_UART_Transmit_IT 是非阻塞的传输函数&#xff0c;也就是说&#xff0c;当你调用 HAL_UART_Transmit_IT 时&#xff0c;它不会等到数据完全发…

constexpr、const和 #define 的比较

constexpr、const 和 #define 的比较 一、定义常量 constexpr 定义&#xff1a;constexpr用于定义在编译期可求值的常量表达式。示例&#xff1a;constexpr int x 5;这里&#xff0c;x的值在编译期就确定为5。 const 定义&#xff1a;const表示变量在运行期间不能被修改&…

BMS电池管理系统

一.项目简介 1.该项目是基于BQ7692003PWR STM32F103C8T6研发的一块锂电池控制板&#xff0c;本控制板可供五串18650锂电池&#xff08;目前软件仅支持三元锂类型&#xff0c;标称电压为4.2V&#xff09;串联使用&#xff0c;电芯均衡采用被动均衡方式 二.本项目功能 1.监控任…

Milvus向量数据库01-基础概念

Milvus向量数据库01-基础概念 Zilliz Cloud 集群由全托管 Milvus 实例及相关计算资源构成。您可以在 Zilliz Cloud 集群中创建 Collection&#xff0c;然后在 Collection 中插入 Entity。Zilliz Cloud 集群中的 Collection 类似于关系型数据库中的表。Collection 中的 Entity …

【OpenCV】模板匹配

理论 模板匹配是一种在较大图像中搜索和查找模板图像位置的方法。为此&#xff0c;OpenCV 带有一个函数 cv.matchTemplate&#xff08;&#xff09; 。它只是在输入图像上滑动模板图像&#xff08;如在 2D 卷积中&#xff09;&#xff0c;并比较模板图像下的模板和输入图像的补…

深入解析下oracle的number底层存储格式

oracle数据库中&#xff0c;number数据类型用来存储数值数据&#xff0c;它既可以存储负数数值&#xff0c;也可以存储正数数值。相对于其他类型数据&#xff0c;number格式的数据底层存储格式要复杂得多。今天我们就详细探究下oracle的number底层存储格式。 一、环境搭建 1.…

MySQL Binlog 日志监听与 Spring 集成实战

MySQL Binlog 日志监听与 Spring 集成实战 binlog的三种模式 MySQL 的二进制日志&#xff08;binlog&#xff09;有三种常见的格式&#xff1a;Statement 模式、Row 模式和Mixed 模式。每种模式的设计目标不同&#xff0c;适用于不同的场景&#xff0c;以下是它们的详细对比和…

Vmware Vcenter7.0证书web续期发生错误

1. 故障描述 vSphere Client 版本 7.0.2.00200 vCenter _MACHINE_CERT快到期了&#xff0c;通过web界面更新证书失败 第一步先这样&#xff0c;重新续订一下证书 续订发生错误 2. 解决办法 2.1. 前提工作 登陆ssh到vcenter&#xff0c;重新生成证书 先关掉HA&#xff…