canal-server使用

news2025/1/25 9:03:24

canal是什么

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
这句介绍有几个关键字:增量日志,增量数据订阅和消费。

这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。

我们看一张官网提供的示意图:

在这里插入图片描述

canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

说明

本文档介绍了使用docker部署基于数据库canal-admin管理server
server推送消息到kafka

基本概念

集群

对应一组canal-server,组合在一起面向高可用HA的运维
启动canal-server时指定同一个cluster名称, 既可组成一个集群

canal-server

一个canal-server伪装自己为数据库master的slave, 用于接收binlog日志以及解析binlog

canal-instace

instance为canal-server的一个最小的订阅mysql的队列
每个canal-server下可以配置多个instace

canal-admin

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

如果使用了canal-admin, 则可以大大的方便canal-server的而管理, 就可以不用在各个机器上维护server的各种配置.

启动canal-server时, 仅需要保留基本的配置即可(指定canal-admin的地址以及账号/密码即可)

准备工作

  1. 数据库需要先开启binlog写入功能, 并且配置binlog-formatrow
  2. 创建canal访问MySQL账号, 并授权(读取,slave,replication权限)
-- 创建账号
CREATE USER canal IDENTIFIED BY 'canal'; 
-- 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

安装canal-admin

可参考官网地址:
https://github.com/alibaba/canal/wiki/Canal-Admin-Docker

初始化数据库

canal-admin的管理数据存储在数据库中, 因此我们需要先初始化数据库

下载数据库初始化文件: https://raw.githubusercontent.com/alibaba/canal/master/canal-admin/canal-admin-server/src/main/resources/canal_manager.sql

然后使用有数据库创建权限的账号执行, 我们可以看到一个canal-manager

下载canal-admin启动文件

docker目录下自带了一个run_admin.sh脚本, 通过以下目录下载

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run_admin.sh 

运行admin

这里, 我们运行一个通过数据库管理的canal-admin, 命令如下:

sudo sh  run_admin.sh -e server.port=18089 \
         -e canal.adminUser=admin \
         -e canal.adminPasswd=admin \
         -e spring.datasource.address=gz-cdb-maaacf.sql.tencentcdb.com:50060 \
         -e spring.datasource.database=canal_manager \
         -e spring.datasource.username=root \
         -e spring.datasource.password=123456

通过命令sudo docker logs canal-admin查看启动结果, 成功显示:

在这里插入图片描述

运行成功后, 输入localhost:18089 即可进入管理登录页,默认账号admin/123456

注意, -e可以指定的参数可以配置canal-adminapplication.yaml中所有的key value配置项

application.yaml参数说明(注, 以下模板的值为默认值):

server:
  # admin的服务端口
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

# 管理端数据库配置
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  # hikari数据库连接池配置(spring的插件)
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

# canal-admin的账号和密码(用于canal-server与admin通讯时校验)
# 注意: 这个不是canal-admin ui的登录账号
canal:
  adminUser: admin
  adminPasswd: admin

创建集群

进去到canal-admin的管理页后, 点击集群管理创建一个集群:
在这里插入图片描述

zk地址: zookeeper的地址, 用于管理集群节点(由于在同一台机器,可以填内网地址)

创建服务配置

如果用到集群, 在启动canal-server之前, 必须先配置好集群的服务主配置 , 启动的服务通过加载该配置来启动, 如果没有正确配置, canal-server启动会直接报错.

在这里插入图片描述

此处以写入kafka为例, 仅保留必要参数, 其他参数使用默认即可(为了清晰修改了哪些配置,默认配置已被移除):

#################################################
######### 		common argument		#############
#################################################
# zookeeper集群地址,  如果有多个地址用,号隔开
canal.zkServers = 172.16.0.17:2181

# tcp, kafka, rocketMQ, rabbitMQ
# 其中tcp为canal-server自己内部维护的一个消息队列
canal.serverMode = kafka

# 表示加载instance的方式是通过admin管理端
canal.instance.global.mode = manager
# instance消费记录模式
# 记录在内存, server重新启动会重新开始消费binlog
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
# 记录再文件中, 服务销毁(删除)会丢失
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# 记录再zookeepr中,默认的模式
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     Kafka 		     #############
##################################################
# kafka地址, 如果有多个用,号隔开
kafka.bootstrap.servers = 1.14.16.25:9092

启动canal-server

下载canal-server启动文件

docker目录下自带了一个run.sh脚本, 通过以下目录下载

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

指定admin运行canal-server

sudo sh run.sh -e canal.admin.manager=127.0.0.1:18089 \
         -e canal.admin.port=11110 \
         -e canal.admin.user=admin \
         -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
         -e canal.admin.register.auto=true \
         -e canal.admin.register.cluster=test-cluster

实际生成的命令如下:

docker run -d -it -h 172.16.0.17 \
-e canal.admin.manager=127.0.0.1:18089 \
-e canal.admin.port=11120 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 
-e canal.admin.register.auto=true \
-e canal.admin.register.cluster=test-cluster \
--name=canal-server \
--net=host -m 4096m canal/canal-server

通过命令sudo docker logs canal-server查看启动结果, 成功显示:

在这里插入图片描述

注意, -e可以指定的参数可以配置canal-servercanal_local.properties中所有的key value配置项

canal_local.properties 参数说明:

# register ip
# 注册到zk的ip, 如果不指定使用当前容器ip
canal.register.ip =

# canal admin config
# manager的地址
canal.admin.manager = 127.0.0.1:18089
# 当前服务的管理端口
canal.admin.port = 11110
# admin的账号, 必须与canal-admin中配置的canal.adminUser保持一致
canal.admin.user = admin
# admin的密码, canal.adminPasswd的密码密文形式
# 通过数据库canal_manager执行`select password("明文")` 查询出结果(去掉开头的*)
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# admin auto register
# 是否自动注册
canal.admin.register.auto = true
# 注册的集群名称
canal.admin.register.cluster = my-cluster
# 注册到集群后显示的名称, 如果不指定则显示为机器ip:prot形式
canal.admin.register.name =

Tips:
这里的canal.admin.user与canal.admin.passwd是canal-admin中配置的账号和密码, 并不是ui的登录账号和密码.

我之前一直以为是登录账号和密码, 服务一直报错, 找了很久都没有找到原因.

配置instance

在配置instance前,确保准备工作章节已经完成

在这里插入图片描述

在管理页面添加instance, 此处模板保留必要参数, 其他参数按实际业务优化(为了清晰修改了哪些配置,默认配置已被移除):

# enable gtid use true/false
# 是否启用gtid来区分同步的位置, 默认false
canal.instance.gtidon=false

# position info, 指定同步的数据库以及初始要同步的位置
# 数据库master地址
canal.instance.master.address=gz-cdb-maaacf.sql.tencentcdb.com:50060

# 同步方式1,使用binlog文件标识要同步的位置
# 要同步的binlog的文件名称,如果不指定从最后一个binlog开始同步
canal.instance.master.journal.name=
# binlog同步的位置, 配置 journal.name使用
canal.instance.master.position=
# 从指定时间戳开始请求binlog
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 同步方式2, 使用gtid标识要同步的位置
canal.instance.master.gtid=

# username/password
# 数据库的账号和密码,必须具备 replication 权限
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal@123
canal.instance.connectionCharset = UTF-8

# table regex
# 要匹配的数据表,正则(这里写死指定的数据表), 注意 . 是正则的关键字,所以需要加\\斜杠(两个)
canal.instance.filter.regex=zhubaoe.zby_twz_canal_test
# canal.instance.filter.regex=.*\\..*
# table black regex
# 黑名单,指定不匹配的表
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
# 表字段过滤,canal仅记录这些字段数据, 一般对于大表,最好指定该配置,防止fakfa写入太多的数据
# 实际使用中,可以仅仅指定主键id以及业务需要用到字段
# 注意,如果配置了该项,必须要加上主键id,否则框架消费者读取数据时无法关联同一行记录的新旧数据变化
# canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
canal.instance.filter.field=zhubaoe.zby_twz_canal_test:id/name
# 表字段黑名单, 不监听这些字段的变化
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
# canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# 写入消息队列的topic(默认主题,必须配置)
canal.mq.topic=canal-test
# 针对库名或者表名发送动态topic
# canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

完整的参数以及说明, 可以参看官方文档: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

kafka数据内容

解析binlog后, canal推送到kafka的消息如下

{
    "id":465,
    "table":"zby_twz_canal_test",
    "pkNames":[
        "id"
    ],
    "sqlType":{
        "id":4,
        "name":12,
        "price":3
    },
    "mysqlType":{
        "id":"int(11)",
        "name":"varchar(255)",
        "price":"decimal(10,2)"
    },
    "sql":"",
    "type":"UPDATE",
    "isDdl":false,
    "data":[
        {
            "id":"4",
            "name":"昭君2",
            "price":null
        }
    ],
    "old":[
        {
            "name":"昭君3"
        }
    ],
    "es":1653363454000,
    "ts":1653363454386
}

字段说明

字段类型说明
idintcanal内部维护的消息id
tablestring变更的数据表
pkNamesstring[]主键
sqlTypemap[string]int字段类型(主类型)
mysqlTypemap[string]string具体的类型, 会带上长度
sqlstring原始的sql
typestring操作类型,值为: INSERT UPDATE DELETE QUERY
isDdlbool是否为ddl操作
data[]map[string]string当前变更的数据内容, 注意这里是数组, 因为如果是同一个sql更新影响的多行数据, 会推送到同一个消息中
old[]map[string]string旧数据, 只会显示变更了数据的字段
estimestamp数据库中的数据变更时间
tstimestampcanal的数据同步时间

