Flink cdc同步增量数据timestamp字段相差八小时(分析|解决)不是粘贴复制的!

news2025/1/16 2:32:22

问题

我使用flink cdc同步mysql到mysql遇到了timestamp字段缺少八小时的问题。很少无语,flink ,cdc,debezium时区都设置了,没有任何效果!

分析

问题出现在mysql binlog身上!!!
因为默认mysql会使用UTC来存储binlog,你可以使用下方的sql验证:

mysqlbinlog --base64-output=DECODE-ROWS -v --start-datetime="2024-11-26 16:20:59" --stop-datetime="2024-11-26 16:30:59" /path/to/binlog-file

设置起止时间和binlog位置

而我们存储mysql数据的时候使用的时区大概率是上海,你也可以查看:

SELECT @@global.time_zone, @@session.time_zone;

如果都是system,而你就在中国大陆那就没错了

验证分析:

而这个时间相差你会发现只在同步增量数据的时候才出现!因为.startupOptions(StartupOptions.initial())会同步历史数据,这些都是从数据库读取的,所以两边都是上海时区就不会有问题!

解决

其实官方给了解决的方案,但是说的非常的模糊,如果对flink cdc不是很熟悉的朋友大概率会云里雾里!
这是官方的常见问题汇总:
以下是原话:
Q2: 使用 MySQL CDC,增量阶段读取出来的 timestamp 字段时区相差8小时,怎么回事呢?

#在解析binlog数据中的timestamp字段时,cdc 会使用到作业里配置的server-time-zone信息,也就是MySQL服务器的时区,如果这个时区没有和你的MySQL服务器时区一致,就会出现这个问题。

此外,如果是在DataStream作业中自定义列化器如 MyDeserializer implements DebeziumDeserializationSchema, 自定义的序列化器在解析 timestamp 类型数据时,需要参考下 RowDataDebeziumDeserializeSchema 中对 timestamp 类型的解析,用时给定的时区信息。


private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            switch (schema.name()) {
                case Timestamp.SCHEMA_NAME:
                    return TimestampData.fromEpochMillis((Long) dbzObj);
                case MicroTimestamp.SCHEMA_NAME:
                    long micro = (long) dbzObj;
                    return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                case NanoTimestamp.SCHEMA_NAME:
                    long nano = (long) dbzObj;
                    return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
            }
        }
        LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
        return TimestampData.fromLocalDateTime(localDateTime);
    }

其实意思很简单,就是自定义序列化器(实现接口CustomConverter)并且对timestamp字段进行单独的处理就可以!

总结!!!

关于序列化,flink有一个官方的序列化器,是debezium的,源码下载链接:
在这里插入图片描述
你只需要在这个方法里面手动修改时区就可以了!
在这里插入图片描述

注意:你要观察下按照你的环境版本timestamp字段映射的对象类型是不是 ZonedDateTime!!!

使用也很简单(好人做到底):

MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("192.168.10.14")
                .port(3306)
                .databaseList("xcode")
                .tableList("xcode.temp_flink")
                .username("root")
                .password("123456")
//                .serverId("5401-5404")
                .debeziumProperties(getProperties())
                .deserializer(new MysqlDeserialization())
//                .scanNewlyAddedTableEnabled(true)
//                .includeSchemaChanges(true) // Configure here and output DDL events
                .startupOptions(StartupOptions.initial())
//                .serverTimeZone("Asia/Shanghai")
                .build();

