FlinkCDC部署

news2025/1/23 7:14:04

文章目录

  • Flink安装
  • job部署
    • 1、测试代码
    • 2、打包插件
    • 3、打包
    • 4、测试

Flink安装

1、解压

wget -b https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar -zxf flink-1.13.6-bin-scala_2.12.tgz
mv flink-1.13.6 /opt/module/flink

2、环境变量

vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`

3、分发环境变量

source ~/bin/source.sh

4、Per-Job-Cluster时报错:Exception in thread “Thread-5” java.lang.IllegalStateException:
Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
对此,编辑配置文件

vim /opt/module/flink/conf/flink-conf.yaml

在配置文件添加下面这行,可解决上面报错

classloader.check-leaked-classloader: false

5、下载 flink-sql-connector-kafka 和 fastjson1.2.83 的jar(去Maven官网找链接)

cd /opt/module/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo1.maven.org/maven2/com/alibaba/fastjson/1.2.83/fastjson-1.2.83.jar

job部署

1、测试代码

package org.example;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class TestCDC {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建流处理环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //TODO 2 创建Flink-MySQL-CDC数据源
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop107")
                .port(3306)
                .username("root")
                .password("密码")
                .databaseList("db1") //设置要捕获的库
                .tableList("db1.t") //设置要捕获的表(库不能省略)
                .deserializer(new JsonDebeziumDeserializationSchema()) //将接收到的SourceRecord反序列化为JSON字符串
                .startupOptions(StartupOptions.initial()) //启动策略:监视的数据库表执行初始快照,并继续读取最新的binlog
                .build();
        //TODO 3 读取数据并打印
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName")
                .map(JSONObject::parseObject).map(j -> j.getJSONObject("source").toString())
                .addSink(new FlinkKafkaProducer<>("hadoop105:9092", "topic01", new SimpleStringSchema()));
        //TODO 4 执行
        env.execute();
    }
}

2、打包插件

服务器上已有的jar,就不需打包,加<scope>provided</scope>
flink-connector-mysql-cdcflink-table-api-java-bridge需要打包上

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <!-- FlinkCDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- flink-table相关依赖,可解决下面报错:
    Caused by: java.lang.ClassNotFoundException:
    org.apache.flink.connector.base.source.reader.RecordEmitter -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- JSON处理 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink_Kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<!-- 打包插件 -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3、打包

上传jar-with-dependencies的jar到服务器

4、测试

/opt/module/flink/bin/flink run \
-t yarn-per-job \
-nm a2 \
-ys 1 \
-yjm 1024 \
-ytm 1024 \
-c org.example.TestCDC \
FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic topic01

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/77474.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快手某HR吐槽:职位要求写得很清楚,照着写简历不行吗?有的工作经历不相关,有的工作好几年还写学生会奖学金,这种一秒扔垃圾桶!...

求职时&#xff0c;你的简历是什么样的&#xff1f;能否帮你顺利通过初筛&#xff1f;一位快手的面试官吐槽很多求职者的简历“一塌糊涂”&#xff1a;职位要求已经写得很明白了&#xff0c;就把里面罗列的技术和跟业务相关的项目经验贴上来就好了&#xff0c;有人偏写航空公司…

Vue 不重新打包,动态加载全局配置的实现过程

背景 项目前端采用了 Vue.js &#xff0c;跟传统前端 html 技术不同之处在于&#xff0c;每次打包后都重新生成新的 js 文件&#xff0c;而且不可读&#xff0c;必须全量替换。但最近碰到一个漏洞扫描的问题&#xff0c;系统通过单点登录方式访问时&#xff0c;是不能有登录首…

【MySQL基础】数据库操作语言DML相关操作有那些?

目录 一、什么是DML 二、数据插入insert 三、数据的修改update 四、数据的删除delete 五、delete和truncate有什么不同&#xff1f; 六、DML操作知识构图 七、DML操作练习 &#x1f49f; 创作不易&#xff0c;不妨点赞&#x1f49a;评论❤️收藏&#x1f499;一下 一、什…

【Docker学习教程系列】8-如何将本地的Docker镜像发布到私服?

通过前面的学习&#xff0c;我们已经知道&#xff0c;怎么将本地自己制作的镜像发布到阿里云远程镜像仓库中去。但是在实际工作开发中&#xff0c;一般&#xff0c;我们都是将公司的镜像发布到公司自己搭建的私服镜像仓库中&#xff0c;那么一个私服的镜像仓库怎么搭建&#xf…

Android之Service

一、Service简介&#xff1a; Service是一种后台服务机制&#xff0c;允许在没有用户界面的情况下&#xff0c;使程序能够长时间在后台运行。 Service是四大组件之一&#xff0c;适用于开发无UI界面、长时间后台运行、做一些用时比较长的操作。 二、Service创建&#xff1a;…

Frida - App逆向 JavaScript代码注入 基本语法以及数据类型介绍

Frida - App逆向 JavaScript代码注入 常用语法介绍 文章目录Frida - App逆向 JavaScript代码注入 常用语法介绍前言一、逆向步骤二、重载(Overload) / Frida数据类型1.重载函数介绍2.重载函数常用的类型三、Frida Hook常用代码介绍1.基本代码框架四、注入JavaScript代码运行监听…

功能测试的工作流程

测试工作人员接收到项目需求、验收标准和原型图并对需求进行分析以了解项目的需求。 一、测试计划&#xff1a;个人觉得测试计划应在在详细设计确认后&#xff0c;代码开始编写的时候开始编写。测试计划主要给后面的测试工作的一些指南。 其内容包含&#xff1a; 1、测试团队人…

kubernetes学习之路--Pod配置学习

写在前面&#xff1a;上篇文章提供了两种简单搭建一个k8s集群的方法&#xff0c;其中两种方式的区别是&#xff1a;kubeadm是直接在主机上进行k8s搭建&#xff0c;kind是在docker中搭建。本文主要分享新建pod等方法。&#xff08;想了想&#xff0c;对于像我这刚入门的人来说&a…

面试常问的《进程创建—进程终止—进程等待—进程替换》!

送给正在努力前行的你一句话&#xff1a;要努力&#xff0c;但不要着急&#xff0c;繁花锦簇&#xff0c;硕果累累都需要过程&#xff01; 博主主页 目录 1.进程创建 fork函数初识 fork函数返回值 写时拷贝 2.进程终止 进程退出场景 进程如何退出 3.进程等待 进程等待的必要性 …

Java泛型简介

参考博客&#xff1a;https://www.jb51.net/article/192850.htm 泛型&#xff0c;即“参数化类型”。一提到参数&#xff0c;最熟悉的就是定义方法时有形参&#xff0c;然后调用此方法时传递实参。那么参数化类型怎么理解呢&#xff1f;顾名思义&#xff0c;就是将类型由原来的…

vuex持久化

下载&#xff1a; vuex-persistedstate npm install --save vuex-persistedstate 或者&#xff1a; npm install --save vuex-persistedstate --legacy-peer-deps 引入使用&#xff1a;在store文件夹下的index文件中&#xff0c;也就是放vuex的js代码中引入&#xff1a; i…

周末福利 | 21天学通Python完整版,豆瓣评分9.6!

前言 又到了周末啦&#xff0c;小编例行给大家发福利&#xff01; 今天福利的内容是21天学通Python完整版&#xff0c;这是一本豆瓣评分9.6的人工智能入门书籍&#xff01;全面、系统、深入地讲解了Python编程基础语法与高级应用。在讲解过程中&#xff0c;通过大量实际操作的…

mac m1 安装docker docker 安装php 5.6 和 7.2 避坑指南

通过该link可以下载兼容mac m1核心的docker-desktop: Docker Desktop - Docker 所有基于amd64架构核心的都加上--platform linux/amd64来运行,这样就能愉快的玩耍了: docker search centos7 php56 #搜索 docker images #显示所有镜像 docker pull sglim2/centos7 建议使用ln…

如何让青少年在AI时代抢占先机

点击蓝字关注我们AI TIME欢迎每一位AI爱好者的加入&#xff01;11月17日&#xff0c;由智谱AI支持&#xff0c;北京市科委、中关村管委会科普专项经费资助的系列栏目“科普大佬说”很荣幸邀请到国家科技部“新一代人工智能创新发展与应用研究”项目、中小学人工智能教育服务平台…

在VSCode中用Markdown自动生成PPT详细使用指南,支持多种风格和排版,支持ppt转为pdf等

在VSCode中用Markdown自动生成PPT详细使用指南&#xff0c;支持多种风格和排版&#xff0c;支持ppt转为pdf等。 Marp 官网&#xff1a; https://marp.app/ 这款软件也能在 Windows 或 Linux 系统上使用&#xff0c;并不局限于 macOS 系统。 今天要介绍的是 Marp 推出的 VS Cod…

PLC-Recorder实现速度高达0.24ms准确周期采集的方法(带时间戳采集)

目录 1、PLC的发送程序 2、PLC连接配置 3、PLC-Recorder侧的通讯设置 4、PLC-Recorder的通道配置 5、PLC-Recorder的变量配置 6、正常通讯情况的界面 7、记录数据的情况 8、小结 如果要以非常高的速度高速采集各种控制器&#xff08;典型的是PLC&#xff09;的数据&…

【Linux】gcc/g++

目录 Linux编译器-gcc/g使用 No.1 背景知识 No.2 预处理(进行宏替换) No.3 编译&#xff08;生成汇编&#xff09; No.4 汇编&#xff08;生成机器可识别代码&#xff09; No.5 动态链接过程 No.6 gcc选项 Linux编译器-gcc/g使用 No.1 背景知识 预处理&#xff08;进行…

什么是Serverless?

Serverless 无服务器&#xff1b;无主机&#xff1b; Serverless &#xff0c;按中文翻译&#xff0c;称为「无服务器」。被认为是新一代的云计算发展方向。 在某些场景可以解读为一种软件系统架构方法&#xff0c;通常称为 Serverless 架构 关于 Serverless 的定义&#xf…

留学Assignment写作怎么注意论证方法?

Assignment的论证方法&#xff0c;对于一篇Assignment而言&#xff0c;学会如何论证&#xff0c;对Assignment的逻辑和内容&#xff0c;有一定的作用。掌握常用的论证方法&#xff0c;了解Assignment要求&#xff0c;在Assignment写作的过程中&#xff0c;学会加以应用&#xf…

VMware 安装、移除Ubuntu系统

目录 前言 安装虚拟机 1. 新建虚拟机 2. 进入向导 3. 选择系统镜像 4. 添加系统设置&#xff0c;用户名主机名密码 <实际没啥用...在系统安装过程中才设置的> 5. 设置虚拟机名称&#xff0c;存放位置 6. 磁盘容量 7. 设置虚拟机硬件配置 8. 自定义硬件 9. 开机…