如何在 Amazon EMR 中运行 Flink CDC Pipeline Connector

news2025/1/12 22:51:14

如何在Amazon EMR 中运行 Flink CDC Pipeline Connector

由于 Amazon EMR 最新的 Flink 版本中没有原生支持 Flink CDC,因此这里介绍一种通过 FlinkCDC Pipeline Connector 同步数据的例子(MySQL->Kafka)

环境准备

  1. 启动一个 Amazon EMR 集群

  2. 启动成功之后,可以登录到 EMR Master 节点中,执行以下命令,将 mysql/kafka connector pipeline 的包上传到 master 节点的 /usr/lib/flink/lib/ 目录下

    sudo wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.2.0/flink-cdc-pipeline-connector-mysql-3.2.0.jar -P /usr/lib/flink/lib/
    
    sudo wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.2.0/flink-cdc-pipeline-connector-kafka-3.2.0.jar -P /usr/lib/flink/lib/
    
  3. 下载 flink cdc

    以下命令在 EMR Master 节点上执行,可以选择就在 /home/hadoop 目录

    wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz
    tar -xvf flink-cdc-3.2.0-bin.tar.gz
    

MySQL 同步到 Kafka

  1. 启动 Yarn Session

    虽然这里指定了 checkpoint 地址,和 execution.checkpointing.interval ,但是不起作用,因此需要参考下一步骤,配置在 flink-conf.yaml 目录下。

    # set flink home
    export FLINK_HOME=/usr/lib/flink
    
    #指定checkpoint地址
    checkpoints=s3://<s3bucket>/flink/checkpoints/
    
    sudo flink-yarn-session -jm 2048 -tm 4096 -s 2 \
    -D state.checkpoint-storage=filesystem \
    -D state.checkpoints.dir=${checkpoints} \
    -D execution.checkpointing.interval=10s \
    -D state.checkpoints.num-retained=5 \
    -D execution.checkpointing.mode=EXACTLY_ONCE \
    -D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
    -D execution.checkpointing.max-concurrent-checkpoints=2 \
    -D execution.checkpointing.checkpoints-after-tasks-finish.enabled=true \
    -D rest.flamegraph.enabled=true \
    -d
    

    启动之后,从返回的结果中,获取到 Application ID 用于后续的步骤

  2. 修改 Flink conf 文件

    flink-conf.yaml 文件中修改和添加以下内容

    rest.bind-port: {{REST_PORT}}
    rest.address: {{NODE_IP}}
    execution.target: yarn-session
    yarn.application.id: {{YARN_APPLICATION_ID}}
    

    另外,也需要在 flink-conf.yaml 文件中指定 checkpoint,在 yarn-session 指定的无效。

    execution.checkpointing.interval: 10000
    state.checkpoint-storage: filesystem
    state.checkpoints.dir: s3://<s3bucket>/flink/checkpoints/
    
  3. 配置数据同步 yaml 文件

    如下例子,将多个表,写到一个 kafka topic 中。

    不过需要注意的是,对于没有主键的表,需要在配置中指定一个字短作为分布键,设置 scan.incremental.snapshot.chunk.key-column参数,多表字段使用逗号分隔。

    Example 1

    cat > cdc_demo_example_1.yaml <<EOF
    source:
       type: mysql
       name: MySQL Source
       hostname: <mysql-host/ip>
       port: 3306
       username: <mysql-username>
       password: <mysql-password>
       tables: <database_name>.<table-prefix>\.*,<tablename>
       server-id: 5401-5404
       scan.incremental.snapshot.chunk.key-column: <databasename>.<tablename>:<key-cloumn>
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: <kafka-boostrap-server>
      topic: <topic-name>
    
    pipeline:
      name: MySQL to Kafka Pipeline example 1
      parallelism: 2
    EOF
    

    主要参数配置说明:

