02 Canal的安装使用

news2024/12/26 0:04:24

1 下载Canal

Cannal下载地址如下:https://github.com/alibaba/canal/releases,这里选择Canal 1.1.4版本下载。

2 上传解压

#首先创建目录 “/software/canal”
[root@node3 ~]# mkdir -p /software/canal

#将Canal安装包解压到创建的canal目录中
[root@node3 ~]# tar -zxvf /software/canal.deployer-1.1.4.tar.gz  -C /software/canal/

3 使用Canal同步Mysql数据

1. 使用Canal同步MySQL的数据可以直接使用Canal客户端API方式消费Canal同步的数据,详细api参照:https://github.com/alibaba/canal/wiki/ClientAPI
2. 也可以直接通过Canal将数据写入Kafka

4. Canal架构原理

Canal Server结构
在这里插入图片描述

1.server 代表一个 canal 运行实例,对应于一个 jvm。
2.instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )
3.instance 下的子模块:
	(1) eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
	(2) b.eventSink: ParserStore 链接器,进行数据过滤,加工,分发的工作
	(3) eventStore: 数据存储
	(4) metaManager: 增量订阅 & 消费信息管理器

Canal同步MySQL数据原理
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。流程图如下:
在这里插入图片描述
EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
(1)Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]。
(2)void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作。
(3)void ack(long batchId),顾名思义,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作。

5 关于同步MySQL数据配置信息

在这里插入图片描述
首先Canal可以是一个集群,这里以Canal单机为例解释Canal同步MySQL数据配置文件配置原理。
首先需要在Canal中配置CanalServer 对应的canal.properties,这个文件中主要配置Canal对应的同步数据实例(Canal Instance)位置信息及数据导出的模式,例如:我们需要将某个mysql中的数据同步到Kafka中,那么就可以创建一个“数据同步实例”,导出到Kafka就是一种模式。
其次,需要配置Canal Instance 实例中的instance.properties文件,指定同步到MySQL数据源及管道信息。

5.1 配置步骤

配置“canal.properties”
进入“/software/canal/conf”目录下,编辑“canal.properties”文件:

#canal将数据写入Kafka,可配:tcp, kafka, RocketMQ,tcp就是使用canal代码接收
canal.serverMode = kafka

#配置canal写入Kafka地址
canal.mq.servers = node1:9092,node2:9092,node3:9092

配置mysql slave的权限
Canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限 ,授权Canal连接MySQL具有作为MySQL slave的权限:

mysql> CREATE USER canal IDENTIFIED BY 'canal'; 
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal' ;

配置“instance.properties”
进入“/software/canal/conf/example/”下,编辑“instance.properties”文件:

#canal伪装为一个mysql的salve,配置其id,不要和真正mysql server-id冲突,这里也可以不配置,会自动生成
canal.instance.mysql.slaveId=123456
#配置mysql master 节点及端口
canal.instance.master.address=node2:3306

#配置连接mysql的用户名和密码,就是前面复制权限的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

#配置Canal将数据导入到Kafka topic
canal.mq.topic=canal_topic

启动Canal
进入“/software/canal/bin”,执行“startup.sh”脚本启动Canal。

#启动Canal
[root@node3 ~]# cd /software/canal/bin/
[root@node3 bin]# ./startup.sh 
[root@node3 bin]# jps
68675 CanalLauncher  #启动成功

启动zookeeper和Kafka,并监控Kafka中“canal_topic”的数据
注意:“canal_topic”不需要提前创建,默认创建就是1个分区。

[root@node2 bin]# ./kafka-console-consumer.sh  --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic

在MySQL中建表,插入语句

mysql> create database testdb;
mysql> use testdb;
mysql> create table person(id int ,name varchar(255),age int);
mysql> insert into person values (1,"zs",18),(2,"ls",19),(3,"ww",20);

对应的在Kafka中有对应的数据日志写入
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
关于以上json字段解析如下:
data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据。
database:数据库名称。
es:事件时间,13位的时间戳。
id:事件操作的序列号,1,2,3…
isDdl:是否是DDL操作。
mysqlType:字段类型。
old:旧数据。
pkNames:主键名称。
sql:SQL语句。
sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal。
table:表名。
ts:日志时间。
type:操作类型,比如DELETE,UPDATE,INSERT。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1992250.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

反序列化--serial

去vulhub上下载此靶机:https://www.vulnhub.com/entry/serial-1,349/ 用Vmware新建虚拟机打开 用kali扫描ip arp-scan -l 访问扫描到的靶机ip访问 按F12 ---》网络---》cookie得到一串编码 base64解码 dirb http://靶机ip 扫描目录 访问backup目录 解压得到三个p…

node_sass安装失败,但又不想重装python2和降低node的版本,如何解决?

github下载对应的sass安装包,地址:https://github.com/sass/dart-sass/releases下载后解压到对应的文件夹,比如我的地址 E:\dart-sass配置环境变量:系统变量-->path变量-->新建。如下图

【Material-UI】Checkbox组件:颜色设置详解

文章目录 一、Checkbox 组件与颜色设置概述1. 组件介绍2. 基本用法 二、Checkbox 颜色设置的关键特性1. 使用预定义颜色2. 自定义颜色 三、Checkbox 颜色设置的实际应用场景1. 表单中的状态指示2. 设置页面中的选项分类3. 自定义主题 四、注意事项1. 无障碍支持2. 一致的用户体…

【Tessent IJATG Users Manual】【Ch5】IJTAG Network Insertion

The IJTAG Network Insertion FlowIJTAG Network Insertion ExampleModification of the IJTAG Network Insertion Flow How to Edit or Modify a DftSpecificationEdit or Modify MethodDftSpecification Examples IJTAG Network Insertion 可以将已有的 instrument 连接起来&…

