基于Canal的数据同步

news2025/1/23 6:02:18

基于Canal的数据同步

一、 系统结构

该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。

  1. canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
  2. CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目

二、. Canal配置

在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。

1) 配置文件路径:canal/conf/canal.properties

在这里插入图片描述

2) 配置文件路径:

canal/conf/example/instance.properties

在这里插入图片描述
在这里插入图片描述

三、 Spring Boot配置

1. 项目结构

在这里插入图片描述

2. Canal账号密码配置

进入到Config下的CanalClient类文件。

注意密码是通过MD5加密的,图中这这段字符应替换为canal。

在这里插入图片描述

3. 目标数据库配置

在yml中配置数据同步目标数据库即可
在这里插入图片描述

源代码:

package com.canal.canalclient.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * @Auther: fzl
 * @Date: 2020/4/20 01:21
 * @Description:
 */
public class CanalClient {

    private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    public static void startCanal() {
        //获取canalServer连接:本机地址,端口号
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址"
                , "端口号"), "example", "canal", "canal");

        int batchSize = 1000;
        try {
            //连接canalServer
            connector.connect();
            //订阅Desctinstion
            connector.subscribe();
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    //轮询拉取数据   上面的where
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        //睡眠
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    System.out.println("aa"+size);
                    //当队列里面堆积的sql大于一定数值的时候就模拟执行
                    if (SQL_QUEUE.size() >= 10) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    public static JdbcTemplate jdbcTemplate;

    /**
     * 模拟执行队列里面的sql语句
     */
    public static void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            jdbcTemplate.execute(sql);
            System.out.println("[sql]----> " + sql);
        }
    }

    /**
     * 数据处理
     *
     * @param entrys
     */
    private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 保存更新语句
     *
     * @param entry
     */
    private static void saveUpdateSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存删除语句
     *
     * @param entry
     */
    private static void saveDeleteSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存插入语句
     *
     * @param entry
     */
    private static void saveInsertSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
}
package com.canal.canalclient;

