Flink CDC MySQL数据同步到Doris表同步配置生成工具类

news2024/9/24 23:31:02

工具类
在这里插入图片描述
生成的配置

要同步表为:
customer_user.tb_business_user_info
express.route_push_service
请提前自行到doris中建好目标数据库,如果没有会报错
同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)


pipeline:
  name: Sync MySQL Tables To Doris
  parallelism: 1
source:
  type: mysql
  hostname: 10.1.0.24
  port: 3306
  username: root
  password: xxxxxxxxx
  tables: customer_user.tb_business_user_info,express.route_push_service
  server-id: your MYSQL serverId
  server-time-zone: UTC+08
sink:
  type: doris
  fenodes: 10.1.0.27:8030,10.1.0.50:8030,10.1.0.244:8030
  benodes: 10.1.0.27:8040,10.1.0.50:8040,10.1.0.244:8040
  username: root
  password: "xxxxxxxxxxx"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 3
route:
  - source-table: customer_user.tb_business_user_info
    sink-table: test.ods_customer_user_tb_business_user_info
    description: sync customer_user.tb_business_user_info to test.ods_customer_user_tb_business_user_info
  - source-table: express.route_push_service
    sink-table: test.ods_express_route_push_service
    description: sync express.route_push_service to test.ods_express_route_push_service


import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.Builder;
import lombok.Data;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author humorchen
 * date: 2024/8/29
 * description: mysql 同步到doris配置生成器
 **/
public class MysqlSyncToDorisConfigUtil {
    public  interface GetConfig{
        String getConfig();
    }
    public interface CheckConfig{
        void checkConfig();
    }

    public interface InitConfig{
        void initConfig();
    }
    @Data
    @Builder
    public static class MysqlSource implements GetConfig,CheckConfig,InitConfig{
        private String host;
        private int port;
        private String username;
        private String password;
        private String serverId;
        private String timezone;
        private List<TableName> tableNames;
        private static final String TEMPLATE = "source:\n" +
                "  type: mysql\n" +
                "  hostname: #{host}\n" +
                "  port: #{port}\n" +
                "  username: #{username}\n" +
                "  password: #{password}\n" +
                "  tables: #{tables}\n" +
                "  server-id: #{serverId}\n" +
                "  server-time-zone: #{timezone}\n";


        @Override
        public void initConfig() {
            if (port< 1){
                port = 3306;
            }
            if (StrUtil.isBlank(timezone)){
                timezone = "UTC+08";
            }
            if (CollectionUtil.isNotEmpty(tableNames)){
                List<String> collect = tableNames.stream().map(TableName::toString).distinct().collect(Collectors.toList());
                CollectionUtil.sortByPinyin(collect);
                this.tableNames = collect.stream().map(s -> {
                    String[] split = s.split("\\.");
                    return TableName.builder().database(split[0]).tableName(split[1]).build();
                }).collect(Collectors.toList());
            }
        }

        @Override
        public void checkConfig() {
            Assert.notBlank(host);
            Assert.isTrue(port>100);
            Assert.notBlank(username);
            Assert.notBlank(password);
            Assert.notBlank(serverId);
            Assert.notBlank(timezone);
            Assert.isTrue(tableNames != null && !tableNames.isEmpty());
        }

        @Override
        public String getConfig() {
            StringBuilder builder = new StringBuilder();
            if (tableNames != null){
                int i = 0;
                for (TableName tableName : tableNames) {
                    if (i > 0){
                        builder.append(",");
                    }
                    i++;
                    builder.append(tableName.toString());
                }
            }
            String tableNames = builder.toString();
            return TEMPLATE.replace("#{host}",host).replace("#{port}",String.valueOf(port)).replace("#{username}",username).replace("#{password}",password).replace("#{tables}",tableNames).replace("#{serverId}",serverId).replace("#{timezone}",timezone);
        }
    }
    @Data
    @Builder
    public static class TableName{
        private String database;
        private String tableName;

