centos7部署Canal与Canal集成使用

news2025/1/20 1:43:55

在这里插入图片描述

1、简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x 、5.5.x 、5.6.x 、 5.7.x 、8.0.x

2、工作原理
2.1、MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2.2、canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
3、部署
3.1、准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.2、启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

解压缩

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1

如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

关闭

sh bin/stop.sh
4、安装canal-admin

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

4.1、准备

canal-admin的限定依赖:

MySQL,用于存储配置和节点等相关数据
canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

4.2、部署

下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压缩

mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bin
drwxr-xr-x   8 agapple  staff   272B  8 31 15:37 conf
drwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 lib
drwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs

配置修改

vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

初始化元数据库

mysql -h127.1 -uroot -p

导入初始化SQL

> source conf/canal_manager.sql

a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

启动

sh bin/startup.sh

查看 admin 日志

vi logs/admin.log
2019-08-31 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
在这里插入图片描述

关闭

sh bin/stop.sh

canal-server端配置
使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

启动admin-server即可。

或在启动命令中使用参数:sh bin/startup.sh local 指定配置

5、springboot项目集成使用

依赖配置:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
5.1、创建mvn标准工程:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

maven3.0.5以上版本舍弃了create,使用generate生成项目

mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
5.2、修改pom.xml,添加依赖
5.3、ClientSample代码
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}
5.4、运行Client

首先启动Canal Server,

启动Canal Client后,可以从控制台从看到类似消息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

此时代表当前数据库无变更数据

  1. 触发数据库变更
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以从控制台中看到:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1187012.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目实战:中央控制器实现(2)-优化Controller,将共性动作抽取到中央控制器

1、FruitController FruitController已经和Web没有关系了&#xff0c;和Web容器解耦&#xff0c;可以脱离Web容器做单元测试 package com.csdn.fruit.controller; import com.csdn.fruit.dto.PageInfo; import com.csdn.fruit.dto.PageQueryParam; import com.csdn.fruit.dto.R…

Linux C基础(7)

1、二维数组 1.1 概念 本质&#xff1a;元素为一堆数组的数组&#xff08;数组的数组&#xff09;数组的特点&#xff1a;&#xff08;1&#xff09;数据类型相同 &#xff08;2&#xff09;地址连续 1.2 定义 数组&#xff1a;存储类型 数据类型 数组名[元素个数]二维数组&…

TinyEngine 开源低代码引擎首次直播答疑QA合集

前言 10月27日晚8点&#xff0c;OpenTiny 社区开启了 TinyEngine 开源低代码引擎首次答疑直播&#xff0c;本次直播我们通过收集开发者诉求&#xff0c;精心策划和组织了内容&#xff0c;希望提供给大家最明确和清晰的答疑方式。这是 TinyEngine 低代码引擎直播计划的开端&…

什么是数字化管理?产业园区如何进行数字化管理?

工业园区的数字化管理涉及利用技术和数据驱动的工具来优化工业园区环境中的运营、提高效率并改进决策流程。它通常包括使用各种数字技术和数据分析技术来监视、控制和增强公园运营的各个方面。 以下是工业园区数字化管理的一些关键方面以及如何实施&#xff1a; 1.数据收集和…

vue3怎么获取el-form的元素节点

在元素中使用ref设置名称 在ts中通过从element-plus引入formInstance,设置formRef同名名称字段来获取el-form节点

酷开科技持续推动智能投影行业创新发展

近年来&#xff0c;投影仪逐渐成为年轻人追捧的家居时尚单品。据国际数据公司&#xff08;IDC&#xff09;报告显示&#xff0c;2022年中国投影机市场总出货量505万台&#xff0c;超80%为家用投影仪。相比于电视&#xff0c;投影仪外观小巧、屏幕大小可调节&#xff0c;无论是卧…

C#中基于.NET6的动态编译技术

前几天要解决动态计算问题&#xff0c;尝试着使用了不同的方法。问题是给定一个包含计算的字符串&#xff0c;在程序运行中得到计算结果&#xff0c;当时考虑了动态编译&#xff0c;在网上查了一些资料完成了这项功能&#xff0c;可是基于不同的.NET平台使用的编程代码相差比较…

sparksql明明插入了但是表里数据是null

现象 将数据插入表的时候&#xff0c;表里的数据是null 代码 原因 建表语句的时候detail字段的类型写成了bigint&#xff0c;而要插入的数据类型是string&#xff0c;所以把建表语句的字段类型改了然后sql文件重跑就解决了

动态轮换住宅代理是什么?为何需要使用它?