// 关键代码在这里!!!!!!!!!
private static Properties getProperties() {
    Properties properties = new Properties();
    properties.setProperty("converters", "dateConverters");
    //这里!这里!!这里!!!(这是官方的,用上面的源码自己修改完填你的全路径)
    properties.setProperty("dateConverters.type", "io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter");
    properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
    properties.setProperty("dateConverters.format.time", "HH:mm:ss");
    properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
    properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
    // timestamp没用。。。
//        properties.setProperty("dateConverters.format.timestamp.zone", "UTC");
    //全局读写锁,可能会影响在线业务,跳过锁设置
    properties.setProperty("debezium.snapshot.locking.mode","none");
    properties.setProperty("include.schema.changes", "true");
    properties.setProperty("bigint.unsigned.handling.mode","long");
    properties.setProperty("decimal.handling.mode","double");
    return properties;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QUICK 调试camera-xml解析

本文主要介绍如何在QUICK QCS6490使能相机模组。QCS6490的相机基于CameraX的框架&#xff0c;只需通过配置XML文件&#xff0c;设置相机模组的相关参数&#xff0c;就可以点亮相机。本文主要介绍Camera Sensor Module XML和Camera Sensor XML配置的解析&#xff0c;这中间需要c…

ArcGIS 软件中路网数据的制作

内容导读 路网数据是进行网络分析的基础&#xff0c;它是建立网络数据集的数据来源。 本文我们以OSM路网数据为例&#xff0c;详细介绍OSM路网数据从下载&#xff0c;到数据处理&#xff0c;添加属性&#xff0c;完成符合网络分析的网络数据集的全部过程。 01 数据获取 比较…

Flink双流Join

在离线 Hive 中&#xff0c;我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢&#xff1f;Flink DataStream API 为我们提供了3个算子来实现双流 join&#xff0c;分别是&#xff1a; join coGroup intervalJoin 下面我们分别详细看一下这…

【Python-Open3D学习笔记】005Mesh相关方法

TriangleMesh相关方法 文章目录 TriangleMesh相关方法1. 查看mesh三角形面信息2. 可视化三角形3. 上采样4. 计算mesh形成的面积和体积 1. 查看mesh三角形面信息 def view_hull_triangles(hull: o3d.geometry.TriangleMesh):"""查看mesh三角形面信息&#xff08…

【LeetCode热题100】优先级队列

这盘博客记录了关于优先级队列的几道题&#xff0c;包括最后一块石头的重量、数据流中的第K大元素、前K个高频单词、数据流的中位数。 class Solution { public:int lastStoneWeight(vector<int>& stones) {priority_queue<int> heap;for(auto s : stones) hea…

node.js基础学习-cheerio模块-简单小爬虫(五)

学习cheerio模块&#xff0c;简单做一个爬取图片网站的图片&#xff0c;并且将这些图片下载到本地指定的文件夹下&#xff0c;很多图片网站都有一些反爬取的机制&#xff0c;找的好几个都会报302错误&#xff0c;所以我找了一个小的图片网站&#xff0c;这个没有反爬取机制&…

技术创新与人才培养并重 软通动力子公司鸿湖万联亮相OpenHarmony人才生态大会

11月27日&#xff0c;由开放原子开源基金会指导&#xff0c;OpenHarmony项目群工作委员会主办的OpenHarmony人才生态大会2024在武汉隆重举办。软通动力子公司鸿湖万联作为OpenHarmony项目群A类捐赠人应邀出席。大会期间&#xff0c;鸿湖万联不仅深度参与了OpenHarmony人才生态年…

FFmpeg 推流给 FreeSWITCH

FFmpeg 推流&#xff0c;貌似不难&#xff0c;网上有很多资料, 接到一个任务&#xff0c;推流给 FreeSWITCH&#xff0c;最开始以为很容易&#xff0c; 实则不然&#xff0c;FreeSWITCH uuid_debug_media <uuid>&#xff0c; 一直没人任何反应 仔细一查&#xff0c;Fr…

彻底理解quadtree四叉树、Octree八叉树 —— 点云的空间划分的标准做法

1.参考文章&#xff1a; &#xff08;1&#xff09;https://www.zhihu.com/question/25111128 这里面的第一个回答&#xff0c;有一幅图&#xff1a; 只要理解的四叉树的构建&#xff0c;对于八叉树的构建原理类比方法完全一样&#xff1a;对于二维平面内的随机分布的这些点&…

【工具变量】上市公司企业数字化转型指数(甄红线版本,战略引领、技术驱动、环境支撑、数字化成果及应用)2011-2022年

一、测算方式&#xff1a;参考《经济研究》甄红线&#xff08;2023&#xff09;老师研究的做法&#xff0c;本文采用 &#xff23;&#xff33;&#xff2d;&#xff21;&#xff32; 数据库中国上市公司数字化转型研究数据库中企业数字化转型指数来衡量企业数字化转型水平。 数…

长短期记忆网络 (LSTM) 简介

文章目录 一、说明二、传统 RNN 的问题三、为什么梯度消失&#xff1f;四、长短期记忆网络简介五、忘记门六、Update Gate (Input Gate)七、Output Gate八、数学上的内存九、从 LSTM 到 Transformer十、总结 一、说明 机器学习取得进步的领域之一是自然语言处理。对于用于机器…

安卓悬浮窗应用外无法穿透事件问题

现象&#xff1a; 应用内悬浮窗如何设置了 WindowManager.LayoutParams.FLAG_NOT_FOCUSABLE WindowManager.LayoutParams.FLAG_NOT_TOUCHABLE在自己应用内事件穿透正常&#xff0c;但到应用外就无法点击。 原因&#xff1a; 解决方法&#xff1a; layoutParams.alpha 0.8f …

linux minio安装

安装minio&#xff08;Centos&#xff09; 1. 查看服务器版本uname -a 2. 到minio官网下载对应的版本 官网地址&#xff1a;minio官网下载 根据上面查看的信息是x86_64系统所以我们下载linu-amd64 3. 上传到服务器 新建minioServer目录 上传至该目录下 赋权 chmod x mi…

JavaScript字符串

这张图片主要介绍了JavaScript中的字符串类型&#xff08;string&#xff09;。 字符串 1. 字符串的定义 在JavaScript中&#xff0c;通过单引号&#xff08;&#xff09;、双引号&#xff08;"&#xff09;或反引号&#xff08;&#xff09;包裹的数据都叫字符串。单引…

Linq中的投影运算 (C#):Select、SelectMany、Zip

投影是指将对象转换为一种新形式的操作&#xff0c;该形式通常只包含那些将随后使用的属性。 通过使用投影&#xff0c;您可以构造从每个对象生成的新类型。 可以投影属性&#xff0c;并对该属性执行数学函数。 还可以在不更改原始对象的情况下投影该对象。 1、Select 下面的…

RabbitMQ在手动消费的模式下设置失败重新投递策略

最近在写RabbitMQ的消费者&#xff0c;因为业务需求&#xff0c;希望失败后重试一定次数&#xff0c;超过之后就不处理了&#xff0c;或者放入死信队列。我这里就达到重试次数后就不处理了。本来以为很简单的&#xff0c;问了kimi&#xff0c;按它的方法配置之后&#xff0c;发…

Maven install java heap space

Maven install java heap space 打包报错 Maven install java heap space 解决&#xff1a; vm option: -Xms1024m -Xmx1024m如果 vm配置了&#xff0c;还是一样报错&#xff0c;就重新选择JRE看看是否正确&#xff0c;idea会默认自己的环境&#xff0c;导致设置vm无效&…

OpenCV_Code_LOG

孔洞填充 void fillHole(const Mat srcBw, Mat &dstBw) {Size m_Size srcBw.size();Mat TempMat::zeros(m_Size.height2,m_Size.width2,srcBw.type());//延展图像srcBw.copyTo(Temp(Range(1, m_Size.height 1), Range(1, m_Size.width 1)));cv::floodFill(Temp, Point(…

C7.5【x86汇编】底层分析范围for的执行过程

目录 1.反汇编代码 2.分析 1.栈区初始化 2.设置数组元素的值 3. 逐条分析范围for 1.arr的地址被放到[ebp-2Ch]处 2.[ebp-2Ch]指向的值被复制一份到[ebp-30h]处 3.eax暂存[ebp-2Ch]指向的值,加28h后存储到[ebp-34h]处 4.跳转指令 5.比较[ebp-30h]和[ebp-34h]指向的值,…