import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {

    @Autowired
    @Qualifier("test_master_energy") //有多个数据源的,需要名称区分
    private static JdbcTemplate jdbcTemplate;


    @Override
    public void run(ApplicationArguments args)  throws Exception{
        log.info("开始监听同步数据库");

        CanalClient.jdbcTemplate = jdbcTemplate;
        CanalClient.startCanal();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.canal</groupId>
    <artifactId>CanalClient</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CanalClient</name>
    <description>CanalClient</description>
    <properties>
        <java.version>19</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.9</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.32</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 打包时跳过测试 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

    </build>

</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386620.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ASGCN之图卷积网络(GCN)

文章目录前言1. 理论部分1.1 为什么会出现图卷积网络&#xff1f;1.2 图卷积网络的推导过程1.3 图卷积网络的公式2. 代码实现参考资料前言 本文从使用图卷积网络的目的出发&#xff0c;先对图卷积网络的来源与公式做简要介绍&#xff0c;之后通过一个例子来代码实现图卷积网络…

Linux命令行安装Oracle19c教程和踩坑经验

安装 下载 从 Oracle官方下载地址 需要的版本&#xff0c;本次安装是在Linux上使用yum安装&#xff0c;因此下载的是RPM。另外&#xff0c;需要说明的是&#xff0c;Oracle加了锁的下载需要登录用户才能安装&#xff0c;而用户是可以免费注册的&#xff0c;这里不做过多说明。 …

最新使用nvm控制node版本步骤

一、完全卸载已经安装的node、和环境变量 ①、打开控制面板的应用与功能&#xff0c;搜索node&#xff0c;点击卸载 ②、打开环境变量&#xff0c;将node相关的所有配置清除 ③、打开命令行工具&#xff0c;输入node-v&#xff0c;没有版本号则卸载成功 二、下载nvm安装包 ①…

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……

单月涨粉超600w,直播销售额破5亿,2月的黑马都是谁?

2月的抖音&#xff0c;黑马多多&#xff0c;处处有爆点。有直播间热度不减&#xff0c;在带货领域持续位列前茅&#xff1b;有达人通过“抓马式”连麦直播&#xff0c;涨粉657w&#xff1b;有0.01元的低价商品&#xff0c;一天热卖超1000w个。那么&#xff0c;2月有哪些主播表现…

微服务实战03-注册数据服务

EurekaServer &#xff0c;它扮演的角色是注册中心&#xff0c;用于注册各种微服务&#xff0c;以便于其他微服务找到和访问。有了EurekaServer&#xff0c;还需要一些微服务&#xff0c;注册到EurekaServer上去。 这一节&#xff0c;我们来写一个注册微服务。为了简单起见&am…

【同步工具类:Phaser】

同步工具类:Phaser介绍特性动态调整线程个数层次Phaser源码分析state 变量解析构造函数对state变量赋值阻塞方法arrive()awaitAdvance()业务场景实现CountDownLatch功能代码测试结果实现 CyclicBarrier功能代码展示测试结果总结介绍 一个可重复使用的同步屏障&#xff0c;功能…

26- AlexNet和VGG模型分析 (TensorFlow系列) (深度学习)

知识要点 AlexNet 是2012年ISLVRC 2012竞赛的冠军网络。 VGG 在2014年由牛津大学著名研究组 VGG 提出。 一 AlexNet详解 1.1 Alexnet简介 AlexNet 是2012年ISLVRC 2012&#xff08;ImageNet Large Scale Visual Recognition Challenge&#xff09;竞赛的冠军网络&#xff0…

paddle推理部署(cpu)

我没按照官方文档去做&#xff0c;吐槽一下&#xff0c;官方文档有点混乱。。一、概述总结起来&#xff0c;就是用c示例代码&#xff0c;用一个模型做推理。二、示例代码下载https://www.paddlepaddle.org.cn/paddle/paddleinferencehttps://github.com/PaddlePaddle/Paddle-In…

Clion连接Docker,使用HElib库

文章目录需求Clion连接服务器内的DockerDockerCLionDocker内配置HElib库参考需求 HElib库是用C编写的同态加密开源库&#xff0c;一般在Linux下使用为了不混淆生产环境&#xff0c;使用Docker搭建HElib运行环境本地在Windows下开发&#xff0c;使用的IDE为Clion&#xff0c;本…

动态规划:leetcode 121. 买卖股票的最佳时机、122. 买卖股票的最佳时机II

leetcode 121. 买卖股票的最佳时机leetcode 122.买卖股票的最佳时机IIleetcode 121. 买卖股票的最佳时机给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日…

node版本管理工具nvm

1.标题卸载nvm和node.js 系统变量中删除nvm添加变量&#xff1a;NVM_HOME和NVM_SYMLINK环境变量中 path&#xff1a;删除nvm自动添加的变量 Path %NVM_HOME%;%NVM_SYMLINK%删除自身安装node环境&#xff0c;参考图一图二 图一 图二 2.安装nvm nvm-window下载------https:/…

ES window 系统环境下连接问题

环境问题&#xff1a;&#xff08;我采用的版本是 elasticsearch-7.9.3&#xff09;注意 开始修正之前的配置&#xff1a;前提&#xff1a;elasticsearch.yml增加或者修正一下配置&#xff1a;xpack.security.enabled: truexpack.license.self_generated.type: basicxpack.secu…

对象实例化【JVM】

JVM对象实例化简介/背景一、创建对象的方式1. new2. Class对象的newInstance方法3. Construstor对象的newInstance(xx)方法4. 使用clone方法二、创建对象的步骤1. 判断对象是否已经加载、链接、初始化2. 为对象分配内存3. 处理并发安全问题4. 初始化分配到的空间5. 设置对象的对…

Tech Lead如何引导团队成员解决问题?

作为一个开发团队的Tech Lead&#xff0c;当团队成员向你寻求帮助时&#xff0c;你有没有说过下面这些话&#xff1f; 你别管了&#xff0c;我来解决这个问题你只要。。。就行了你先做其他的吧&#xff0c;我研究一下&#xff0c;然后告诉你怎么做 当我们说这些话时&#xff…

腾讯免费企业邮箱迁移记录

本文记录在重新申请腾讯企业邮箱的过程。 背景 很多年前&#xff0c;将域名latelee.org 迁移到了阿里云&#xff0c;当时因政策原因无法实名&#xff0c;但能使用。去年3月&#xff0c;阿里云提示无法续费&#xff0c;紧急将其转到外面某服务&#xff0c;继续使用&#xff0c;…

IP地址的工作原理

如果您想了解特定设备为何未按预期方式进行连接&#xff0c;或者想要排查网络无法正常工作的可能原因&#xff0c;它可以帮助您了解 IP 地址的工作原理。互联网协议的工作原理与任何其他语言相同&#xff0c;即使用设定的准则进行通信以传递信息。所有设备都使用此协议与其他连…

jq获取同级或者下级的dom节点的操作

1.使用find找到对应的class或者其他 var class_dom1 obj.find(.class名称);或者 find(span .class名称)2.使用添加背景颜色来确定当前的查找位置 class_dom1.css(background,red);3.通过parent来找到它的上级的dom节点 var parent_li_dom1 class_dom1.parent(li.parent_li…

进阶指针——(2)

本次讲解重点&#xff1a; 6. 函数指针数组 7. 指向函数指针数组的指针 8. 回调函数 在前面我们已经讲解了进阶指针的一部分&#xff0c;我们回顾一下在进阶指针(1)我们学过的难点知识点&#xff1a; int my_strlen(const char* str) {return 0; }int main() {//指针数…

创宇盾重保经验分享,看政府、央企如何防护?

三月重保已经迫近&#xff0c;留给我们的准备时间越来越少&#xff0c;综合近两年三月重保经验及数据总结&#xff0c;知道创宇用实际案例的防护效果说话&#xff0c;深入解析为何创宇盾可以在历次重保中保持“零事故”成绩&#xff0c;受到众多部委、政府、央企/国企客户的青睐…