flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

news2025/1/18 2:02:02

flinkCDC - 功能验证记录

  • flink 与cdc 版本使用搭配:
  • flink cdc
    • 参数说明
    • 原理分析
    • (DBLog)无锁算法论文
  • mysql cdc
    • cdc api 动态加表
    • flink cdc sql 性能压测
    • flink cdc api 性能压测
  • PostgreSqlCDC
    • 执行更新语句,会出现 2 种情况
  • cdc sink to kafka
  • 报错
  • mysql时区错误,The server time zone value 'EDT' is unrecognized or represents
  • java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig
  • Cannot discover a connector using option: 'connector'='mysql-cdc'
  • Could not instantiate the executor. Make sure a planner module is on the classpath
  • (source 算子 )The TaskExecutor is shutting down.

flink 与cdc 版本使用搭配:

flink1.13.6 + flink mysql cdc 1.4.0
flink 1.16.0 + flink mysql cdc 2.3.0
flink 1.16.0 + flink mysql cdc 2.4.0
flink 1.16.0 + flink postgresql cdc 2.3.0

flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题

flink cdc

参数说明

1、调整chunck大小 : scan.incremental.snapshot.chunk.size
2、设置cdc模式:scan.startup.mode【initial(默认)、latest-offset】
3、支持chunk key 列设置,默认是第一个字段:scan.incremental.snapshot.chunk.key-column
官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html

原理分析

1、cdc mysql 全量快照阶段split sql :SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?;
备注:id 是主键id

(DBLog)无锁算法论文

链接地址:https://arxiv.org/pdf/2010.12597.pdf , 对此算法感兴趣的可以看这位大佬的分享:https://zhuanlan.zhihu.com/p/600303844

论文部分摘要理解:

  • 全量阶段:
    1、flink cdc 任务启动后按设置的chunk size切分数据,sql如下:
    (sql:SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?; )
    2、同时会启动读取binlog任务,读取chunk对应的binlog,通过binlog对 select chunk的数据做合并操作,此操作是合并在期间执行了update、delete操作,保证insert-only

  • 增量阶段:
    1、不断追加数据
    在这里插入图片描述
    在这里插入图片描述

mysql cdc

cdc api 动态加表

1、启动任务,复制checkpoint路径
在这里插入图片描述2、新增监听的表到tableList(可以使用同一个jar包,在外部传参动态加表)
3、从checkpoint初重启任务即可

flink cdc sql 性能压测

1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

flink cdc api 性能压测

1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

PostgreSqlCDC

执行更新语句,会出现 2 种情况

1、若更新字段包含(部分)主键字段,会先发送一条删除之前主键的记录。op = d , after = null ; 然后再发送一条新主键记录,op = c,且before = null 。
2、若仅更新非主键主键,只会发送一条记录,op = u , before = null。

主体代码如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 必须开启 checkpoint ,因为Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN,否则磁盘使用率会一直很高
        env.enableCheckpointing(1000);

        //监听 postgresql wal 日志
        DebeziumSourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname(host)
                .port(port)
                .username(userName)
                .password(passWord)
                .database(dbName)
                .tableList(tableList)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .slotName(slotName)
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print(">>>").setParallelism(1);

        env.execute();

cdc sink to kafka

AT_LEAST_ONCE 模型要配置 acks = 1

报错

mysql时区错误,The server time zone value ‘EDT’ is unrecognized or represents

登录mysql并查询当前时区:show variables like “%time_zone%”;
执行以下命令修改时区:

set global time_zone = '+8:00'; ##修改mysql全局时区为北京时间,即我们所在的东8区
set time_zone = '+8:00'; ##修改当前会话时区
flush privileges; #立即生效

java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig

缺包,引入 debezium-connector-mysql-1.6.4.Final.jar包会报Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig,看 flink cdc 社区群反馈可能是

Cannot discover a connector using option: ‘connector’=‘mysql-cdc’

删除pom.xml文件flink-connector-mysql-cdc依赖报下的provided

Could not instantiate the executor. Make sure a planner module is on the classpath

