mysql cdc 整库迁移 (mysql to mysql)

news2025/1/12 7:45:13

技术思想

利用 mysql catalog,mysql cdc,flink jdbc 等技术实现 mysql 整库迁移至下游数据库,这里是示范 mysql to mysql ,其他 sink 组件可自行扩展实现。

通过 flink ParameterTool,可以选择是整库同步还是多表亦或单表同步,可以设置全局并发,源表 mysql 参数,目标表 mysql 参数

通过 sql Connection 实现自动建表逻辑 (mysql 数据类型众多,这里并没有测试所有的类型参数,如担心建表不成功,可手动建表,不影响程序运行)

下游使用 flink jdbc 来实现,语法为 upsert 即幂等写入(重复数据只会写入一次)

使用 mysql catalog 来实现源表元数据的获取

自定义 CustomDebeziumDeserializer 实现 DebeziumDeserializationSchema 接口对数据进行转换

该任务本质上是 单 source 多 sink 任务,不同的表数据不一样可能会有一定的反压

程序测试 生成五百万条数据 五张表 一分钟左右完成,增量数据一万条,可以同步完成

环境 flink 1.16 cdc 2.3.0

refer:
https://nightlies.apache.org/flink/flink-docs-release-1.16/
https://blog.csdn.net/qq_36062467/article/details/128117647
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html

代码

github 地址 https://github.com/SophiaData/Bigdata_Code_Tutorial/blob/master/Code_Tutorial/Flink-demo/

/** (@SophiaData) (@date 2022/12/1 16:02). */
public class CustomDebeziumDeserializer
        implements DebeziumDeserializationSchema<Tuple2<String, Row>> {
    private static final Logger LOG = LoggerFactory.getLogger(CustomDebeziumDeserializer.class);

    private final Map<String, DeserializationRuntimeConverter> physicalConverterMap =
            Maps.newConcurrentMap();

    CustomDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        for (String tableName : tableRowTypeMap.keySet()) {
            RowType rowType = tableRowTypeMap.get(tableName);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tableName, physicalConverter);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out)
            throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        Struct source = value.getStruct("source");
        String tableName = source.get("table").toString();
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(value, valueSchema, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tableName, insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tableName, delete));
        } else if (op == Envelope.Operation.UPDATE) {
            Row before = extractBeforeRow(value, valueSchema, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tableName, before));
            Row after = extractAfterRow(value, valueSchema, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tableName, after));
        } else {
            LOG.warn(" Unexpected statement: {}", value);
        }
    }

    private Row extractAfterRow(
            Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)
            throws Exception {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }

    private Row extractBeforeRow(
            Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)
            throws Exception {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }

    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {});
    }

    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {

        switch (type.getTypeRoot()) {
            case NULL:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return null;
                    }
                };
            case BOOLEAN:
                return convertToBoolean();
            case TINYINT:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Byte.parseByte(dbzObj.toString());
                    }
                };
            case SMALLINT:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Short.parseShort(dbzObj.toString());
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal =
                                VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000);
                        case NanoTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000_000);
                    }
                } else if (dbzObj instanceof Integer) {
                    return dbzObj;
                }
                // get number of milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
            ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(CustomDebeziumDeserializer::createNotNullConverter)
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Row row = new Row(arity);
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    if (field == null) {
                        row.setField(i, null);
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField =
                                convertField(fieldConverters[i], fieldValue, fieldSchema);
                        row.setField(i, convertedField);
                    }
                }
                return row;
            }
        };
    }

    private static Object convertField(
            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
            throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}

效果

在这里插入图片描述

在这里插入图片描述

最后

代码和测试可能不充分,仅供参考,欢迎提出意见。

欢迎访问博客 https://sophiadata.github.io/Bigdata_Blog_Website/learning/overview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/72921.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年天津天狮学院专升本报名考试的安排

天津天狮学院2023年高职升本科考试报名时间安排的通知 一、报名条件 报名条件和具体要求按照天津市招生委员会的文件规定执行。考生必须完成文化课报名环节&#xff0c;且填报天津天狮学院志愿&#xff0c;且具备我校提出的线上考试条件&#xff0c;方可报考我校专业课考试。考…

Java作业

这里写自定义目录标题java16次作业新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是…

python-网络编程

python-网络编程 网络编程的理论概述&#xff1a; 现在的生活离不开网络&#xff0c;例如手机&#xff0c;电脑&#xff0c;平板&#xff0c;都是网络的代名词&#xff0c;通过一些APP&#xff0c;浏览器&#xff0c;获取大量的信息如 文字、声音、视频&#xff0c;这都是从网…

[Linux]------线程同步和信号量

文章目录前言一、条件变量同步概念与竞态条件条件变量函数初始化销毁等待条件满足唤醒等待为什么pthread_cond_wait需要互斥量&#xff1f;条件变量使用规范二、生产者消费者模型为何要使用生产者消费者模型生产者消费者模型的优点基于BlockingQueue的生产者消费者模型C queue模…

1.MyBatis简介

1.概念 MyBatis是一款开源的持久层框架&#xff0c;它支持定制化SQL、存储过程以及高级映射。 与其它ORM框架不同&#xff0c;MyBatis没有将Java对象与数据表关联起来&#xff0c;而是作为Java方法和SQL语句的桥梁&#xff0c;一般称它为“半自动化ORM”框架。 2.Mybatis架构 …

【软件测试】在我刚上岗时,资深测试给我的建议让我受益匪浅......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 建立测试基线 当我还…

BNB Chain对Zebec生态大力扶持,ZBC或继续登录一线平台