        @Override
        public boolean equals(Object o){
            if (! (o instanceof TableName)){
                return false;
            }
            TableName target = (TableName) o;
            if (!Objects.equals(database,target.database)){
                return false;
            }
            if (!Objects.equals(tableName,target.tableName)){
                return false;
            }
            return true;
        }

        @Override
        public int hashCode(){
            int hash = 0;
            if (database != null){
                hash += database.hashCode();
            }
            if (tableName != null){
                hash += tableName.hashCode();
            }
            return hash;
        }

        @Override
        public String toString(){
            return (database+"."+tableName).replace(" ","");
        }

    }
    @Data
    @Builder
    public static class DorisSource implements GetConfig,CheckConfig{
        private List<DorisFeNode> feNodeList ;
        private List<DorisBeNode> beNodeList;
        private String username;
        private String password;
        private String database;
        private static final String TEMPLATE = "sink:\n" +
                "  type: doris\n" +
                "  fenodes: #{feNodeList}\n" +
                "  benodes: #{beNodeList}\n" +
                "  username: #{username}\n" +
                "  password: \"#{password}\"\n" +
                "  table.create.properties.light_schema_change: true\n" +
                "  table.create.properties.replication_num: 3";

        @Override
        public void checkConfig() {
            Assert.isTrue(CollectionUtil.isNotEmpty(feNodeList));
            Assert.isTrue(CollectionUtil.isNotEmpty(beNodeList));
            Assert.notBlank(username);
            Assert.notBlank(password);
            Assert.notBlank(database);
        }

        @Override
        public String getConfig() {
            StringBuilder feNodeBuilder = new StringBuilder();
            if (feNodeList != null){
                for (int i = 0; i < feNodeList.size(); i++) {
                    if (i >0){
                        feNodeBuilder.append(",");
                    }
                    feNodeBuilder.append(feNodeList.get(i).toString());
                }
            }
            StringBuilder beNodeBuilder = new StringBuilder();
            if (beNodeList != null){
                for (int i = 0; i < beNodeList.size(); i++) {
                    if (i >0){
                        beNodeBuilder.append(",");
                    }
                    beNodeBuilder.append(beNodeList.get(i).toString());
                }
            }
            return TEMPLATE.replace("#{feNodeList}",feNodeBuilder.toString()).replace("#{beNodeList}",beNodeBuilder.toString()).replace("#{username}",username).replace("#{password}",password);
        }
    }

    @Data
    @Builder
    public static class DorisFeNode{
        private String host;
        private int port;

        @Override
        public String toString(){
            return host+":"+port;
        }
    }

    @Data
    @Builder
    public static class DorisBeNode{
        private String host;
        private int port;
        @Override
        public String toString(){
            return host+":"+port;
        }
    }
    @Data
    @Builder
    public static class PipeLineConfig implements GetConfig,CheckConfig,InitConfig{
        private String name;
        private int parallelism;

        private static final String TEMPLATE = "\n" +
                "pipeline:\n" +
                "  name: #{name}\n" +
                "  parallelism: #{parallelism}\n";

        @Override
        public void checkConfig() {
            Assert.notBlank(name);
            Assert.isTrue(parallelism > 0);
        }

        @Override
        public String getConfig() {
            return TEMPLATE.replace("#{name}",name).replace("#{parallelism}",String.valueOf(parallelism));
        }

        @Override
        public void initConfig() {
            if (name == null){
                name = "Sync MySQL Tables To Doris";
            }
            if (parallelism <1){
                parallelism = 1;
            }
        }
    }
    @Data
    @Builder
    public static class RouteConfig implements GetConfig,CheckConfig,InitConfig{
        private MysqlSource mysqlSource;
        private DorisSource dorisSource;
        private String tablePrefix;
        private static final String CONFIG_PREFIX = "\nroute:\n";
        private static final String TEMPLATE = "  - source-table: #{sourceDatabase}.#{sourceTable}\n" +
                "    sink-table: #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n" +
                "    description: sync #{sourceDatabase}.#{sourceTable} to #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n";