包冲突原因。注释或删除jar:flink-table-planner-loader-1.16.0.jar

(source 算子 )The TaskExecutor is shutting down.

加大心跳间隔时间,默认是30s,‘heartbeat.interval’ = ‘60s’

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/780040.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据挖掘】bytewax 与 ydata工具可实时了解您的数据

一、说明 在这篇博文中&#xff0c;我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用&#xff0c;以提高流式处理流的质量。 STream 处理支持在传输中和存储之前对数据进行实时分析&#xff0c;并且可以是有状态的&#xff0c;也可以是无状态的。 …

[STL]vector使用介绍

[STL]vector使用介绍 注&#xff1a;文内代码均在Visual Studio 2013下进行测试&#xff0c;不同的编译器下在扩容大小等方面可能有所不同&#xff0c;但不影响各接口函数的使用。 文章目录 [STL]vector使用介绍1. vector介绍2. 构造函数3. 迭代器相关函数begin函数和end函数的…

实现点击复制到剪切板功能

该功能使用VueUse实现 什么是 VueUse VueUse不是Vue.use&#xff0c;它是为Vue 2和3服务的一套Vue Composition API的常用工具集&#xff0c;是目前世界上Star最高的同类型库之一。它的初衷就是将一切原本并不支持响应式的JS API变得支持响应式&#xff0c;省去程序员自己写相…

jmeter常用的提取器(正则表达式和JSON提取器)

jmeter常用的后置处理器有两种提取数据&#xff1a; 1、JSON提取器 获取后可以将变量token引用到其他所需要的地方 &#xff08;正则表达式和JSON提取器&#xff09;:2023接口自动化测试框架必会两大神器:正则提取器和Jsonpath提取器_哔哩哔哩_bilibilihttps://www.bilibili.…

uniapp实战

上面是tab栏&#xff0c;下面是swiper&#xff0c;&#xff0c;tab和swiper和 红色滑块 动态变化&#xff0c;&#xff0c; 遇到的问题&#xff1a; 往下滚动 tab栏 吸顶&#xff1a; position:sticky; z-index:99; top:0;swiper切换触发 change 事件&#xff0c; :current …

SOMEIP协议--第四节[ SOME/IP](someip概述与行为)

SOMEIP协议–第四节[ SOME/IP](someip概述与行为) 文章目录 SOMEIP协议--第四节[ SOME/IP](someip概述与行为)1、概述2、someip的行为2.1 基础传输2.2 SOME/IP-TP传输:2.3 someip参数(client)2.4 someip参数(server)1、概述 Method | Event | Field是上层设计的三个概念…

【C++】优先级队列和反向迭代器 模拟笔记

文章目录 优先级队列仿函数适配器模式堆知识储备 反向迭代器代码&#xff08;反向迭代器&#xff09;代码&#xff08;优先级队列&#xff09; 优先级队列 仿函数 仿函数&#xff0c;它不是函数&#xff08;其实是个类&#xff09;&#xff0c;但用法和函数一样。既然是个类&a…

子类化QThread来实现多线程,moveToThread函数的作用

子类化QThread来实现多线程&#xff0c; QThread只有run函数是在新线程里的&#xff0c;其他所有函数都在QThread生成的线程里。正确启动线程的方法是调用QThread::start()来启动。 一、步骤 子类化 QThread&#xff1b;重写run&#xff0c;将耗时的事件放到此函数执行&#…

轻量级Web报表工具ActiveReportsJS全新发布v4.0,支持集成更多前端框架!

ActiveReportsJS 是一款基于 JavaScript 和 HTML5 的轻量级Web报表工具&#xff0c;采用拖拽式设计模式&#xff0c;不需任何服务器和组件支持&#xff0c;即可在 Mac、Linux 和 Windows 操作系统中&#xff0c;设计多种类型的报表。ActiveReportsJS 同时提供跨平台报表设计、纯…

18.背景轮播

背景轮播 html部分 <div class"container"><div class"slide active" style"background-image: url(./static/20180529205331_yhGyf.jpeg);"></div><div class"slide " style"background-image: url(./s…

