Canal数据同步配置

news2024/11/14 6:51:39

文章目录

    • Canal数据同步配置
      • 0.canal工作原理
      • 1.**检查binlog功能是否有开启**
      • 2.如果显示状态为OFF表示该功能未开启,开启binlog功能
      • 3.**在mysql里面添加以下的相关用户和权限**
      • 4.下载安装Canal服务
      • 5.修改配置文件
      • 6.进入bin目录下启动
      • 7.idea中配置

Canal数据同步配置

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

0.canal工作原理

在这里插入图片描述

原理相对比较简单:

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)

1.检查binlog功能是否有开启

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF    |
+---------------+-------+
1 row in set (0.00 sec)

2.如果显示状态为OFF表示该功能未开启,开启binlog功能

1,修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf    
这里使用的是docker安装的mysql,my.cnf已经挂载到了本机的 /mydata/mysql/conf/my.cnf
追加内容:
log-bin=mysql-bin     #binlog文件名
binlog_format=ROW     #选择row模式
server_id=1           #mysql实例id,不能和canal的slaveId重复

2,重启 mysql:
service mysql restart	

3,登录 mysql 客户端,查看 log_bin 变量
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON|
+---------------+-------+
1 row in set (0.00 sec)
————————————————
如果显示状态为ON表示该功能已开启

3.在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

使用docker操作,先启动mysql服务,启动mysql后,如果想进入mysql的命令行,执行如下命令

[root@jack conf]# docker exec -it mysql_3307 bash  //mysql_3307是我启动的mysql服务的name
root@96d65412b303:/# mysql -uroot -p密码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VRPMkUOz-1677753459490)(../.assets/images/image-20220511205849713.png)]

会在mysql的mysql数据库的user表中插入上图中的canal用户

4.下载安装Canal服务

下载地址:

https://github.com/alibaba/canal/releases

(1)下载之后,放到目录中,解压文件

[root@jack opt]# cd /opt/
[root@jack opt]# ls
canal.deployer-1.1.5.tar.gz  containerd  redis-5.0.7.tar.gz

[root@jack opt]# tar -zxvf canal.deployer-1.1.5.tar.gz 

5.修改配置文件

vi conf/example/instance.properties
#需要改成自己的数据库信息
canal.instance.master.address=192.168.44.132:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的数据库表规则,例如只是同步一下表
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=guli_ucenter.ucenter_member  #这个可以不设置

6.进入bin目录下启动

sh bin/startup.sh

7.idea中配置

1.引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.12</version>
        </dependency>

    </dependencies>

2.配置类

