Canal + Kafka 同步 MySQL 数据到 Redis

news2024/11/19 17:34:40
解决缓存和数据库一致性问题

一般来说,缓存中的数据没什么问题,但是数据库更新后,就容易出现缓存(Redis)和数据库(MySQL)间的数据一致性问题。由于写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题

Canal工作原理
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)

可参考官方文档

QuickStart · alibaba/canal Wiki · GitHub

1、Kafka,ZK安装略,本博主其它文章有

        安装好后,先启动ZK,在启动Kafka

2、win10编辑my.ini

[mysqld]
#开启 binlog
log-bin=mysql-bin
#选择 ROW 模式
binlog-format=ROW 
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1 

        

开启授权
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、下载canal

        下载地址:Releases · alibaba/canal · GitHub

解压后会有5文件夹

进入conf

编辑canal.properties

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
##kafka地址
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

进入canal\conf\example

编辑instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password 这里是你的数据库
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex 这里是你要监听的表,多个用,隔开
canal.instance.filter.regex=test.user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config 这里是你要用哪个topic监听
canal.mq.topic=myTestTopic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################


启动canal

双击bin目录的 startup.bat

4、在pom.xml引入Kafka、canal的依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

yml添加Kafka的配置

  kafka:
    #kafka配置
    bootstrap-servers: 127.0.0.1:9092
    producer: #生产者
      retries: 3 #设置大于0的值,则客户端会将发送失败的记录重新发送的次数
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1
    consumer:
      # 指定默认消费者group id
      group-id: my-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 5000
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      
    listener:
      ack-mode: manual_immediate

5、定义消费者


import com.fan.li.myspringboot.util.RedisClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @ClassName CanalConsumer
 * @Description TODO
 * @Author fan
 * @Date 2024/2/23 14:11
 * @Version 1.0
 */
@Component
@Slf4j
public class CanalConsumer {

    @Autowired
    RedisClient redisClient;//redis工具类,自己封装即可

    @KafkaListener(topics = "myTestTopic", groupId = "canalKafka-groupId")
    public void canalListenerKafkaToRedis(ConsumerRecord<String, String> record, Acknowledgment ack) {
        //数据表变动的值
        String value = record.value();
        log.info("数据表发生变动:{}", value);
        //简单存入redis
        redisClient.setKey("你的key",value);
        //手动提交
        ack.acknowledge();
    }
}

6、测试:往数据库的表插入一条数据,观察后台日志

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466490.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java_URL中的URL编码转换成中文

问题描述 上传文件后&#xff0c;获得的URL中包含了URL编码&#xff0c;导致在前端展示文件名时出现乱码 最终效果 解决思路&#xff1a; 1、先按照英文逗号切割URL 2、截取字符串中URL编码部分(含后缀名) 3、使用正则匹配截取到的字符串中的URL编码 4、转换URL编码为中文&a…

创建一个基于Node.js的实时聊天应用

在当今数字化社会&#xff0c;实时通讯已成为人们生活中不可或缺的一部分。无论是在社交媒体平台上与朋友交流&#xff0c;还是在工作场合中与同事协作&#xff0c;实时聊天应用都扮演着重要角色。与此同时&#xff0c;Node.js作为一种流行的后端技术&#xff0c;为开发者提供了…

1TB! 台湾最新倾斜摄影3DTiles数据分享

之前的文章分享了546GB香港倾斜摄影3DTiles数据&#xff0c;主要是验证倾斜模型3DTiles转换工具的生产效率和数据显示效率&#xff0c;结果对比可以看出无论是数据生产速度以及成果数据显示效率上&#xff0c;都优于其他两种技术路线。最近使用倾斜模型3DTiles工具生产了台湾地…

Spring 手动实现Spring底层机制

目录 一、前言 二、Spring底层整体架构 1.准备工作 : 2.架构分析 : &#xff08;重要&#xff09; 3.环境搭建 &#xff1a; 三、手动实现Spring容器结构 1.自定义注解 : 1.1 Component注解 1.2 Scope注解 2.自定义组件 : 3.自定义用于封装Bean信息的BeanDefinition类&a…

STM32 SPI(基础概念)

文章目录 前言一、SPI通信协议概述二、SPI硬件框图和软件层次三、SPI通信时序四、SPI控制器总结 前言 本篇文章来给大家讲解一个非常重要的通信协议SPI&#xff0c;SPI在MCU和外设之间的通信用的是非常多的&#xff0c;这篇文章将带大家先来学习SPI的一些概念。 一、SPI通信协…

alist修改密码(docker版)

rootarmbian:~# docker exec -it [docker名称] ./alist admin set abcd123456 INFO[2024-02-20 11:06:29] reading config file: data/config.json INFO[2024-02-20 11:06:29] load config from env with prefix: ALIST_ INFO[2024-02-20 11:06:29] init logrus..…

《TCP/IP详解 卷一》第3章 链路层

