Doris(六)--通过 Canal 同步数据到 Doris 中

news2024/12/23 2:29:51

pre 开启 mysql Binlog

网上有众多方法,自行百度。

查询是否成功,在 mysql 客户端输入

show BINARY LOGS;

出现如下提示,即表示 big log 正常开启。 

1,下载 canal 服务端

传送门

注意:下载 canal.deployer-xxx 版本即可。admin 是 deployer 的管理端。

2,上传到服务器的指定位置并解压

tar xzvf canal.deployer-1.1.6.tar.gz

注意,这个 deployer 解压之后直接是零散文件夹,建议先创建一个文件夹后,在这个文件夹里面进行解压

 3,配置实例

进入 conf 文件夹后,创建实例文件夹

cd conf/
mkdir test

从 example 文件夹中,拷贝instance.properties到当前文件夹

cp ../example/instance.properties .

 4,编辑实例文件

4.1 源数据库位置

//源数据位置
canal.instance.master.address=127.0.0.1:3306
//源数据 binlog 名字
canal.instance.master.journal.name=
//源数据 biglog 偏移量
canal.instance.master.position=

4.2 连接源数据库的用户名和密码

//连接源数据库用户名
canal.instance.dbUsername=canal
//连接源数据库密码
canal.instance.dbPassword=canal

4.3 编辑完,保存退出

5,编辑 canal 的配置文件

cd ..
vim canal.properties

5.1 加入新加的实例,已逗号分割

canal.destinations = example

6,部署客户端

这里客户端可以根据 canal 的 api 文档自行开发。

这里贴一些关键代码