Tips: data中记录的内容, 字段根据instance的canal.instance.filter.field配置(如果不指定,显示整个表所有的字段, 一般不建议).

错误排查

canal-server启动后, 瞬间就会退出

首先, 通过 docker inspect canal-server查询容器信息, 我们可以看到挂在目录

"GraphDriver": {
            "Data": {
                "LowerDir": "/data/docker/overlay2/fcb0ac8653cc3bfad55456436a608e6ad8e998c52ca83367398895b43fde3680-init/diff:/data/docker/overlay2/540d4fee0dba2fcf581bb47c0caee7e7077d023c18390b94770553cfbd961055/diff:/data/docker/overlay2/4dc443dca60a09cb7827381790d28a131b098f13ebb047dc7330097484b053e1/diff:/data/docker/overlay2/b04b0831a813f3d0fdb098f86a8e5ef8096bb078e1e760177af76e336c7c84df/diff:/data/docker/overlay2/800e584ff2c268c71eecfaab52e6a67b17c20c09aa8b0d63a7b7b55d168daec5/diff:/data/docker/overlay2/499d4fb694d39227ea5af64375c6faab41fb6c8ee8cd9e1d2440fe52da0fe25f/diff:/data/docker/overlay2/686c803859ec7513865c7517616526dfd473547ec8ea2b64978aa06e2f1cb07a/diff:/data/docker/overlay2/667b963828ad6e6fce66cb7c2f0ddb09f3bf8404df3e959a0b1eff560be70af2/diff",
                "MergedDir": "/data/docker/overlay2/fcb0ac8653cc3bfad55456436a608e6ad8e998c52ca83367398895b43fde3680/merged",
                "UpperDir": "/data/docker/overlay2/fcb0ac8653cc3bfad55456436a608e6ad8e998c52ca83367398895b43fde3680/diff",
                "WorkDir": "/data/docker/overlay2/fcb0ac8653cc3bfad55456436a608e6ad8e998c52ca83367398895b43fde3680/work"
            },
            "Name": "overlay2"
        },

部署过程问题总结

  1. canal-server一直无法启动, 查看日志显示 admin:auth error
    原因: canal-server配置的canal.admin.user canal.admin.passwd参数, 账号密码使用错误(使用了canal-admin ui的登录账号和密码)
  2. 部署的admin当天登录正常, 隔天登录后台就显示 network error
    原因: 使用run_admin.sh启动canal-admin, 其限定了容器的内存为1G(参数 -m 1024), 因此服务启动后内存过大导致服务自动关闭.

查看日志方式:

docker inspect canal-server
less /data/docker/overlay2/43d902280348af49d2f6e86443e9d091c737eda803d26cc33a29e4ae683c37d4/diff/home/admin/canal-server/logs/canal/canal.log 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/46471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

10月11日

