8. Flink-CDC

news2025/2/22 10:19:39

1. Flink-CDC的介绍

Flink-cdc主要是用来同步数据库中的数据,它的主要优势在于基于Flink框架直接用Flink Stream Api 或Flink SQL 直接编程,不需要引入第三方组件

2.Flink-CDC的使用

Flink-cdc在使用上需要注意的点

  • 注意Flink-cdc在2.1版本之前需要导入MySQL的连接包,之后版本不需要,如果环境中有MySQL的连接包需要去除掉
  • 在2.4版本之监控MySQL表需要它有主键,2.4版本开始只需要配置“scan.incremental.snapshot.chunk.key-column”参数即可
  • MySQL CDC Connector在监控多个表的时候,每个表需要指定库名,并用逗号隔开
  • Flink中必须要设置checkpoint,不设置无法正常监控binlog变更日志
    Flink-CDC基于DataStream的使用
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("node2")      //设置MySQL hostname
        .port(3306)             //设置MySQL port
        .databaseList("db1")    //设置捕获的数据库
        .tableList("db1.tbl1,db1.tbl2") //设置捕获的数据表
        .username("root")       //设置登录MySQL用户名
        .password("123456")     //设置登录MySQL密码
        .deserializer(new JsonDebeziumDeserializationSchema()) //设置序列化将SourceRecord 转换成 Json 字符串
        .startupOptions(StartupOptions.initial())
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint
env.enableCheckpointing(5000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySQL Source")
        .setParallelism(4)
        .print();
env.execute();

基于Flink Sql的使用

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//设置checkpoint
tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 5000L);
tableEnv.executeSql("" +
        "CREATE TABLE mysql_binlog (" +
        " id INT," +
        " name STRING," +
        " age INT," +
        " PRIMARY KEY(id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'mysql-cdc'," +
        " 'hostname' = 'node2'," +
        " 'port' = '3306'," +
        " 'username' = 'root'," +
        " 'password' = '123456'," +
        " 'database-name' = 'db1'," +
        " 'table-name' = 'tbl1'" +
        ")");
tableEnv.executeSql("select * from mysql_binlog").print();

2.1 Flink-CDC对全量和增量数据的工作原理

并行读取表的全量快照,然后以单并行度方式读取表的binlog进行增量数据的同步

  • 全量同步过程中,它会根据主键把数据分为多个chunk分片,然后分配给多并行度去分别读取这些chunk上的数据,读取快照期间,Flink支持chunk级别的checkpoint,即使在同步的过程中发生故障,也可以做到exactly-once级别的恢复

2.2 Flink-CDC启动模式

启动模式是指程序启动的时候,以怎么的方式监控数据库中的数据,共有如下几种模式

  • initial(默认): 对受监控的库表进行初始快照,并继续读取最新的binlog
  • earliest-offset: 它会跳过快照直接读取最早的binlog日志,它与initial方式区别在于,initial只读取已经操作后(表中现有数据)的数据
  • latest-offset: 不执行快照,从binlog的最新处开始读取增量数据
  • specific-offset: 从指定的binlog位点开始读取,位点可以通过binlog文件名和位置指定
  • timestamp: 从指定的时间戳读取binlog事件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2303357.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nginx环境安装

一、官网地址 Nginx官网&#xff1a;http://nginx.org/ Nginx中文网&#xff1a;https://nginx.p2hp.com/ 二、Nginx版本 mainline version 开发版本stableversion 稳定版本legacy version 历史版本 三、Windows系统安装Nginx 第一步&#xff1a;选择Windows版本&#xff0c;…

Spring AI + Ollama 实现调用DeepSeek-R1模型API

一、前言 随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;LLM&#xff09;在各个领域的应用越来越广泛。DeepSeek 作为一款备受瞩目的国产大语言模型&#xff0c;凭借其强大的自然语言处理能力和丰富的知识储备&#xff0c;迅速成为业界关注的焦点。无论是文本生…

android系统SystemServer进程启动流程分析

目录 一,SystemServer整体框架 二,SystemServer启动源码分析 2.1,重要的概念 2.2,启动入口 2.3,创建对应进程的binder 三,binder驱动和binder线程池 四,SystemServer真正启动方法 4.1 SystemServer main方法里面主要做了几件事情 1)创建SystemServiceManager管理所有的…

Oracle 深入理解Lock和Latch ,解析访问数据块全流程

Oracle 锁机制介绍 根据保护对象的不同&#xff0c;单实例Oracle数据库锁可以分为以下几大类&#xff1a; DML lock&#xff08;data locks&#xff0c;数据锁&#xff09;&#xff1a;用于保护数据的完整性&#xff1b; DDL lock&#xff08;dictionary locks&#xff0c;字典…

如何基于transformers库通过训练Qwen/DeepSeek模型的传统分类能力实现文本分类任务

文章目录 模型与环境准备文档分析源码解读模型训练及推理方式进阶:CPU与显存的切换进阶:多卡数据并行训练🔑 DDP 训练过程核心步骤🚫 DDP 不适用于模型并行⚖️ DDP vs. Model Parallelism⚙️ 解决大模型训练的推荐方法🎉进入大模型应用与实战专栏 | 🚀查看更多专栏…

Unity中一个节点实现植物动态(Shader)