目录 3.1 引言 3.2 以太网 3.3 全双工 省点 自动协商 流量控制 3.4 网桥和交换机 3.5 WiFi 3.6 PPP协议 3.6.1 PPP协议流程 3.7 环回 3.8 MTU和路径MTU 3.9 隧道基础 3.9.1 GRE 3.9.2 PPTP 3.9.3 L2TP 3.10 与链路层相关的攻击 3.11 总结 3.1 引言 城域网&…

2024年1月京东洗衣机行业数据分析:TOP10品牌销量销额排行榜

鲸参谋监测的京东平台1月份洗衣机市场销售数据已出炉&#xff01; 根据鲸参谋电商数据分析平台显示&#xff0c;今年1月份&#xff0c;京东平台上洗衣机的销量约160万件&#xff0c;环比上个月增长约42%&#xff0c;同比去年下滑7%&#xff1b;销售额约28亿元&#xff0c;环比…

Java零基础 - 三元运算符

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

Golin 弱口令/漏洞/扫描/等保/基线核查的快速安全检查小工具

下载地址&#xff1a; 链接&#xff1a;https://pan.quark.cn/s/db6afba6de1f 主要功能 主机存活探测、漏洞扫描、子域名扫描、端口扫描、各类服务数据库爆破、poc扫描、xss扫描、webtitle探测、web指纹识别、web敏感信息泄露、web目录浏览、web文件下载、等保安全风险问题风险…

强大的文本绘图——PlantUML

PlantUML是一款开源工具&#xff0c;它允许用户通过简单的文本描述来创建UML图&#xff08;统一建模语言图&#xff09;。这种方法可以快速地绘制类图、用例图、序列图、状态图、活动图、组件图和部署图等UML图表。PlantUML使用一种领域特定语言&#xff08;DSL&#xff09;&am…

通过platform总线驱动框架编写LED灯的驱动,编写应用程序测试,发布到CSDN

效果图 设备树代码 myplatform{compatible "hqyj,myplatform";led1-gpio<&gpioe 10 0>;led2-gpio<&gpiof 10 0>;led3-gpio<&gpioe 8 0>; interrupt-parent <&gpiof>;interrupts<9 0>;reg<0X12345678 …

idea在工具栏中显示快速创建包和类的图标

一、效果图 点击需要创建包或者类的位置&#xff0c;在点击对用的图标就可以快速创建类或者包了。 二、设置 步骤一 View-->Appearance-->Toolbar 步骤二 File-->Settings-->Appearance & Behavior-->Menus and Toolbars-->Main Toolbar-->----…

Vue3 (unplugin-auto-import自动导入的使用)

安装 参考链接 npm i -D unplugin-auto-importvite.config.ts里面配置 import AutoImport from unplugin-auto-import/viteAutoImport({imports:[ vue,vue-router]})重新运行项目会生成一个auto-imports.d.ts的文件 /* eslint-disable */ /* prettier-ignore */ // ts-nochec…

在Ubuntu系统下搭建TDengine集群

目录 一、Ubuntu虚拟机创建 二、系统相关配置 1、设置系统hostname 2、网络配置及IP规划 3、配置FQDN&#xff08;etc/hosts&#xff09; 4、服务端口设置 三、TDengine server安装 1、服务安装 2、修改配置 3、启动taosd 4、服务卸载 四、客户端安装 1、client安…

密评技术要求实施详解:每一步都关键

密评简介 密评定义&#xff1a;全称商用密码应用安全性评估, 是对采用商用密码技术、产品和服务集成建设的网络和信息系统密码应用的合规性、正确性、有效性进行评估的活动。 评测依据&#xff1a;GB/T 39786-2021《信息安全技术 信息系统密码应用基本要求》。 密评对象&…

(done) 什么是特征值和特征向量?如何求特征值的特征向量 ?如何判断一个矩阵能否相似对角化?

什么是齐次方程&#xff1f; https://blog.csdn.net/shimly123456/article/details/136198159 行列式和是否有解的关系&#xff1f; https://blog.csdn.net/shimly123456/article/details/136198215 特征值和特征向量 参考视频&#xff1a;https://www.bilibili.com/video/BV…

【MATLAB】CEEMD_ MFE_SVM_LSTM 神经网络时序预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 CEEMD_MFE_SVM_LSTM神经网络时序预测算法是一种结合了多种先进技术的复杂预测方法&#xff0c;旨在提高时序预测的准确性和稳定性。下面是对该算法的详细介绍&#xff1a; CEEMD&#xff…

基于Java+SpringBoot+Vue前后端分离婚纱影楼管理系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行交流合作✌ 主要内容&#xff1a;SpringBoot、Vue、SSM、HLM…

【监督学习之决策树和随机森林】

曾梦想执剑走天涯&#xff0c;我是程序猿【AK】 目录 简述概要知识图谱决策树&#xff08;Decision Tree&#xff09;随机森林&#xff08;Random Forest&#xff09; 简述概要 了解决策树和随机森林 知识图谱 决策树和随机森林都是机器学习中常用的算法&#xff0c;它们在处…