gitlab 用https网址拉取项目的时候,出现: error setting certificate verify locations: CAfile: D:/Git/Git/mingw64/ssl/certs/ca-bundle.crt CApath: none原因是: 修改为自己git所在的文件夹即可(原因是我是从另外一台电脑直…

AndroidStudio连接真机测试运行

文章目录准备步骤1、打开要连接的手机2、配置AndroidStudio3、用数据线将手机和电脑连起来4、打开 开发者模式后问题1、可能需要下user driver才能连接手机2、电脑连不上手机3、手机连上电脑后不能运行软件运行Android Studio运行虚拟机时会占用较大的电脑内存而自己电脑内存不…

流媒体传输 - HLS 协议

HLS 全称是 HTTP Live Streaming,是一个由 Apple 公司提出的基于 HTTP 的媒体流传输协议,用于实时音视频流的传输。目前 HLS 协议被广泛的应用于视频点播和直播领域。 概述 原理介绍 通过将整条流切割成一个小的可以通过 HTTP 下载的媒体文件&#xff…

SpringBoot中拦截器的使用

SpringBoot中拦截器接口: public interface HandlerInterceptor {default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {return true;}default void postHandle(HttpServletRequest request, …

【jmeter】windows下使用 (测试MQTT)

1. 添加线程组 二、添加如下请求 1. 添加创建连接请求-选中线程组, 点击右键,添加>取样器>MQTT Connect设置MQTT连接 本次使用本机开启的MQTT服务进行测试,默认ip为127.0.0.1,端口默认1883 2. 添加发布请求-选中线程组 …

面试:各种热修复框架对比

目前Android热修复的技术方案大致可以归类为以下几种: 资源热替换代码热修复动态库替换 资源修复 创建新的AssetManager实例,利用反射加载外部(SD卡)需要热修复的资源路径通过当前Activity实例获取Resouces资源类,然后反射得到mAssets属性&…

powerlevel10k 颜色和图标的自定义设置

文章目录1. 颜色的更改2. 图标的更改3. 附录powerlevel10k 的设置向导命令是p10k configure如果想在这个预定好的设置上面做一些个人的自定义设置,就得更改 powerlevel10k 的配置文件:~/.p10k.zsh 下面的操作主要就是在这个配置文件上面更改。 1. 颜色的…

浅谈芯片验证中的仿真运行之 compilation unit 技术(实践篇)

前言 前面文章,讲述了一些关于SV语法下,编译问题的一些基本概念。其实,芯片验证中仿真工具编译仿真文件、RTL文件的一些规则,依据每一家仿真工具内部的编译原理,都是保密的,用户只要按照仿真工具的使用规则去使用,即可。最近,笔者遇到了一个问题,今天空下来,总结一下…

Windows本地安装Redis且设置服务自启

redis中文网:http://redis.cn/ 如果是安装Windows版的redis需要去GitHub上下载安装包 如果是在Linux上安装,可以直接使用命令进行安装 本次教程是基于Windows系统进行的 GitHub地址:https://github.com/microsoftarchive/redis 选择需要下…

Online Decision Transformer

摘要 最近的工作表明,离线强化学习 (RL) 可以表述为序列建模问题 (Chen et al., 2021; Janner et al., 2021),并通过类似于大规模语言建模的方法来解决。 然而,RL 的任何实际实例化还涉及在线组件,其中在被动离线数据集上预训练的…

Express:Express 中间件

中间件的概念 1. 什么是中间件 中间件(Middleware ),特指业务流程的中间处理环节。 2. 现实生活中的例子 在处理污水的时候,一般都要经过三个处理环节,从而保证处理过后的废水,达到排放标准。 处理污水…

aws cloudformation 堆栈集的创建和使用

资料 使用 AWS CloudFormation StackSets 跨多个 AWS 账户和区域配置资源AWS cloudformation示例模板堆栈集堆栈实例状态原因 很多组织使用大量的 AWS 账户,通常用 AWS Organizations 将这些账户组织为分层结构,分组为不同的组织部门 (OU)。并且希望确…

Teams app 的 SSO 机制

我们来继续我们的 Teams sample 之旅,上一个讲了 Tab app,那我们这里再深入一步,看一下如何使用 sso 机制。 sso 是一个很有用机制,它可以让我们的 teams app 能获取当前用户的身份。sso 很多时候比较难彻底理解,在开…

刷爆力扣之公平的糖果交换

刷爆力扣之公平的糖果交换 HELLO,各位看官大大好,我是阿呆 🙈🙈🙈 今天阿呆继续记录下力扣刷题过程,收录在专栏算法中 😜😜😜 该专栏按照不同类别标签进行刷题&#xff…

【数据链路层】循环冗余码CRC、后退N帧协议GBN、选择重传协议SR、CSMA/CA

文章目录循环冗余码CRC多帧滑动窗口连续ARQ协议后退N帧协议GBN选择重传协议SRCSMA/CA---针对无线局域网处理隐蔽站问题RTS,CTS循环冗余码CRC /*** 计算CRC16校验码** param bytes* return* [1,3,4,1,205,1,18,235,173]*/public static String CRC16(byte[] bytes) {…

终于见识到了微服务的天花板!SpringCloud全线手册,太强了

后台都是在问微服务架构的面试题怎么答,想聊聊微服务架构了。微服务架构一跃成为 IT 领域炙手可热的话题也就这两年的事,大量一线互联网公司因为庞大的业务体量和业务需求,纷纷投入了微服务架构的建设中,像阿里巴巴、百度、美团等…

Kamiya丨Kamiya艾美捷大鼠微量白蛋白酶联免疫吸附试验说明书

Kamiya艾美捷大鼠微量白蛋白酶联免疫吸附试验预期用途: 大鼠微量白蛋白酶联免疫吸附试验(ELISA)是一种高灵敏度的双位点酶联免疫吸附试验(ELISA)大鼠生物样品中微量白蛋白的测定。仅供研究使用。 引言 白蛋白&#x…

Java项目:ssm学生学籍管理系统

作者主页:源码空间站2022 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 SSM项目-学生学籍管理系统。该项目分管理员、老师、学生三种用户角色。每种角色分别对应不同的菜单; 以下分别介绍各个角色对应的功…

[附源码]计算机毕业设计springboot基于Java的日用品在线电商平台

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

二进制数据的贝叶斯非参数聚类算法(Matlab代码实现)

目录 💥1 概述 📚2 部分运行结果 🎉3 参考文献 👨‍💻4 Matlab代码 💥1 概述 利用图像结构信息是字典学习的难点,针对传统非参数贝叶斯算法对图像结构信息利用不充分,以及算法运行效率低下的问题,该文…