FlinkPipelineComposer 详解

news2024/12/26 0:43:19

FlinkPipelineComposer 详解

原文

背景

在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务

通过提供一个yaml文件,描述source sink transform等主要信息

由FlinkPipelineComposer解析,自动调用DataStream api进行构建

官方样例

 source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2

目前可以通过source配置的源只有mysql 和 values

values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的

目前可以使用的sink有

  1. doris
  2. elasticsearch
  3. kafka
  4. paimon
  5. starrocks
  6. values

FlinkPipelineComposer

我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的

values会将mysql产生的cdc消息打印到stdout上

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: x.x.x.x
 port: 3306
 username: username
 password: password
 tables: test.t1
 server-id: 5400-5404
 server-time-zone: UTC+8

sink:
  type: values
  name: values Sink

pipeline:
 name: Sync Mysql Database to Values
 parallelism: 2

首先来观察一下这个任务提交到flink集群后具体的链路构成

在这里插入图片描述

结合官方给出的架构

在这里插入图片描述

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator

  1. Souce: Flink CDC Event Source: mysql
  2. SchemaOperator
  3. PrePartition

-------------- shuffle --------------

  1. PostPartion
  2. Sink Writer: values Sink

Souce: Flink CDC Event Source: mysql

负责

  1. 创建枚举器
  2. 创建reader
  3. 枚举split分发给reader
  4. reader读取数据生成事件

SchemaOperator

负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解

PrePartition

负责

  1. 广播FlushEvent
  2. 广播SchemaChangeEvent
  3. shuffle普通消息到下游

PostPartion

Sink Writer: values Sink

写入下游,values sink当前到实现是打印到stdout

源码解析

接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节

CliFrontend#main

CliFrontend.java:54

args

在这里插入图片描述

createExecutor 创建 executor CliFrontend.java:76

调用CliExecutor#run CliExecutor.java:70

看一下解析得到的pipelineDef
在这里插入图片描述

这里已经从yaml文件中解析出了source和sink的配置了

composer.compose 调用compose方法开始使用DataStream api进行构建

FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose

声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator

DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...

translate的调用顺序如下

