canal: 连接kafka (docker)

news2025/1/23 14:51:12

一、确保mysql binlog开启并使用ROW作为日志格式
docker 启动mysql 5.7配置文件
my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1

在这里插入图片描述

在这里插入图片描述
一定要确保上述两个值一个为ROW,一个为ON

二、下载canal的run.sh

https://github.com/alibaba/canal/blob/master/docker/run.sh

三、启动canal容器,其中配置了mysql的信息和kafka的信息:

./run.sh -e canal.auto.scan=false -e canal.destinations=me -e canal.instance.master.address=xx.xx.xx.xx:3306  -e canal.instance.dbUsername=root  -e canal.instance.dbPassword=yourpassword  -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.topic=me -e canal.mq.partition=0  -e canal.mq.topic=me

四、通过docker exec -it canal-server /bin/bash进入canal容器,编辑容器中的

/home/admin/canal-server/conf/canal.properties

其中要修改两处,一是工作模式改为kafka(canal.serverMode),二是改MQ的地址(canal.mq.servers),完整配置类似如下:

#################################################
######### 		common argument		############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =

#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = xx.xx.xx.xx:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

五、重启canal容器
docker restart canal-server

这样,对于上面的mysql 数据库中的增删改,在kakfa都可以在me 这个topic 收到数据了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1547442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java】LinkedList vs. ArrayList:Java中的数据结构选择

人不走空 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌赋:斯是陋室,惟吾德馨 目录 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌…

Kindling the Darkness:A Practical Low-light Image Enhancer

Abstract 在弱光条件下拍摄的图像通常会出现(部分)可见度较差的情况。,除了令人不满意的照明之外,多种类型的退化也隐藏在黑暗中,例如由于相机质量有限而导致的噪点和颜色失真。,换句话说,仅仅调高黑暗区域的亮度将不…

R语言随机抽取数据,并作两组数据间t检验,并保存抽取的数据,并绘制boxplot

前提:接着上述R脚本输出的seed结果来选择应该使用哪个seed比较合理,上个R脚本名字: “5utr_计算ABD中Ge1和Lt1的个数和均值以及按照TE个数小的进行随机100次抽样.R” 1.输入数据:“5utr-5d做ABD中有RG4和没有RG4的TE之间的T检验.c…

String类(三)

文章目录 string类(三)string类的模拟实现:1.默认成员变量和函数2.string的长度和下表引用3.字符串拷贝构造4. 赋值拷贝5.字符串比较6.字符串的增添操作7.insert插入操作8.遍历字符 string类(三) string类的模拟实现&…

jupyter lab使用虚拟环境

python -m ipykernel install --name 虚拟环境名 --display-name 虚拟环境名然后再启动jupyter lab就行了

【Unity】调整Player Settings的Resolution设置无效

【背景】 Build时修改了Player Settings下的Resolution设置,但是再次Building时仍然不生效。 【分析】 明显是沿用了之前的分辨率设定,所以盲猜解决办法是Build相关的缓存文件,或者修改打包名称。 【解决】 实测修改版本号无效&#xf…

IDEA使用常用的设置

一、IDEA常用设置 可参考:IDEA这样配置太香了_哔哩哔哩_bilibili 波波老师 二、插件 可参考:IDEA好用插件,强烈推荐_哔哩哔哩_bilibili 波波老师 三、其他 学会用点“.” IDEA弹窗Servers certificate is not trusted怎么禁止&#xf…

基于SSM作业提交与批改

基于SSM作业提交与批改的设计与实现 摘要 社会的进步导致人们对于学习的追求永不止境,那么追求学习的方式也从单一的书本教程变成了多样化的学习方式。多样化的学习方式不仅仅是需要人们智慧的依靠,还需要能够通过软件的加持进行信息化的价值体现。软件…

uniapp开发小程序遇到的问题,持续更新中

一、uniapp引入全局scss 在App.vue中引入uni.scss <style lang"scss">/* #ifndef APP-NVUE */import "uni.scss";/* #endif */ </style>注意&#xff1a;nvue页面的样式在编译时&#xff0c;有很多样式写法被限制了&#xff0c;容易报错。所…

干货分享DS5L1伺服电机通过倍讯科技485转 Profinet 网关与西门子PLC进行通信的配置方法