vue3+taro+Nutui 开发小程序(二)

上一篇我们初始化了小程序项目&#xff0c;这一篇我们来整理一下框架 首先可以看到我的项目整理框架是这样的&#xff1a; components:这里存放封装的组件 custom-tab-bar:这里存放自己封装的自定义tabbar interface&#xff1a;这里放置了Ts的一些基本泛型&#xff0c;不用…

AtcoderABC238场

A - Exponential or QuadraticA - Exponential or Quadratic 题目大意 给定一个整数 n&#xff0c;判断是否满足 2n >n 2。 思路分析 根据数学知识可知n 的取值在 2 到 4 之间&#xff08;包括 2 和 4&#xff09;&#xff0c;不满足条件 。 时间复杂度 O(1) AC代码 …

MyBatis学习笔记——4

MyBatis学习笔记——4 一、MyBatis的高级映射及延迟加载1.1、多对一1.1.1、第一种方式&#xff1a;级联属性映射1.1.2、第二种方式&#xff1a;association1.1.3、第三种方式&#xff1a;分步查询 1.2、一对多1.2.1、第一种方式&#xff1a;collection1.2.1、第二种方式&#x…

Linux Ubuntu crontab 添加错误 提示:no crontab for root - using an empty one 888

资料 错误提示&#xff1a; no crontab for root - using an empty one 888 原因剖析&#xff1a; 第一次使用crontab -e 命令时会让我们选择编辑器&#xff0c;很多人会不小心选择默认的nano&#xff08;不好用&#xff09;&#xff0c;或则提示no crontab for root - usin…

一文了解Python中的while循环语句

目录 &#x1f969;循环语句是什么 &#x1f969;while循环 &#x1f969;遍历猜数字 &#x1f969;while循环嵌套 &#x1f969;while循环嵌套案例 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &#x1f990;专栏地址&#xff1a;Python从入门到精通专栏 循环语句是什…

【N32L40X】学习笔记11-ADC规则通道采集+dma数据传输

ADC规则通道转换 概述 支持 1 个 ADC&#xff0c;支持单端输入和差分输入&#xff0c;最多可测量 16 个外部和 3 个内部源。支持 12 位、10 位、8 位、6 位分辨率。ADC 时钟源分为工作时钟源、采样时钟源和计时时钟源 仅可配置 AHB_CLK 作为工作时钟源。可配置 PLL 作为采样时…

【安全】Sqllabs(1-4) 多种情况浅析

目录 环境⭐ 先要 ⭐⭐⭐⭐⭐ Less - 1 (information_shcema) Less - 2 (假设没有information_schema) Less - 3 (无列名注入) Less - 4 环境⭐ MySQL8.0.12 Nginx1.15.11 先要 ⭐⭐⭐⭐⭐ MySQL5.0以上有这几个数据库mysql, sys&#xff0c;information_schema informa…

前端性能优化——图片优化

一、图片优化措施 优化图片是 Web 前端优化的重要一环&#xff0c;因为图片是 Web 页面中最耗费带宽和加载时间的资源之一。以下是一些通过优化图片来优化 Web 前端的方法&#xff1a; 压缩图片&#xff1a;压缩图片可以减少图片的文件大小&#xff0c;从而减少加载时间。 使…

【数学建模】相关是一个距离指标吗?

一、说明 本文探讨最平凡的数学模型--距离模型。我们知道&#xff0c;任何数学模型如果是个距离模型&#xff0c;那么它是&#xff1a;放心的、自动的、不加任意条件的指标项目。然而另一些度量参数不是距离空间&#xff0c;因此&#xff0c;使用起来必须外加若干条件&#xff…

苹果iOS 16.6 RC发布:或为iPhone X/8系列养老版本

今天苹果向iPhone用户推送了iOS 16.6 RC更新(内部版本号&#xff1a;20G75)&#xff0c;这是时隔两个月的首次更新。 按照惯例RC版基本不会有什么问题&#xff0c;会在最近一段时间内直接变成正式版&#xff0c;向所有用户推送。 需要注意的是&#xff0c;鉴于iOS 17正式版即将…