package jack.java.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {

    //sql队列
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * canal入库方法
     */
    public void run() {

        //写远程的数据库地址
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.137.10",
//        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("47.97.99.28",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    //当队列里面堆积的sql大于一定数值的时候就模拟执行
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 模拟执行队列里面的sql语句
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);

            this.execute(sql.toString());
        }
    }

    /**
     * 数据处理
     *
     * @param entrys
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 保存更新语句
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存删除语句
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存插入语句
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 入库
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: "+ row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}

3.启动类

package jack.java.canal;

import jack.java.canal.client.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {
    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        //项目启动,执行canal客户端监听
        canalClient.run();
    }
}

4.properties配置文件

# 服务端口
server.port=8100
# 服务名
spring.application.name=canal-client

# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli_edu?serverTimezone=GMT%2B8&useSSL=false
spring.datasource.username=root
spring.datasource.password=971107

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382589.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记录第一次接口上线过程

新入职一家公司后&#xff0c;前三天一直在学习公司内部各种制度文化以及考试。 一直到第三天组长突然叫我过去&#xff0c;给了一个需求的思维导图&#xff0c;按照这个需求写这样一个接口&#xff0c; 其实还不错&#xff0c;不用自己去分析需求&#xff0c;按照这上面直接开…

工业机器人有哪些类型?如何利用工业网关集中监测管理?

工业机器人在制造业中的应用与日俱增&#xff0c;使用工业机器人&#xff0c;不仅提高了设备和场地的利用率&#xff0c;还能保持稳定的产品水平。随着工业机器人的大规模部署&#xff0c;对于数量众多、类型各异、功能不一的机器人的监测、管理和维护&#xff0c;也成为企业面…

Java 异常

文章目录1. 异常概述2. JVM 的默认处理方案3. 异常处理之 try...catch4. Throwable 的成员方法5. 编译异常和运行异常的区别6. 异常处理之 throws7. 自定义异常8. throws 和 throw 的区别1. 异常概述 异常就是程序出现了不正常的情况。 ① Error&#xff1a;严重问题&#xff…

Nessus: 漏洞扫描器-网络取证工具

Nessue 要理解网络漏洞攻击&#xff0c;应该理解攻击者不是单独攻击&#xff0c;而是组合攻击。因此&#xff0c;本文介绍了关于Nessus历史的研究&#xff0c;它是什么以及它如何与插件一起工作。研究了Nessus的特点&#xff0c;使其成为网络取证中非常推荐的网络漏洞扫描工具…

maven高级知识。

目录 一、分模块开发 1、分模块开发设计 2、依赖管理 二、继承和聚合 1、聚合 2、继承 三、属性 1、基本介绍 2、版本管理 四、多环境配置与应用 1、多环境开发 2、跳过测试 五、私服 1、私服安装 2、私服仓库分类 一、分模块开发 1、分模块开发设计 ▶ 示意图 …

【测绘程序设计】——计算卫星位置

本文分享了根据广播星历计算卫星于瞬时地固系下位置的计算程序(C#版)(注:瞬时地球坐标系坐标经极移改正即可获得协议地球坐标系坐标),相关源代码(完整工程,直接运行;包含实验数据)及使用示例如下。 目录 Part.Ⅰ 使用示例Part.Ⅱ 代码分析Chap.Ⅰ 数据结构Chap.Ⅱ 计…

原生javascript手写一个丝滑的轮播图

通过本文&#xff0c;你将学到: htmlcssjs 没错&#xff0c;就是html&#xff0c;css,js,现在是框架盛行的时代&#xff0c;所以很少会有人在意原生三件套&#xff0c;通过本文实现一个丝滑的轮播图&#xff0c;带你重温html,css和js基础知识。 为什么选用轮播图做示例&…

网络运维和网络安全运维有什么区别?就业前景如何?

随着互联网的高速发展&#xff0c;运维安全已经成了大多数企业安全保障的基石。在如今的信息时代&#xff0c;无论是网络运维还是网络安全运维都成了不可缺少的一部分。因此导致很多人都容易把两者弄混淆。首先我们来了解一下网络运维和网络安全运维有什么区别呢&#xff1f;网…

Linux vi/vim教程

所有的 Unix Like 系统都会内建 vi 文本编辑器&#xff0c;其他的文本编辑器则不一定会存在。 但是目前我们使用比较多的是 vim 编辑器。 vim 具有程序编辑的能力&#xff0c;可以主动的以字体颜色辨别语法的正确性&#xff0c;方便程序设计。 ** 什么是 vim&#xff1f;** Vim…

将vue-devtools打包成edge插件

文章目录一、从github拉vue-devtools源码二、用npm安装yarn三、使用yarn安装并编译源码四、将vue-devtools打包成edge插件五、离线安装edge插件一、从github拉vue-devtools源码 目前最新的版本是v6.5.0&#xff0c;地址&#xff1a;https://github.com/vuejs/devtools 二、用n…

第四阶段17-关于Redis中的list类型,缓存预热,关于Mybatis中的`#{}`和`${}`这2种格式的占位符

关于Redis中的list类型 Redis中的list是一种先进后出、后进先出的栈结构的数据。 在使用Redis时&#xff0c;应该将list想像为以上图例中翻转了90度的样子&#xff0c;例如&#xff1a; 在Redis中的list数据&#xff0c;不仅可以从左侧压入&#xff0c;也可以选择从右侧压入…

Linux04-冯诺依曼体系结构||操作系统||进程概念

1.认识冯诺依曼系统 1.1冯诺依曼体系结构 我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系。 截至目前&#xff0c;我们所认识的计算机&#xff0c;都是有一个个的硬件组件组成 输入单元&#xff1a;包括…

Linux: 中断只被GIC转发到CPU0问题分析

文章目录1. 前言2. 分析背景3. 问题4. 分析4.1 ARM GIC 中断芯片简介4.1.1 中断类型和分布4.1.2 拓扑结构4.2 问题根因4.2.1 设置GIC SPI 中断CPU亲和性4.2.2 GIC初始化&#xff1a;缺省的CPU亲和性4.2.2.1 boot CPU亲和性初始化流程4.2.2.1 其它非 boot CPU亲和性初始化流程5.…

【蓝桥杯入门不入土】变幻莫测的链表

文章目录一&#xff1a;链表的类型单链表双链表循环链表二&#xff1a;链表的存储方式三&#xff1a;链表的定义删除节点添加节点四&#xff1a;实战练习1.设计链表2. 移除链表元素最后说一句一&#xff1a;链表的类型 单链表 什么是链表&#xff0c;链表是一种通过指针串联在…

最经典的黑客技术入门知识大全

第一节、什么是黑客 以我的理解&#xff0c;“黑客”大体上应该分为“正”、“邪”两类&#xff0c;正派黑客依靠自己掌握的知识帮助系统管理员找出系统中的漏洞并加以完善&#xff0c;而邪派黑客则是通过各种黑客技能对系统进行攻击、入侵或者做其他一些有害于网络的事情&…

LCR数字电桥软件下载安装教程

软件&#xff1a;LCR数字电桥软件NS-LCR 语言&#xff1a;简体中文 环境&#xff1a;NI-VISA 安装环境&#xff1a;Win10以上版本&#xff08;特殊需求请后台私信联系客服&#xff09; 硬件要求&#xff1a;CPU2GHz 内存4G(或更高&#xff09;硬盘500G(或更高&#xff09; …

ProcDump+Mimikatz绕过杀毒软件抓密码

抓密码的奇淫技巧背景1、原理2、实操演示背景 如果主机上装了杀毒软件&#xff0c;开了防火墙&#xff0c;例如&#xff1a;360、火绒之类的话。Mimikatz就会被检测为病毒&#xff0c;无法使用&#xff0c;而想要对Mimikatz进行二次开发或者免杀难度会比较大&#xff0c;那有办…

边缘计算开源项目解读——kubeedge mappers实现

0 背景 本文重点解读kubeedge项目中的mapper模块。该模块位于kubeedge的edgecore的南向边缘侧&#xff0c;主要对接入kubeedge的终端设备&#xff0c;进行协议的适配和转换&#xff0c;使其可以和边缘设备通信&#xff0c;转换后的协议是我们前面描述的mqtt协议&#xff0c;当然…

玩转ThreadLocal

前言 ThreadLocal想必都不陌生&#xff0c;当多线程访问同一个共享变量时&#xff0c;就容易出现并发问题&#xff0c;为了保证线程安全&#xff0c;我们需要对共享变量进行同步加锁&#xff0c;但这又带来了性能消耗以及使用者的负担&#xff0c;那么有没有可能当我们创建一个…

只知道删除单张表的数据?不知道删除多张表的数据?

一些废话 可能在某某一天&#xff0c;你在表删除表数据的时候&#xff0c;不想一张表一张表的去删除&#xff0c;想把两个表的数据同时删除&#xff1b;然后你就会去搜索&#xff0c;然后你就很有很有很有很有可能会看到 me 的这篇优质&#xff08;呸&#xff01;&#xff01;…