参数
topic如果需要将多个表同时写到一个 topic中,需要设置此参数,如果不设置,将默认按照每张表一个 topic(<database>.<tablename>
scan.incremental.snapshot.chunk.key-column对于没有主键的表,需要在 source 指定该参数,格式<database>.<tablename>:<column>,多个表字段使用逗号(,)分隔。

Example 2

cat > cdc_demo_example_2.yaml <<EOF
source:
   type: mysql
   name: MySQL Source
   hostname: <mysql-host/ip>
   port: 3306
   username: <mysql-username>
   password: <mysql-password>
   tables: <database_name>.<table-prefix>\.*,<tablename>
   server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: <kafka-boostrap-server>
  
pipeline:
  name: MySQL to Kafka Pipeline example 2
  parallelism: 2
EOF

运行 Flink CDC Job

./flink-cdc-3.2.0/bin/flink-cdc.sh -cm claim cdc_demo_example_2.yaml

在这里插入图片描述

从 Offset 恢复

在 Amazon EMR 中启动 Flink 作业需要手动维护 savepoint,因此需要在停止时保存 savepoint ,启动从 savepoint 恢复。这样可以在 flink cdc job 恢复的时候从上一次同步的 offset 开始同步数据。

Example:

flink cancel -s  s3://<s3bucket>/flink/20240923/01 9b46c81d4f018eafdf6c2cc44b2e356a -yid application_1727084151749_0006

./flink-cdc-3.2.0/bin/flink-cdc.sh -cm claim -s s3://<s3bucket>/flink/20240923/01/savepoint-9b46c8-1633c5a00095 \
	cdc_demo_example_1.yaml 

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2177071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

聚势启新 智向未来 | 重庆华阳通用科技有限公司揭牌成立

助推两江新区汽车产业高质量发展 (以下文字内容转载自两江新区网&#xff09; 9月26日&#xff0c;重庆华阳通用科技有限公司&#xff08;华阳通用重庆子公司&#xff09;在两江新区揭牌成立&#xff0c;将致力于智能座舱、智能驾驶两大领域&#xff0c;不断加大技术研发投入…

「系列投研|01」建立自己的移动比特币银行——赛道概况

ZJUBCA 09/28/2024 01&#xff5c;BTCfi赛道概况 BTCFi&#xff1a;建立自己的移动比特币银行 ——从Lending到Staking的全面解读 作者&#xff1a; Freya & Knight& Ausdin from ZJUBCA Elaine & Youyu from Satoshi Lab 关键词 BTCFi&#xff0c;稳定币&#xff…

DePIN 代表项目 CESS 受邀出席国会山活动,向议员展示创新 DePIN 技术

我们非常激动地宣布&#xff0c;CESS 已受邀参加由美国区块链协会主办的国会山活动&#xff0c;将于当地时间 2024 年 10 月 2 日向一众国会议员展示创新的 DePIN 技术&#xff01;本次关于去中心化物理基础设施网络&#xff08;DePIN&#xff09;的重要会议中&#xff0c;CESS…

windows下 Winobj.exe工具使用说明c++

1、winobj.exe工具下载地址 WinObj - Sysinternals | Microsoft Learn 2、接下来用winobj.exe查看全局互斥&#xff0c;先写一个小例子 #include <iostream> #include <stdlib.h> #include <tchar.h> #include <string> #include <windows.h>…

别再误用useMemo了!这才是最佳实践的正确打开方式

useMemo是react用作性能优化的一个hook&#xff0c;但有一个现象&#xff0c;不知道的人一次不用&#xff0c;知道的人随时随地到处都用。本文就带你真正搞懂什么情况下可以使用useMemo。 useMemo 是一个 React Hook&#xff0c;它在每次重新渲染的时候能够缓存计算的结果 useM…

【py】字符串切片

下面是一个简单的Python脚本&#xff0c;它读取输入的学号和姓名&#xff0c;然后按照要求拆分并输出&#xff1a; # 从键盘输入学号和姓名 input_str input("请输入学号和姓名&#xff1a;") # 学号和姓名的长度&#xff0c;可以根据实际情况调整 grade_length …

Linux下V4L2实时显示摄像头捕捉画面(完整QT+C++代码)

目录 一、V4L2 1、简介 2、编程与应用 二、示例演示 1、例子说明&#xff1a; 2、关键的代码演示 3、完整的例子的代码 一、V4L2 1、简介 V4L2&#xff0c;即Video for Linux Two&#xff0c;是Linux下关于视频设备的内核驱动框架&#xff0c;为驱动和应用程序提供了一…

前端vue-form表单的验证

form表单验证的完整步骤

Nginx反向代理配置支持websocket

一、官方文档 WebSocket proxying 为了将客户端和服务器之间的连接从HTTP/1.1转换为WebSocket&#xff0c;使用了HTTP/1.1中可用的协议切换机制&#xff08;RFC 2616: Hypertext Transfer Protocol – HTTP/1.1&#xff09;。 然而&#xff0c;这里有一个微妙之处:由于“升级”…

TLS详解

什么是TLS TLS(Transport Layer Security)传输层安全性协议 &#xff0c;它的前身是SSL(Secure Sockets Layer)安全套接层&#xff0c;是一个被应用程序用来在网络中安全的通讯协议&#xff0c; 防止电子邮件、网页、消息以及其他协议被篡改或是窃听。是用来替代SSL的&#xf…

ONFI 5.1:定义、缩写语和约定

address 该地址由一个行地址和一个列地址组成。行地址标识要访问的page、block和LUN。列地址标识要访问的page中的byte或word。 asynchronous 异步是指数据用WE_n信号进行写&#xff0c;RE_n信号进行读。 block 由多个page组成&#xff0c;是擦除操作的最小可寻址单元。 column…

稀土阻燃协效剂-磷氮系的应用

稀土阻燃协效剂凭借独特的稀土4f电子层结构,在聚合物材料燃烧时可催化酯化成炭,迅速在高分子表面形成致密连续的碳层,隔绝聚合物材料内部的可燃性气体与氧气的接触,从而达到阻燃抑烟的效果,且燃烧时不产生有毒有害气体。 金士镧系列稀土阻燃剂是一种基于稀土协效阻燃的复合阻燃…

Windows11如何关闭“显示更多选项”,直接展示所有选项

在windows11系统中&#xff0c;右击&#xff0c;会有“显示更多选项”&#xff0c;每次都要点一下这个按钮&#xff0c;才能看到所有的选项&#xff0c;太麻烦&#xff0c;那么有什么办法去掉呢&#xff1f; 1、以管理员的方式打开命令提示符 winR&#xff1b;cmd回车 执行命…

在Ubuntu和centos系统安装JDK教程

目录 1. 先使用apt 查看有哪些软件包2.使用apt安装软件包3.确认是否安装4.centos安装jdk Linux会把一些软件包&#xff0c;放到对应的服务器上&#xff0c;通过包管理器这样的程序&#xff0c;来把这些软件包下载安装 包管理器&#xff1a; Ubuntu&#xff1a;apt centos&#…

工程设备包括哪些内容?

工程设备是构成或计划构成永久工程一部分的机电设备、金属结构设备&#xff0c;仪器装置及其他类似的设备和装置。它们在工程建设中扮演着至关重要的角色&#xff0c;涵盖了从基础建设到设备安装的多个方面。以下是整理出来的一些工程设备主要的内容&#xff1a; 1. 建筑机械设…

实用好软-----电脑端 从视频中导出音频的方便工具

最近想从一个视频中导出个音乐&#xff0c;百度找很多没有合适的工具。最终找到了一款很方便 而且操作超级简单的工具。打开这个工具后只需要把需要导出音乐的视频拖进窗口里就会自动导出音乐mp3。方便小巧&#xff0c;而且音频效果还是不错的。 一些视频转换成音频文件&#x…

从零开始学习OMNeT++系列第一弹——OMNeT++的介绍与安装

最近由于由于工作上的需求&#xff0c;接了一个网络仿真的任务。于是开始调研各个仿真平台&#xff0c;然后根据目前的需求和网络上公开资料的多少&#xff0c;决定使用omnet这个网络仿真平台。现在也是刚开始学习&#xff0c;所以决定记录一下从零开始的这个学习过程。因为虽然…

社交应用性能提升秘籍:推拉结合优化方案全解读!

我是小米,一个喜欢分享技术的29岁程序员。如果你喜欢我的文章,欢迎关注我的微信公众号“软件求生”,获取更多技术干货! Hello,大家好!我是你们的老朋友小米,一个积极活泼的29岁技术分享达人~ 今天要跟大家分享的是我最近在个人项目里遇到的一个有趣的优化案例——“推拉…

OptiTrack与Xsens光、惯动捕中用于动画制作的尖端设备对比

随着动画、电影、游戏等数字内容行业的迅速发展&#xff0c;捕捉演员的动作并将其转化为虚拟角色的技术越来越受到重视。两种主要的动作捕捉技术——光学捕捉系统和惯性动作捕捉系统——代表了当前市场的最前沿。本文将对比两种技术的代表性设备&#xff1a;OptiTrack的光学动作…

Vue3 动态加载图片不显示问题

一、图片目录结构 二、批量导出图片 exportImage.ts const images import.meta.glob(/assets/icons/*.{png,jpg,jpeg,svg}, { eager: true });const imageMap Object.entries(images).reduce((acc, [key, value]) > {const imageName key.split(/).pop().replace(/\.…