怀山府交付,怀柔主城品质生活完美呈现

时间是美好的质造者,也是美好的检阅者。 越秀天恒怀山府,作为越秀地产进驻北京的开篇之作,承载着越秀地产深厚的匠心传承与府系产品的卓越品质,以时间为笔,四季为墨,7月26日,终于迎来了盛大交付…

【数据结构与算法 | 力扣+二叉搜索树篇】力扣450, 98

1. 力扣450:删除二叉搜索树的节点 1. 题目: 给定一个二叉搜索树的根节点 root 和一个值 key,删除二叉搜索树中的 key 对应的节点,并保证二叉搜索树的性质不变。返回二叉搜索树(有可能被更新)的根节点的引…

开个技术外挂|用自动建模工具巧妙解决电子产品连续跌落被摔的损伤,再也不怕手滑了

啪嗒,这是心碎的声音 消费电子、网络设备、家电等行业需要对产品进行不同区域的多次跌落试验,以研究产品在已有损伤的情况下,再次发生跌落,产生的影响。连续跌落仿真计算,可以对多次跌落产生的损伤累计进行查验&#x…

CMU15445 (Fall 2023) Project 1 - Buffer Pool 思路分享

文章目录 写在前面Task 1 - LRU-K Replacement PolicyTask 2 - Disk SchedulerTask 3 - Buffer Pool ManagerNewPageFetchPageUnpinPageDeletePageFlushPage 写在最后 写在前面 操作系统为应用程序提供了默认的缓存机制,DBMS作为应用程序,为什么不使用默…

2024年中级消防设施操作员(考前冲刺)证模拟考试题库及中级消防设施操作员(考前冲刺)理论考试试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年中级消防设施操作员(考前冲刺)证模拟考试题库及中级消防设施操作员(考前冲刺)理论考试试题是由安全生产模拟考试一点通提供,中级消防设施操作员&#…

7.2 继承与多态:Python 面向对象编程的魔法

欢迎来到我的博客,很高兴能够在这里和您见面!欢迎订阅相关专栏: 工💗重💗hao💗:野老杂谈 ⭐️ 全网最全IT互联网公司面试宝典:收集整理全网各大IT互联网公司技术、项目、HR面试真题.…

idea中怎么使用git把项目提交到远程仓库

git的版本控制 在gitignore中添加这些文件 可以在与远程仓库进行操作的时候 忽略掉这些文件夹 使用Git进行项目代码的版本控制,具体操作: 1). 创建Git本地仓库 当Idea中VS变成Git(G): 说明本地仓库创建成功。 2). 创建Git远程仓库 1、gi…

Deep-Live-Cam:只需单张图像即可实现人脸替换;零一万物、月之暗面再掀国产大模型资本战丨 RTE 开发者日报

开发者朋友们大家好: 这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、…

萤石开放平台开发票流程

若您已经在开放平台完成充值,可通过以下方式自助开票 一、PC端开票 进入开放平台首页,登入账号,进入价格页面,点击首页右下方的“开票指引”入口 二、萤石云视频APP 萤石云APP:底部导航栏“我的”——点击“订单”…

如何编写一个有效的OKR(带有示例)

客户经常问我们的一个问题是:”我如何写出有效的目标和关键结果? 我如何写出有效的目标和关键结果? 一开始,要弄清楚不同的要素是很困难的,而且有这么多的指导和风格,你很难知道自己是否做对了。 在这篇…

【Nacos无压力源码领读】(三) Nacos 配置中心与热更新原理详解 敢说全网最细

本文将从 Nacos 配置中心的基本使用入手, 详细介绍 Nacos 客户端发布配置, 拉取配置, 订阅配置的过程以及服务器对应的处理过程; 配置订阅以及热更新原理相关的部分, 我看了主流的博客网站, 绝对没有比这更详细的讲解; 如果在阅读过程中对文中提到的 SpringBoot 启动过程以及…

k8s 与 docker 安装 Syncthing 文件同步服务器

Syncthing是一个开源文件同步工具,可以在多台设备之间实时同步文件或文件夹,官方网站:https://syncthing.net/ 下载地址:https://syncthing.net/downloads/ ,如果是windows一般推荐下载图形界面SyncTrayzor, 但我这边都…

视频循环存储的实现

目录 1. 三方工具 2. 视频存储的实现 2.1 分段存储 - 比如每15分钟 2.2 对齐到15分钟整边界 2.3 循环存储的实现 video_space_daemon.sh 3.封装 3.1 主执行程序,修订版 3.2 创建服务 3.3 service关联的执行脚本文件 4.额外的工作 附录A: ffmpeg视频存储…

中电金信三步法全面助力银行数字化营销体系建设

存量用户竞争时代 精细化经营、个性化服务与多场景覆盖 成为银行经营的重要策略 营销数字化转型不可或缺 但是,与所有转型的曲折、阵痛等特征一样,银行构建数字化营销运营体系过程中,亦走过一些弯路,包括: 缺少顶层…

收银系统源码-连锁店版本

千呼新零售2.0系统是零售行业连锁店一体化收银系统,包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体,线上线下数据全部打通。 私有化独立部署/全开源源码,系统开发语言: 核心开发语言: PHP、HTML…

【漏洞复现】某赛通电子文档安全管理系统 PolicyAjax SQL注入漏洞

0x01 产品简介 某赛通电子文档安全管理系统(简称:CDG)是一款电子文档安全加密软件,该系统利用驱动层透明加密技术,通过对电子文档的加密保护,防止内部员工泄密和外部人员非法窃取企业核心重要数据资产&…