{
    protected final static Logger logger = LoggerFactory.getLogger(CanalClientApplication.class);

    private static String ADDRESS = ConfigUtils.getConfigValue("application.properties", "canal.address");

    private static int PORT = Integer.parseInt(ConfigUtils.getConfigValue("application.properties", "canal.port"));

    private static String DESTINATION = ConfigUtils.getConfigValue("application.properties", "canal.destination");

    private static String USERNAME = ConfigUtils.getConfigValue("application.properties", "canal.username");

    private static String PASSWORD = ConfigUtils.getConfigValue("application.properties", "canal.password");

    private static String SUBSCRIBER = ConfigUtils.getConfigValue("application.properties", "canal.subscriber");

    public static void main(String args[]) {
        SpringApplication.run(CanalClientApplication.class,args);
        System.out.println("数据同步服务启动成功");
        // 创建链接
        logger.info("Trying to connect to " + ADDRESS + ":" + PORT);
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ADDRESS,
                PORT), DESTINATION, USERNAME, PASSWORD);
        int batchSize = 1000;
        try {
            logger.info("...");
            connector.connect();
            logger.info("connected");
            connector.subscribe(SUBSCRIBER);
            connector.rollback();

            logger.info("CanalClient Application started successfully!");
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                logger.info("当前 message 信息为:{}",message);
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    DataProcessor.process(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Canal Client exit with error.", e);
            System.exit(-2);
        } finally {
            connector.disconnect();
        }
    }

}
{
    protected final static Logger logger = LoggerFactory.getLogger(DataProcessor.class);

    private static String DATABASE = ConfigUtils.getConfigValue("application.properties", "canal.database");

    private static String TABLE = ConfigUtils.getConfigValue("application.properties", "canal.table");

    private static String OPERATOR = ConfigUtils.getConfigValue("application.properties", "canal.operator");

    private static String CANAL_OUTPUT = ConfigUtils.getConfigValue("application.properties", "canal.output");

    private static DorisUtil dorisUtil;

    private static MySQLUtil mySQLUtil;

    public static void process(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            if (eventType == CanalEntry.EventType.TRUNCATE && OPERATOR.contains("TRUNCATE")) {
                if (StringUtils.isEmpty(DATABASE) ||
                        (entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {
                    if (StringUtils.isEmpty(TABLE) ||
                            (entry.getHeader().getTableName() != null && isContain(TABLE.split(","), entry.getHeader().getTableName()))) {
                        logger.info("TRUNCATE TABLE " + entry.getHeader().getTableName());
                        if (CANAL_OUTPUT.contains("mysql")) {
                            mySQLUtil = MySQLUtil.getInstance();
                            try {
                                mySQLUtil.mySQLTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                            } catch (SQLException e) {
                                e.printStackTrace();
                                logger.error("MySQL执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());
                            }
                        }

                        if (CANAL_OUTPUT.contains("doris")) {
                            dorisUtil = DorisUtil.getInstance();
                            try {
                                dorisUtil.dorisTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                            } catch (SQLException e) {
                                e.printStackTrace();
                                logger.error("Doris执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());
                            }
                        }

                    }
                }
            }
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                // 过滤database, table, operator
                if (StringUtils.isEmpty(DATABASE) ||
                        (entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {
                    if (StringUtils.isEmpty(TABLE) ||
                            (entry.getHeader().getTableName()!=null && isContain(TABLE.split(","),entry.getHeader().getTableName()))) {
                        if (CANAL_OUTPUT.contains("mysql")) {
                            mySQLUtil = MySQLUtil.getInstance();
                            try {
                                if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {
                                    mySQLUtil.mySQLDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());
                                } else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {
                                    mySQLUtil.mySQLInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());
                                } else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {
                                    mySQLUtil.mySQLUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                                } else {
                                    // nothing to do
                                }
                            } catch (SQLException e) {
                                logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);
                            }
                        }

                        if (CANAL_OUTPUT.contains("doris")) {
                            dorisUtil = DorisUtil.getInstance();
                            try {
                                if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {
                                    dorisUtil.dorisDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());
                                } else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {
                                    dorisUtil.dorisInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());
                                } else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {
                                    dorisUtil.dorisUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                                } else {
                                    // nothing to do
                                }
                            } catch (SQLException e) {
                                logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);
                            }
                        }
                    }
                }
            }
        }
    }

    public static boolean isContain(String[] list, String value) {
        if (list == null || value == null) return false;
        for (String lv : list) {
            if (value.trim().equals(lv.trim())) {
                return true;
            }
        }
        return false;
    }

    private static void printColumn(String database, String table, List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            logger.info(database + "-" + table + "-" + column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

}

7,启动 canal 服务端

在 canal 根目录下,执行如下命令

./bin/startup.sh 

8,启动 canal 客户端

因为我用的 jar,所以,启动 jar 包就行了。

9,待完成事项

1,doris 官方文档上有通过 binLog 同步数据到 doris 中的方法,这部分待实现。

2,当前客户端写法单一。一旦canal 服务端重启,应用自动停机。待优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/954845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

惹人喜爱的朋友圈背景图

分享一波可爱喜庆的朋友圈背景图&#xff0c;快来看看有没有你喜欢的吧~ ​

Flutter 安装教程 + 运行教程

1.下载依赖 https://flutter.cn/docs/get-started/install/windows 解压完后根据自己的位置放置&#xff0c;如&#xff08;D:\flutter&#xff09; 注意 请勿将 Flutter 有特殊字符或空格的路径下。 请勿将 Flutter 安装在需要高权限的文件夹内&#xff0c;例如 C:\Program …

秋招刷题推荐

参加了24年秋招&#xff0c;除了常见的LC&#xff0c;还需要重点刷某些大厂的常见题 http://codefun2000.com 优点1: 题目全部改编自公司笔试真题&#xff0c;可以做做往年真题练手。 优点2: 该平台和公司笔试模式一样&#xff0c;都是Acm输入输出&#xff0c;更有利于准备秋招…

【Tkinter系列08/15】小部件(Radiobutton、Scale)

20. 小部件Radiobutton 单选按钮是一组相关的小部件&#xff0c;允许 用户&#xff0c;仅选择一组选项中的一个。每 单选按钮由指示器和标签两部分组成&#xff1a; 指示器是变为红色的菱形部分 在所选项目中。 标签是文本&#xff0c;但您可以使用图像或位图作为标签。 如果…

【Python】从入门到上头— IO编程(8)

文章目录 一.IO编程是什么二.文件读写1.读取文件2.file-like Object二进制文件字符编码 3.写文件file对象的常用函数常见标识符 三.StringIO和BytesIO1.StringIO2.BytesIO 四.操作文件和目录五.序列化和反序列化1.pickle.dumps()2.pickle.loads()3.JSON 一.IO编程是什么 IO在计…

聊聊二叉树的前序遍历算法

二叉树顾名思义&#xff0c;一个根节点只会有两个分叉对应&#xff0c;下图所示&#xff1a; 前序遍历先去拿它的左节点&#xff0c;拿完之后再去拿它左节点相邻的右节点&#xff0c;如数据结构是这样 第一中不考虑性能的方式的话&#xff0c;可以使用递归的方式去给他遍历 fun…

Linux下的系统编程——文件与目录操作(六)

前言&#xff1a; 在Linux系统中&#xff0c;文件和目录的操作非常灵活。用户可以通过命令行或者图形界面来进行操作。下面是Linux文件和目录操作的一些常见操作&#xff1a; 一、文件系统 1.inode 其本质为结构体&#xff0c;存储文件的属性信息。如:权限、类型、大小、时间…

抖音电商,提前批offer!

南京夫子庙茶颜悦色店 摄于2023.8.27 小伙伴们大家好&#xff0c;我是阿秀。 互联网圈有个梗就是"两大码农工厂&#xff1a;南华科、北北邮"&#xff0c;就是说这两所高校的毕业生从事互联网工作的特别多&#xff0c;北邮虽然是211&#xff0c;但在互联网圈子里比很多…

5分钟看懂物料清单(BOM)的用途、类型及管理

管理物料可以提高制造和供应链流程的效率&#xff0c;例如生产、物流、调度、产品成本核算和库存计划。企业通常使用物料清单作为制造产品的组件、材料和流程的中央记录。 物料清单&#xff08;BOM&#xff09;是构建、制造或维修产品或服务所需的原材料、组件和说明的详细列表…

K8S:K8S自动化运维容器Docker集群

文章目录 一.k8s概述1.k8s是什么2.为什么要用K8S3.作用及功能4.k8s容器集群管理系统 二.K8S的特性1.弹性伸缩2.自我修复3.服务发现和复制均衡4.自动发布和回滚5.集中化配置管理和秘钥管理6.存储编排7.任务批量处理运行 三.K8S的集群架构四.K8S的核心组件1.Master组件&#xff0…

水土保持技术教程

详情点击公众号链接&#xff1a;新《生产建设项目水土保持方案技术审查要点》要求下全流程水土保持实践技术应用教程 目标 1、水土保持常用的主要法律法规、部委规章、规范性文件及技术规范与标准&#xff1b; 2、水土保持方案、监测及验收工作开展的流程&#xff1b; 3、水…

同一台电脑测.Net和Mono平台浮点运算的差异

float speed 0.1f;float distance 2.0f;long needTime (long)(distance / speed);Log.Debug($"needTime{needTime}"); 结果&#xff1a; .Net平台算出20 Mono平台算出19

尚硅谷SpringMVC (5-8)

五、域对象共享数据 1、使用ServletAPI向request域对象共享数据 首页&#xff1a; Controller public class TestController {RequestMapping("/")public String index(){return "index";} } <!DOCTYPE html> <html lang"en" xmln…

桌面网络存储迎来新浪潮!龙蜥社区联合龙芯首发优龙桌面网络存储一体机方案

2023 年 8 月 19 日&#xff0c;龙蜥社区合作伙伴单位南京龙众创芯电子科技有限公司(以下简称“龙众创芯“)与龙蜥社区理事单位龙芯中科(武汉)技术有限公司&#xff08;以下简称“龙芯”&#xff09;&#xff0c;联合可道云、上海七朵信息等多家生态伙伴&#xff0c;以及龙芯开…

IntelliJ IDEA快捷键大全 + 动图演示,建议收藏!

本文参考了 IntelliJ IDEA 的官网&#xff0c;列举了IntelliJ IDEA&#xff08;Windows 版&#xff09;的所有快捷键。并在此基础上&#xff0c;为 90% 以上的快捷键提供了动图演示&#xff0c;能够直观的看到操作效果。 该快捷键共分 16 种&#xff0c;可以方便的按各类查找自…

【二维偏序】CF Edu10 D

Problem - D - Codeforces 题意&#xff1a; 思路&#xff1a; Code&#xff1a; #include <bits/stdc.h>#define int long long #define lowbit(x) (x & (-x))using i64 long long;constexpr int N 2e6 10; constexpr int M 2e6 10; constexpr int P 2e6; c…

美国纽扣电池/锂电池产品UL4200A标准解析

近来&#xff0c;部分ANSI/UL标准&#xff08;如UL1082、UL982、UL1026、UL1081等&#xff09;对含有纽扣锂电池的产品新增了UL4200A的要求。对于具体生效日期&#xff0c;请注意后续各终端产品标准新版本更新通告。 1. 适用产品 安装有直径 ≤32mm&#xff0c;且直径大于高度的…

如何在不重新安装的情况下将操作系统迁移到新硬盘?

通常情况下&#xff0c;当你的硬盘损坏或文件过多时&#xff0c;电脑会变得缓慢且卡顿。这时&#xff0c;你可能会被建议更换为一块更好的新硬盘。 ​ 在比较HDD和SSD之后&#xff0c;许多用户更愿意选择SSD作为他们的新硬盘&#xff0c;因为SSD比HDD更稳定且运行更安…

平面设计师都在用的6个免费素材网站

常见的设计素材网站太多了&#xff0c;不是要会员就是要花钱买&#xff0c;今天给大家推荐几个可以免费下载的设计素材网站&#xff0c;有需要的朋友赶紧马住了。 1、菜鸟图库 菜鸟图库-免费设计素材下载菜鸟图库汇集了各种免费高清广告图片设计、电商淘宝、企业办公模板、视频…

python爬取bilibili,下载视频

一. 内容简介 python爬取bilibili&#xff0c;下载视频 二. 软件环境 2.1vsCode 2.2Anaconda version: conda 22.9.0 2.3代码 链接&#xff1a;https://pan.baidu.com/s/1WuXTso_iltLlnrLffi1kYQ?pwd1234 三.主要流程 3.1 下载单个视频 代码 import requests impor…