随着越来越多的企业完善网络活动&#xff0c;IP代理的重要性变得显而易见。代理可确保顺利、安全且不受限制地访问互联网的大量资源。在不同类型的代理中&#xff0c;轮换代理脱颖而出&#xff0c;那么他哪里有别于其他IP代理呢&#xff1f; 一、什么是动态轮换代理&#xff1f…

Navicat的使用--mysql

表关系 数据库的操作&#xff0c;表字段的设计&#xff0c;一般都由于图形化界面工具Navicat完成。 而表中数据的增删改查&#xff0c;需要熟悉sql语句。 一对一 一对一&#xff1a;一个A对应一个B&#xff0c;一个B对应一个A 将A或B任意一张表的主键设置为外键 一对多 一…

高性能网络编程 - The C10M problem

文章目录 Pre概述回顾C10K实现C10M的挑战思路总结 Pre 高性能网络编程 - The C10K problem 以及 网络编程技术角度的解决思路 概述 在接下来的10年里&#xff0c;因为IPv6协议下每个服务器的潜在连接数都是数以百万级的&#xff0c;单机服务器处理数百万的并发连接&#xff0…

SpringDataJpa(一)

一、JPA概述 1.1 ORM概述 ORM&#xff08;Object-Relational Mapping&#xff09; 表示对象关系映射。在面向对象的软件开发中&#xff0c;通过ORM&#xff0c;就可以把对象映射到关系型数据库中。只要有一套程序能够做到建立对象与数据库的关联&#xff0c;操作对象就可以直…

【自然语言处理】利用python创建简单的聊天系统

一&#xff0c;实现原理 代码设计了一个简单的客户端-服务器聊天应用程序&#xff0c;建立了两个脚本文件&#xff08;.py文件)&#xff0c;其中有一个客户端和一个服务器端。客户端和服务器之间通过网络连接进行通信&#xff0c;客户端发送消息&#xff0c;服务器端接收消息并…

工业相机基本知识理解:帧率、带宽(数据接口)、图像数据格式

1、帧率&#xff1a;Frame Per Second&#xff0c;单位fps&#xff0c;每秒采集的图像数量 2、带宽&#xff1a;一般单位用Gbps&#xff0c;每秒能传输的Gbit数据量 Gige&#xff1a;千兆网&#xff0c;带宽1Gbps USB3.0&#xff1a;带宽5Gbps&#xff0c;一般U3V工业相机用到3…

Redis笔记 Redis主从同步

文章目录 Redis主从搭建主从架构主从数据同步原理全量同步增量同步repl_backlog原理 主从同步优化小结 Redis主从 搭建主从架构 单节点Redis的并发能力是有上限的&#xff0c;要进一步提高Redis的并发能力&#xff0c;就需要搭建主从集群&#xff0c;实现读写分离。 主从数据…

10 # 手写 every 方法

every 使用 every() 方法测试一个数组内的所有元素是否都能通过指定函数的测试。它返回一个布尔值。 ele&#xff1a;表示数组中的每一个元素index&#xff1a;表示数据中元素的索引array&#xff1a;表示数组 <script>var arr [1, 3, 5, 7, 8];var result arr.ever…

网络通信——与Socket交换数据(三十一)

1. 与Socket交换数据 1.1 知识点 &#xff08;1&#xff09;通过Android与Socket完成基本的Echo程序实现&#xff1b; &#xff08;2&#xff09;通过对象序列化进行大数据的传输&#xff1b; 1.2 具体内容 对于网络的开发而言&#xff0c;最常使用的交互模式&#xff1a;W…

ZZ308 物联网应用与服务赛题第F套

2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 &#xff08;F卷&#xff09; 赛位号&#xff1a;______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等&#xff1b; 2.竞赛任务中所使用…

【MySQL篇】数据库角色

前言 数据库角色是被命名的一组与数据库操作相关的权限&#xff0c;角色是权限的集合。因此&#xff0c;可以为一组具有相同权限的用户创建一个角色&#xff0c;使用角色来管理数据库权限可以简化授权的过程。 CREATE ROLE&#xff1a;创建一个角色 GRANT&#xff1a;给角色授…

个人前端编程技巧总结

目录 1. 让界面位于当前屏幕的中心&#xff08;屏幕中心&#xff09;css代码示例 2. 界面透明但是内部元素不透明&#xff08;毛玻璃&#xff09;css代码示例 3. 将当前界面的参数传递到跳转的目标页面&#xff08;携参跳转&#xff09;js代码 1. 让界面位于当前屏幕的中心&…