倍讯科技485转 ProfinetDS5L1 伺服电机与 Profinet 网关进行通信需要了解 Profinet 协议和伺服电机的具体通信要求。以下是您可以如何解决此问题的总体概述&#xff1a; 了解 Profinet&#xff1a;Profinet 是自动化工业以太网标准。您需要了解 Profinet 的工作原理、其寻址方案…

2024 解决 Failed to launch process [ElasticSearch]

操作系统&#xff1a;centos 7 (x86) sonarQube不能使⽤root账号进⾏启动&#xff0c;所以需要创建普通⽤户及其⽤户组 一、问题描述&#xff1a;使用root启动时&#xff0c;一直反馈 SonarQube is not running 问题原因&#xff1a;不能够使用root用户进行启动 解决方案…

三点估算计算

当历史数据不充分时&#xff0c;通过考虑估算中的不确定性和风险&#xff0c;可以提高活动持续时间估算的准确性。使用三点估算有助于界定活动持续时间的近似区间: 乐观时间&#xff08;Optimistic Time&#xff0c;To&#xff09;&#xff1a;在任何事情都顺利的情况下&#…

DFS深度优先搜索刷题(二)

一.P1683 入门 算法思想&#xff1a;设置瓷砖状态st&#xff0c;这里瓷砖状态是否走过决定计数与否&#xff0c;因为可以重复走过但只记一次&#xff0c;所以可以不用回溯。每一次dfs都记录此时的坐标与进入可能的新坐标。 const int N 25;int W, H; char map[N][N];//存地图…

20240319-2-机器学习基础面试题

⽼板给了你⼀个关于癌症检测的数据集&#xff0c;你构建了⼆分类器然后计算了准确率为 98%&#xff0c; 你是否对这个模型很满意&#xff1f;为什么&#xff1f;如果还不算理想&#xff0c;接下来该怎么做&#xff1f; 首先模型主要是找出患有癌症的患者&#xff0c;模型关注的…

苹果与百度合作,将在iPhone 16中使用生成式AI

3月25日&#xff0c;《科创板日报》消息&#xff0c;苹果将与百度进行技术合作&#xff0c;为今年即将发布的iPhone16、Mac系统和iOS 18提供生成式AI&#xff08;AIGC&#xff09;功能。 据悉&#xff0c;苹果曾与阿里巴巴以及另外一家国产大模型厂商进行了技术合作洽谈。最终…

机器学习模型及其使用方法——《机器学习图解》

本书教你两件事——机器学习模型及其使用方法 机器学习模型有不同的类型&#xff0c;有些返回确定性的答案&#xff0c;例如是或否&#xff0c;而另一些返回概率性的答案。有些以问题的形式呈现&#xff1b;其他则使用假设性表达。这些类型的一个共同点是它们都返回一个答案或…

单链表专题(上)(顺序表链表线性表)

在开始之前思考一个顺序表的问题 1. 中间/头部的插⼊删除&#xff0c;时间 复杂度为O(N) 2. 增容需要申请新空间&#xff0c;拷⻉数据&#xff0c;释放旧空间。会有不⼩的消耗。 3. 增容⼀般是呈2倍的增⻓&#xff0c;势必会有⼀定的空间浪费。例如当前容量为100&#xff0c;…

HTML(二)---【常见的标签使用】

零.前言 本文只介绍常见的标签使用&#xff0c;其中使用的一些HTML专业术语可以在作者的第一篇文章&#xff1a; HTML&#xff08;一&#xff09;---【基础】-CSDN博客中找到。 一.<b>粗体、<i>或<em>斜体 1.定义 粗体、斜体的实现可以在CSS中实现&…

DaisyDisk for mac 苹果电脑磁盘清理工具

DaisyDisk for Mac是一款直观易用的磁盘空间分析工具&#xff0c;专为Mac用户设计&#xff0c;旨在帮助他们快速识别和管理磁盘上的文件与文件夹&#xff0c;从而释放存储空间。 软件下载&#xff1a;DaisyDisk for mac 激活版 DaisyDisk采用独特的可视化界面&#xff0c;将磁盘…

基于uniapp微信小程序我的钱包页面

基于uniapp color ui 页面效果图&#xff1a; 代码部分&#xff1a; https://download.csdn.net/download/kay523393/89035927