1 . 核心思路就操作顶点作往复运动&#xff1b; 核心代码&#xff1a; half stage1 dot(positionOS, float3(0, 1, 0)) * _Strength; half stage2 sin(dot(positionOS, float3(1, 0, 0)) * _Strength _Time.y * _Speed); half stage3 stage1 * stage2 * float3(0.001,…

PrimeTime:工具简介

相关阅读 PrimeTimehttps://blog.csdn.net/weixin_45791458/category_12900271.html?spm1001.2014.3001.5482 PrimeTime是PrimeTime Suite中的一个工具&#xff0c;能够执行全芯片级、门级的静态时序分析&#xff0c;这是芯片设计和分析流程中的一个关键部分。该工具通过检查…

【拜读】Tensor Product Attention Is All You Need姚期智团队开源兼容RoPE位置编码

姚期智团队开源新型注意力&#xff1a;张量积注意力&#xff08;Tensor Product Attention&#xff0c;TPA&#xff09;。有点像一种「动态的LoRA」&#xff0c;核心思路在于利用张量分解来压缩注意力机制中的 Q、K、V 表示&#xff0c;同时保留上下文信息&#xff0c;减少内存…

Docker-技术架构演进之路

目录 一、概述 常见概念 二、架构演进 1.单机架构 2.应用数据分离架构 3.应用服务集群架构 4.读写分离 / 主从分离架构 5.引入缓存 —— 冷热分离架构 6.垂直分库 7.业务拆分 —— 微服务 8.容器化引入——容器编排架构 三、尾声 一、概述 在进行技术学习过程中&am…

用Chrome Recorder轻松完成自动化测试脚本录制

前言 入门自动化测试,录制回放通常是小白测试首先用到的功能。而录制回放工具也一直是各大Web自动化测试必然会着重提供的一块功能。 早期WinRunner、QTP这样的工具,自动化测试可以说是围绕录制回放开展的。近年像Selenium也提供有录制工具 Selenium IDE,Playwright也包含…

python中的异常-模块-包

文章目录 异常异常的定义异常捕获语法捕获常规异常捕获指定异常捕获多个异常捕获所有异常异常else异常finally 异常传递总结 模块概念导入自定义模块及导入main方法all变量 总结 包自定义包定义pycharm中建包的基本步骤导入方式 第三方包 异常 异常的定义 当检测到一个错误时…

【GPU驱动】OpenGLES图形管线渲染机制

OpenGLES图形管线渲染机制 OpenGL/ES 的渲染管线也是一个典型的图形流水线&#xff08;Graphics Pipeline&#xff09;&#xff0c;包括多个阶段&#xff0c;每个阶段都负责对图形数据进行处理。管线的核心目标是将图形数据转换为最终的图像&#xff0c;这些图像可以显示在屏幕…

ssm-day06 ssm整合

从springMVC总结再回顾一下 60节 整合就是应用框架&#xff0c;并且把这个框架放到IOC容器中 web容器&#xff1a;装springMVC和controller相关的web组件 root容器&#xff1a;装业务和持久层相关的组件 子容器可以引用父容器中的组件&#xff0c;父容器不能调子容器 一个容器…

AI 编程助手 cursor的系统提示词 prompt

# Role 你是一名极其优秀具有10年经验的产品经理和精通java编程语言的架构师。与你交流的用户是不懂代码的初中生&#xff0c;不善于表达产品和代码需求。你的工作对用户来说非常重要&#xff0c;完成后将获得10000美元奖励。 # Goal 你的目标是帮助用户以他容易理解的…

ollama如何安全卸载,解决Ollama unins000.msg is missing

春节后在本地电脑安装了Ollama的客户端&#xff0c;每次开机自启&#xff0c;影响开机速度&#xff0c;而且本地的模型不如联网的回答效果好&#xff0c;果断选择了卸载&#xff0c;但是今天卸载发现提示下方的错误。根据此文章可以解决当前的问题。 根据此文章可以解决当前的…

网络安全设备防护原理 网络安全防护装置

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 防火墙 简介 网络层的防护设备&#xff0c;依照特殊的规则允许或者限制传输的数据通过 是由软件和硬件设备组合而成&#xff0c;在内部网和外部网之间、专用网…

Python的那些事第二十八篇:数据分析与操作的利器Pandas

Pandas:数据分析与操作的利器 摘要 Pandas是基于Python的开源数据分析库,广泛应用于数据科学、机器学习和商业智能等领域。它提供了高效的数据结构和丰富的分析工具,能够处理结构化数据、时间序列数据以及复杂的数据转换任务。本文从Pandas的基础概念入手,深入探讨其核心…

学习threejs,使用MeshBasicMaterial基本网格材质

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.MeshBasicMaterial 二…

【git-hub项目:YOLOs-CPP】本地实现05:项目移植

ok&#xff0c;经过前3个博客&#xff0c;我们实现了项目的跑通。 但是&#xff0c;通常情况下&#xff0c;我们的项目都是需要在其他电脑上也跑通&#xff0c;才对。 然而&#xff0c;经过测试&#xff0c;目前出现了2 个bug。 项目一键下载【⬇️⬇️⬇️】&#xff1a; 精…

【python】协程(coroutine)

协程&#xff08;coroutine&#xff09;可以理解为一个可以中途暂停保存当前执行状态信息并可以从此处恢复执行的函数&#xff0c;多个协程共用一个线程执行&#xff0c;适合执行需要“等待”的任务。 所以严格意义上&#xff0c;多个协程同一时刻也只有一个在真正的执行&#…