        @Override
        public void initConfig() {
            if (tablePrefix == null){
                tablePrefix = "ods_";
            }
        }

        @Override
        public void checkConfig() {
            Assert.notNull(tablePrefix);
        }

        @Override
        public String getConfig() {
            StringBuilder builder = new StringBuilder();
            builder.append(CONFIG_PREFIX);
            List<TableName> tableNames = mysqlSource.tableNames;
            for (TableName tableName : tableNames) {
                builder.append(TEMPLATE.replace("#{sourceDatabase}",tableName.database).replace("#{sourceTable}",tableName.tableName).replace("#{sinkDatabase}", dorisSource.database)
                        .replace("#{tablePrefix}",tablePrefix)
                );
            }
            return builder.toString();
        }
    }

    public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig){
        return syncTables(mysqlSource,dorisSource,routeConfig,null);
    }
    public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig,PipeLineConfig pipeLineConfig){
        if (pipeLineConfig == null){
            pipeLineConfig = PipeLineConfig.builder().build();
        }
        Assert.notNull(mysqlSource);
        Assert.notNull(dorisSource);
        Assert.notNull(routeConfig);
        mysqlSource.initConfig();
        routeConfig.initConfig();
        pipeLineConfig.initConfig();
        pipeLineConfig.checkConfig();
        mysqlSource.checkConfig();
        dorisSource.checkConfig();
        routeConfig.checkConfig();
        return pipeLineConfig.getConfig() +
                mysqlSource.getConfig() +
                dorisSource.getConfig() +
                routeConfig.getConfig();
    }

    public static Set<TableName> getTableNamesFromStr(String str){
        String[] split = str.split("\n");
        Set<TableName> ret = new HashSet<>();
        for (String s : split) {
            if (StrUtil.isNotBlank(s)){
                s = s.replace(" ","").toLowerCase();
                String[] ss = s.split("\\.");
                ret.add(TableName.builder().database(ss[0]).tableName(ss[1]).build());
            }
        }
        return ret;
    }
    public static void main(String[] args) {
        String tables = "customer_user.tb_business_user_info\n" +
                "express.route_push_service\n";
        Set<TableName> tableNameSet = getTableNamesFromStr(tables);

        MysqlSource mysqlSource = MysqlSource.builder().host("10.1.0.24").port(3306).username("root").password("xxxxxxxxx").serverId("your MYSQL serverId").tableNames(new ArrayList<>(tableNameSet)).build();

        DorisSource dorisSource = DorisSource.builder().
                feNodeList(Lists.newArrayList(DorisFeNode.builder().host("10.1.0.27").port(8030).build(),
                        DorisFeNode.builder().host("10.1.0.50").port(8030).build(),
                        DorisFeNode.builder().host("10.1.0.244").port(8030).build()
                        ))
                .beNodeList(Lists.newArrayList(DorisBeNode.builder().host("10.1.0.27").port(8040).build(),
                        DorisBeNode.builder().host("10.1.0.50").port(8040).build(),
                        DorisBeNode.builder().host("10.1.0.244").port(8040).build())
                        )
                .username("root").password("xxxxxxxxxxx").database("test").build();
        RouteConfig routeConfig = RouteConfig.builder().mysqlSource(mysqlSource).dorisSource(dorisSource).tablePrefix("ods_").build();
        String config = syncTables(mysqlSource,dorisSource,routeConfig);
        System.out.println("要同步表为:");
        for (TableName tableName : mysqlSource.tableNames) {
            System.out.println(tableName.toString());
        }
        System.out.println("请提前自行到doris中建好目标数据库,如果没有会报错");
        System.out.println("同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)");
        System.out.println();
        System.out.println(config);
    }

    public static void main12(String[] args){
        // 要同步的表
        String tables = "customer_user.tb_business_user_info\n" +
                "express.route_push_service\n" +
                "funds.dunning_order_request_table\n" +
                "funds.payout_order_table\n" +
                "funds.pdd_settle_table\n" +
                "manage.coupon_order_detail\n" +
                "manage.fulfillment_info\n" +
                "manage.order_report_detail\n" +
                "manage.recycle_inspection_report\n" +
                "manage.recycle_store_order_review\n" +
                "market.device\n" +
                "market.device \n" +
                "market.order\n" +
                "market.order_and_device\n" +
                "order.order_finance_detail\n" +
                "order.order_info\n" +
                "order.order_status\n" +
                "order.order_store_daily\n" +
                "order.order_store_recycle_detail\n" +
                "order.order_trade_in_detail\n" +
                "storage.delivery_order\n" +
                "storage.device\n" +
                "storage.inspect_order\n" +
                "storage.receive_order_detail\n" +
                "storage.recycle_updoor_inspections \n" +
                "storage.send_goods_order_detail\n" +
                "storage.storage_order\n" +
                "storage.unpack\n" +
                "storage.warehouse\n" +
                "storage.work_order_result\n";
        // 同步成功的表
        String now = "ods_customer_user_tb_business_user_info\n" +
                "ods_express_route_push_service\n" +
                "ods_funds_dunning_order_request_table\n" +
                "ods_funds_payout_order_table\n" +
                "ods_manage_coupon_order_detail\n" +
                "ods_manage_fulfillment_info\n" +
                "ods_manage_order_report_detail\n" +
                "ods_manage_recycle_inspection_report\n" +
                "ods_manage_recycle_store_order_review\n" +
                "ods_market_device\n" +
                "ods_market_order\n" +
                "ods_market_order_and_device\n" +
                "ods_order_order_finance_detail\n" +
                "ods_order_order_info\n" +
                "ods_order_order_status\n" +
                "ods_order_order_store_daily\n" +
                "ods_order_order_store_recycle_detail\n" +
                "ods_order_order_trade_in_detail\n" +
                "ods_storage_delivery_order\n" +
                "ods_storage_device\n" +
                "ods_storage_inspect_order\n" +
                "ods_storage_receive_order_detail\n" +
                "ods_storage_recycle_updoor_inspections\n" +
                "ods_storage_send_goods_order_detail\n" +
                "ods_storage_storage_order\n" +
                "ods_storage_unpack\n" +
                "ods_storage_warehouse\n" +
                "ods_storage_work_order_result";
        checkLostTables(tables,now);
    }

    /**
     * 检查同步丢失的表,有时候会有报错没有提示是哪个表,这样对比看是是哪个失败了
     *
     * @param target
     * @param now
     */
    public static void checkLostTables(String target,String now){
        String[] targetSplit = target.split("\n");
        ArrayList<String> targetList = new ArrayList<>();
        for (int i = 0; i < targetSplit.length; i++) {
            targetList.add("ods_"+targetSplit[i].replace(".","_").replace(" ",""));
        }
        String[] nowSplit = now.split("\n");
        ArrayList<String> nowList = new ArrayList<>();
        for (String s : nowSplit) {
            nowList.add(s.replace(" ",""));
        }
        CollectionUtil.sortByPinyin(targetList);
        CollectionUtil.sortByPinyin(nowList);
        System.out.println("target len: "+targetList.size());
        System.out.println("now len: "+nowList.size());
        Collection<String> subtract = CollectionUtil.subtract(targetList, nowList);
        System.out.println(JSONObject.toJSONString(subtract));

        for (int i = 0; i < targetList.size(); i++) {
            try {
                System.out.print(targetList.get(i));
            }catch (Exception e){}
            System.out.print("  \t  ");
            try {
                System.out.print(nowList.get(i));
            }catch (Exception e){}
            System.out.println( );
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2089907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

昇腾 Ascend 概念澄清 Host、Device、AI core、AI CPU、DVPP、AIPP、AscendCL、AscendC

昇腾 Ascend 概念澄清 Host、Device、AI core、AI CPU、DVPP、AIPP、AscendCL、AscendC flyfish Ascend C开发算子&#xff0c;偏低。 AscendCL开发应用&#xff0c;偏高。 AI core、AI CPU、DVPP都属于计算资源。 Ascend C开发的算子运行在AI Core上。 AIPP用于在AI Core上完…

TimeWheel算法介绍及在应用上的探索

作者&#xff1a;来自 vivo 互联网服务器团队- Li Fan 本文从追溯时间轮算法的出现&#xff0c;介绍了时间轮算法未出现前&#xff0c;基于队列的定时任务实现&#xff0c;以及基于队列的定时任务实现所存在的缺陷。接着我们介绍了时间轮算法的算法思想及其数据结构&#xff0c…

手撕数据结构与算法——拓扑排序

拓扑排序是图论中的一个重要概念&#xff0c;它在许多领域如任务调度、课程规划等都有广泛的应用。在这篇文章中&#xff0c;我们将探讨拓扑排序的基本概念、算法实现以及在C/C中的实现方法。 拓扑排序简介 拓扑排序是针对有向无环图&#xff08;DAG&#xff09;的一种排序算法…

二叉树(数据结构)

1.两种特殊的二叉树 1. 满二叉树 : 一棵二叉树&#xff0c;如果 每层的结点数都达到最大值&#xff0c;则这棵二叉树就是满二叉树 。也就是说&#xff0c; 如果一棵 二叉树的层数为 K &#xff0c;且结点总数是2^k-1 &#xff0c;则它就是满二叉树 。 2. 完全二叉树 : 完…

为你的LLM应用增加记忆能力

1. 记忆系统的重要性 我们都知道&#xff0c;大模型本身是无状态、无记忆的。默认情况下&#xff0c;我们向大模型发起的每次提问&#xff0c;在其内部都会被视为一次全新的调用。尽管诸如 ChatGPT 等聊天应用内置了部分记忆功能&#xff0c;可以记录用户最近几轮的聊天信息&a…

ChatTTS容器构建教程

一、模型介绍 ChatTTS 是专门为对话场景设计的文本转语音模型&#xff0c;例如 LLM 助手对话任务。它支持英文和中文两种语言。最大的模型使用了 10 万小时以上的中英文数据进行训练。在 HuggingFace 中开源的版本为 4 万小时训练且未 SFT 的版本。 ChatTTS WebUI如下&#x…

【单片机原理及应用】实验:LED循环控制

目录 一、实验目的 二、实验内容 三、实验步骤 四、记录与处理 五、思考 六、成果文件提取链接 一、实验目的 熟悉Proteus x8原理图与C51程序的联合仿真调试方法&#xff0c;掌握C51延时函数和循环控制的方法 二、实验内容 【参照图表】 &#xff08;1&#xff09;创建一…

晚宴扫码查询桌号

在现代社交活动中&#xff0c;晚宴的组织和管理越来越依赖于技术手段。为了提高晚宴的效率和参与者的体验&#xff0c;我们可以通过一个简单的扫码查询系统来实现快速查找桌号和座位号。以下是详细步骤&#xff1a; 1. 电脑端上传查询信息 访问云分组官网。 使用微信扫码登录…

AcrelEMS3.0企业微电网智慧能源平台的设计与应用-安科瑞 蒋静

1系统概述 1.1 概述 2020年9月&#xff0c;我国明确提出2030年“碳达峰”与2060年“碳中和”目标。2022年6月&#xff0c;科技部、国家发展改革委、工业和信息化部、生态环境部、住房城乡建设部、交通运输部、中科院、工程院、国家能源局共同研究制定了《科技支撑碳达峰碳中和…

netty编程之实现断点续传(分片发送)功能

写在前面 在我们使用各种网盘的时候&#xff0c;可以随时的暂停上传&#xff0c;然后继续上传&#xff0c;这其实就是断点续传的功能&#xff0c;本文就看下在netty中如何实现断点续传的功能。 1&#xff1a;核心点介绍 1.1&#xff1a;RandomAccessFile RandomAccessFile类…

汽车信息安全--MCU Flash读保护真的没有后门吗?

目录 1.修bug修出的灵感 2.串行编程接口协议 3.毛刺攻击 4.RH850 串行编程模式 5.小结 1.修bug修出的灵感 ECU量产后通过密码控制来防止通过Debug口读取Flash的程序和数据。 这是应该是共识了&#xff0c;但是这样做真的就万无一失了吗&#xff1f; 最近解决了个问题&…

Linux操作系统在虚拟机VM上的安装【CentOS版本】

目录 准备工作 "CPU虚拟化"的方法 VMware的安装 Linux镜像文件的下载 开始安装 声明 新建虚拟机 安装CentOS7.6 配置Linux(CentOS7.6)操作系统 配置分区【学习者可以直接点击自动配置分区&#xff0c;不过还是建议学习一下手动分区】 分区原则 添加分区 …

适配制造业的设备管理系统有哪些?本文给你答案!

本文将带大家盘点10款设备管理系统&#xff0c;供企业选型参考。 外包单位数量众多&#xff0c;但难以对他们进行统一协同管理&#xff1f;危险作业的作业申请、作业审批使用线下纸质版&#xff0c;不仅效率低还管控力不足&#xff1f;相关部门想监测、管理设备安全风险&#x…

AcWing898. 数字三角形

线性DP 董晓老师的讲解是从下标0开始算的&#xff0c;其实我们从1开始也可以&#xff0c;我感觉这里从1开始更好理解。是从下往上计算的。j负责列的计算&#xff0c;往上计算时逐步收窄横向的范围&#xff0c;i是纵向的从下往上算&#xff0c; 下面是内存布局 下面是逻辑上的…

【网络安全】打开这份“开学礼” 谨防骗子“冲业绩”

吉祥知识星球http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247485367&idx1&sn837891059c360ad60db7e9ac980a3321&chksmc0e47eebf793f7fdb8fcd7eed8ce29160cf79ba303b59858ba3a6660c6dac536774afb2a6330#rd 《网安面试指南》http://mp.weixin.qq.com/s…

今年读过最绝的一本书!《自然语言处理原理、方法与应用》,几乎把自然语言处理讲透了【附PDF】

书籍内容介绍&#xff1a; 本书系统阐述自然语言处理基础知识&#xff0c;以及自然语言处理高级模型应用等高级知识。 全书共11章&#xff1a;第1&#xff5e;5章为自然语言处理的基础知识&#xff0c;第6&#xff5e;11章将自然语言处理知识应用于实战。书中主要内容包括预训…

99% 的人都不知道,哪种 Python 循环最快?

大家好&#xff0c;在Python编程中&#xff0c;循环是我们经常使用的工具&#xff0c;用来重复执行一些代码块。我们通常会在 for 循环、while 循环和列表推导式中选择一种来解决问题。但你是否曾想过&#xff1a;哪种循环效率最高&#xff1f; 今天&#xff0c;我们就来一次轻…

Vue -- 总结 02

Vue脚手架 安装Vue脚手架&#xff1a; 在cmd中安装(输入):npm install -g vue/cli 如果下载慢或下载不了&#xff0c;可以安装(cmd输入)淘宝镜像:npm config set registry https://registry.npmmirror.com 用命令创建Vue项目 在要创建的vue项目的文件夹里输入 cmd 回车 创…

CSS 2D3D转换与动画

CSS 2D&3D转换与动画 字体图标 字体图标展示的是图标&#xff0c;本质是字体 处理简单的&#xff0c;颜色单一的图片&#xff0c;使用字体图标 使用&#xff1a; 下载&#xff1a;https://www.iconfont.cn/ 引入字体图标样式表 对应标签上classz增加对应的类名&…

使用pytest的 reporting特性来生成报告

特性 1.HTML 报告&#xff1a;使用 pytest-html 插件&#xff0c;你可以生成 HTML 格式的测试报告。只需在项目的 pytest.ini 文件中添加以下内容&#xff1a; [pytest] addopts --htmlreport.html然后&#xff0c;在运行 pytest 时&#xff0c;将会生成一个名为 report.htm…