在行业早期开始&#xff0c;流支付赛道就已经具备了早期的轮廓&#xff0c;而在流支付协议Zebec Protocol出现后&#xff0c;该领域被推向了一个新的发展高度&#xff0c;并得到加密领域以及传统商业领域的高度关注。而随着生态的商业进展不断推进、生态不断壮大&#xff0c;Ze…

C++代码优化(1):条款1~4

"不要烟火不要星光&#xff0c;只要问问你内心的想法。" 本栏仅仅 由对《Effictive C》其中的一系列条款&#xff0c;掺杂着自己一些愚钝的理解而写的。 ---前言 条款01: 尽量以const、enum、inline 替换 #define 在谈及上述好几个关键字 与define宏定义的关系&…

Intel i226芯片4端口千兆以太网卡 2.5GPoE工业相机图像采集卡介绍

PCIe-8634图像采集卡是一款基于 Intel i226芯片高性能千兆工业级 PCIe*4 POE网卡,具有传输速度高、兼容性强、性能稳定的特点&#xff0c;可广泛主要应用于网络高清监控、无线覆盖、工业自动化等领域。 RJ45千兆网络采用4 k Intel226千兆网络芯片,支持10/100/1000/2500Mbps传输…

microservices 简介

油鹳视频 Microservices explained - the What, Why and How? https://www.youtube.com/watch?vrv4LlmLmVWk&t2s microservices 是一种软件体系结构&#xff0c; microservices architecture(微服务架构) 是与传统的 monolithic architecture(整体式架构&#xff0c;一体…

微信转盘抽奖小程序如何制作?

微信转盘抽奖小程序如何制作&#xff1f;大概需要多少钱&#xff1f; 价格方面&#xff0c;平台按年收费&#xff0c;一年1498至2498元。 明码标价&#xff0c;7天退款制度&#xff0c;随时退。 微信转盘抽奖小程序如何制作步骤: 1.进入第三方微信转盘抽奖小程序制作平台官…

计算机结构体系:指令调度与循环展开题型 (非凭感觉的方法详解)

文章目录题目初始分析1.确定所使用的各个寄存器的作用2.将循环体内容语句和控制语句分开3.找出每一条循环体内容指令的代价并排序4.找出每一条循环体控制指令的代价并排序5.基于贪婪算法的最优循环展开体系结构这门课程中&#xff0c;指令调度和循环展开可以说是课程最困难的地…

负载均衡反向代理下的webshell

负载均衡(Load Balance) 是一种廉价的扩容的方案&#xff0c;它的概念不是本文的重点&#xff0c;不知道的可以去查资料学习。实现负载均衡的方式有很多种&#xff0c;比如 DNS 方式、HTTP 重定向方式、IP 负载均衡方式、反向代理方式等等。 其中像 HTTP 重定向方式、DNS方式等…

BioPython ② | 面向对象编程Object Oriented Programming

BioPython ② | Python面向对象编程 题目要求 定义分子类&#xff08;Molecule&#xff09;作为基类&#xff0c;包含集合elements和weight作为其属性&#xff0c;用初始化函数&#xff0c;将elements初始化为空集&#xff0c;weight初始化为None&#xff1b;定义show_weight…

进阶 - Git的远程仓库

本篇文章&#xff0c;是基于我自用Windows&#xff08;Win10&#xff09;系统当做示例演示 Git的远程仓库 之前我们一直在探讨 Git 的一些命令&#xff0c;也提及了仓库的概念。如果只是在一个仓库里管理文件历史&#xff0c;Git 和 SVN 真没啥区别。 Git 是分布式版本控制系…

02 stata入门【计量经济学及stata应用】

安装&#xff1a;建议直接在微信搜索&#xff0c;很多公众号有安装包资源及下载教程 不同版本在基本功能上无较大差异&#xff0c;一般为SE&#xff0c;更为专业MP&#xff0c;只是在处理变量个数或容量等存在不同 界面 历史命令&#xff1b;结果窗口&命令窗口&#xff1b…

字节跳动岗位薪酬体系曝光,看完感叹:不服不行,想高薪还得是学这个。。。。

目录&#xff1a;导读 前言 01岗位职级 02岗位薪酬 03绩效考核与晋升 大厂软件测试岗经验分享 一、软件测试基础篇&#xff1a;2022版 二、MySQL篇&#xff1a;2022版 三、 Linux篇&#xff1a;2022版 四、 Web测试 五、接口测试 六、APP测试 七、性能测试 八、Se…

Nacos一些理解

下载Mysql //下载mysql docker pull mysql:5.7 //运行容器 docker run -p 3306:3306 --name mysql -v /home/mysql/log:/var/log/mysql -v /home/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORDxx -d mysql:5.7 将导入数据库 1.新建数据库 nacos /*Navicat Premiu…

HyperMesh宝典 | 跟着老师学,你也可以做好二次开发

说到二次开发&#xff0c;你的脑海里是不是浮现出了“码农”两个字&#xff1f;有人可能会问&#xff0c;码农又是什么&#xff1f; 你是不是觉得二次开发这种码农干的事情感觉起来也太困难了吧&#xff1f; 其实有时候二次开发真的很简单&#xff0c;懂一点二次开发会让你的工…

机器学习中的数学原理——多项式回归

这个专栏主要是用来分享一下我在机器学习中的学习笔记及一些感悟&#xff0c;也希望对你的学习有帮助哦&#xff01;感兴趣的小伙伴欢迎私信或者评论区留言&#xff01;这一篇就更新一下《白话机器学习中的数学——多项式回归》&#xff01; 目录 一、什么多项式回归 二、算法…