DataStream<Event> stream =
                sourceTranslator.translate(
                  ...
stream =
                transformTranslator.translatePreTransform(
                  ...
stream =
                transformTranslator.translatePostTransform(
                  ...
stream =
                schemaOperatorTranslator.translate(
                  ...
stream =
                partitioningTranslator.translate(
                  ...
sinkTranslator.translate(
                pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

return new FlinkPipelineExecution(env...)
                  ...

逐一说明

  1. sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
  • sourceProvider.getSource ->
    • MysqlSource ->
      • createReader
      • createEnumerator
  1. stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {
    return input;
}

由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform

  1. stream = transformTranslator.translatePostTransform
  • 同上
  1. stream = schemaOperatorTranslator.translate
  • 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
    1. 停住当前流
    2. 上报coodinator
    3. flush下游数据,让sink消耗完已有数据
    4. sink 通知coodinator flush完成
    5. coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
    6. schemaOperator重新放通数据
  1. stream = partitioningTranslator.translate
  • 构建prePartition postPartition节点
  1. sinkTranslator.translate
  • 构建sink节点
  1. FlinkPipelineExecution 中的 execute 方法调用 env.executeAsync(jobName)

总结

flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution

构建的主要逻辑集中在 FlinkPipelineComposer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241106.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MybatisPlus知识

mybatis与mybatisplus的区别&#xff1a; mybatisplus顾名思义时mybatis的升级版&#xff0c;提供了更多的API和方法&#xff0c;是基于mybatis框架基础上的升级&#xff0c;更加方便开发。mybatisplus继承BaseMapper接口并调用其中提供的方法来操作数据库&#xff0c;不需要再…

利用飞书多维表格自动发布版本

文章目录 背景尝试1&#xff0c;轮询尝试2&#xff0c;长连接 背景 博主所在的部门比较奇特&#xff0c;每个车型每周都需要发版&#xff0c;所以实际上一周会发布好几个版本。经过之前使用流水线自动发版改造之后&#xff0c;发版的成本已经大大降低了&#xff0c;具体参考&a…

Qwen2-VL:发票数据提取、视频聊天和使用 PDF 的多模态 RAG 的实践指南

概述 随着人工智能技术的迅猛发展&#xff0c;多模态模型在各类应用场景中展现出强大的潜力和广泛的适用性。Qwen2-VL 作为最新一代的多模态大模型&#xff0c;融合了视觉与语言处理能力&#xff0c;旨在提升复杂任务的执行效率和准确性。本指南聚焦于 Qwen2-VL 在三个关键领域…

蓝桥杯每日真题 - 第7天

题目&#xff1a;&#xff08;爬山&#xff09; 题目描述&#xff08;X届 C&C B组X题&#xff09; 解题思路&#xff1a; 前缀和构造&#xff1a;为了高效地计算子数组的和&#xff0c;我们可以先构造前缀和数组 a&#xff0c;其中 a[i] 表示从第 1 个元素到第 i 个元素的…

家政服务小程序,家政行业数字化发展下的优势

今年以来&#xff0c;家政市场需求持续增长&#xff0c;市场规模达到了万亿级别&#xff0c;家政服务行业成为了热门行业之一&#xff01; 家政服务种类目前逐渐呈现了多样化&#xff0c;月嫂、保姆、做饭保洁、收纳、维修等家政种类不断出现&#xff0c;满足了居民日益增长的…

蓝桥杯每日真题 - 第12天

题目&#xff1a;&#xff08;数三角&#xff09; 题目描述&#xff08;14届 C&C B组E题&#xff09; 解题思路&#xff1a; 给定 n 个点的坐标&#xff0c;计算其中可以组成 等腰三角形 的三点组合数量。 核心条件&#xff1a;等腰三角形的定义是三角形的三条边中至少有…

Linux系统下svn新建目录

Linux安装svn自行查找 新建目录 新建一个自定义库的文件夹&#xff1a;mkdir security 使用svnadmin命令在新创建的目录中创建一个新的SVN版本库。例如&#xff1a; svnadmin create security 执行完成以上命令就会生成默认配置文件 通过pwd命令查找当前目录路径 路径&…

SpringCloud基础 入门级 学习SpringCloud 超详细(简单通俗易懂)

Spring Cloud 基础入门级学习 超详细&#xff08;简单通俗易懂&#xff09; 一、SpringCloud核心组件第一代&#xff1a;SpringCloud Netflix组件第二代&#xff1a;SpringCloud Alibaba组件SpringCloud原生组件 二、SpringCloud体系架构图三、理解分布式与集群分布式集群 四、…

性能调优专题(9)之从JDK源码级别解析JVM类加载机制

一、类加载运行全过程 当我们用java命令运行某个类的main函数启动程序时&#xff0c;首先需要通过类加载器把主类加载到JVM。 package com.tuling.jvm;public class Math {public static final int initData 666;public static User user new User();public int compute() {…

【全面系统性介绍】虚拟机VM中CentOS 7 安装和网络配置指南

一、CentOS 7下载源 华为源&#xff1a;https://mirrors.huaweicloud.com/centos/7/isos/x86_64/ 阿里云源&#xff1a;centos-vault-7.9.2009-isos-x86_64安装包下载_开源镜像站-阿里云 百度网盘源&#xff1a;https://pan.baidu.com/s/1MjFPWS2P2pIRMLA2ioDlVg?pwdfudi &…

「JVM详解」

JVM JVM概述 基本介绍 JVM&#xff1a;全称 Java Virtual Machine&#xff0c;即 Java 虚拟机&#xff0c;一种规范&#xff0c;本身是一个虚拟计算机&#xff0c;直接和操作系统进行交互&#xff0c;与硬件不直接交互&#xff0c;而操作系统可以帮我们完成和硬件进行交互的…

正点原子IMX6ULL--嵌入式Linux开发板学习中常用命令和笔记记录

学习路线图 传驱动文件 sudo cp chrdevbase.ko chrdevbaseApp /home/txj/linux/nfs/rootfs/lib/modules/4.1.15/ -f bootcmd setenv bootcmd tftp 80800000 zImage;tftp 83000000 imx6ull-alientek-emmc.dtb;bootz 80800000 - 83000000 setenv bootcmd tftp 80800000 zImag…

DVWA靶场通关——SQL Injection篇

一&#xff0c;Low难度下unionget字符串select注入 1&#xff0c;首先手工注入判断是否存在SQL注入漏洞&#xff0c;输入1 这是正常回显的结果&#xff0c;再键入1 You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for…

CSS回顾-基础知识详解

一、引言 在前端开发领域&#xff0c;CSS 曾是构建网页视觉效果的关键&#xff0c;与 HTML、JavaScript 一起打造精彩的网络世界。但随着组件库的大量涌现&#xff0c;我们亲手书写 CSS 样式的情况越来越少&#xff0c;CSS 基础知识也逐渐被我们遗忘。 现在&#xff0c;这种遗…

SSH和NFS

文章目录 SSH和NFS1 SSH远程管理1.1 概述1.2 ssh服务端和客户端1.3 用法1.3.1 服务器命令行的远程登录方式1.3.2 scp1.3.3 sftp1.3.4 ssh的密钥登录 2 NFS2.1 概述2.2 nfs操作步骤 SSH和NFS 1 SSH远程管理 1.1 概述 SSH&#xff08;Secure Shell&#xff09;协议是一种用于远…

Springboot 启动端口占用如何解决

Springboot 启动端口占用如何解决 1、报错信息如下 *************************** APPLICATION FAILED TO START ***************************Description:Web server failed to start. Port 9010 was already in use.Action:Identify and stop the process thats listening o…

SpringBoot 打造图片阅后即焚功能

阅后即焚”&#xff08;Snapchat-like feature&#xff09;是指一种社交媒体或信息传递功能&#xff0c;用户在阅读某条信息或查看某张图片后&#xff0c;该信息或图片会自动销毁&#xff0c;无法再次查看。这种功能的主要目的是保护用户的隐私和信息安全&#xff0c;防止敏感信…

FFmpeg 4.3 音视频-多路H265监控录放C++开发十三.2:avpacket中包含多个 NALU如何解析头部分析

前提&#xff1a; 注意的是&#xff1a;我们这里是从avframe转换成avpacket 后&#xff0c;从avpacket中查看NALU。 在实际开发中&#xff0c;我们有可能是从摄像头中拿到 RGB 或者 PCM&#xff0c;然后将pcm打包成avframe&#xff0c;然后将avframe转换成avpacket&#xff0…

Vue之插槽(slot)

插槽是vue中的一个非常强大且灵活的功能&#xff0c;在写组件时&#xff0c;可以为组件的使用者预留一些可以自定义内容的占位符。通过插槽&#xff0c;可以极大提高组件的客服用和灵活性。 插槽大体可以分为三类&#xff1a;默认插槽&#xff0c;具名插槽和作用域插槽。 下面…

华为鸿蒙HarmonyOS NEXT升级HiCar:打造未来出行新体验

随着科技的不断进步&#xff0c;智能出行已成为我们生活中不可或缺的一部分。华为凭借其在智能科技领域的深厚积累&#xff0c;推出了全新的鸿蒙HarmonyOS NEXT系统&#xff0c;旨在为用户打造一个“人车家”的无缝协同出行体验。这一系统的核心亮点之一&#xff0c;就是其内置…