文章目录
- Canal整合SpringBoot详解(一)
- 什么是canal
- 搭建Kafka3.2.1集群⭐
- Kafka集群机器规划
- 创建3台虚拟机(centos7系统)
- 必要的环境准备(3台虚拟机都要执行如下操作)⭐
- 分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
- 分别关闭每个服务器的防火墙
- 分别为每个服务器安装jdk8
- 分别为每个服务器安装Docker
- 为每个节点的Docker接入阿里云镜像加速器
- 为每个节点的docker设置开机自动启动
- 分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
- 分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
- 安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
- 案例1:Canal+Kafka实现mysql和redis的数据同步⭐
- 必要的环境
- 配置canal.deployer
- 启动canal.deployer⭐
- 配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
- 创建一个SpringBoot项目⭐
- 项目结构
- 准备需要同步的数据库表⭐
- pom.xml⭐
- application.yml⭐
- RedisTemplateConfig(配置类)
- Config.class⭐
- canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
- ConfigCanalBean.class⭐
- MysqlType.class⭐
- SqlType.class⭐
- ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
- 创建kafka的topic(我们指定的topic名称为canal-test-topic)
- 开始测试canal同步效果⭐
- 测试1:给t_config表插入数据
- 测试2:修改t_config表数据
- 测试3:删除t_config表数据
Canal整合SpringBoot详解(一)
什么是canal
- canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- canal工作原理:
- canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
- canal能做什么:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护
- 业务cache(缓存)刷新
- 带业务逻辑的增量数据处理
搭建Kafka3.2.1集群⭐
Kafka集群机器规划
IP地址 | 主机名 | 需要安装的资源 | 操作系统 |
---|---|---|---|
192.168.184.201 | kafka01 | jdk、Docker、zookeeper、Kafka | centos7.9 |
192.168.184.202 | kafka02 | jdk、Docker、zookeeper、Kafka | centos7.9 |
192.168.184.203 | kafka03 | jdk、Docker、zookeeper、Kafka | centos7.9 |
创建3台虚拟机(centos7系统)
必要的环境准备(3台虚拟机都要执行如下操作)⭐
分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
- 1:进入hosts文件:
vi /etc/hosts
在最后面追加内容如下:(这个需要根据你自己服务器的ip来配置)
192.168.184.201 kafka01
192.168.184.202 kafka02
192.168.184.203 kafka03
分别关闭每个服务器的防火墙
systemctl stop firewalld
systemctl disable firewalld
分别为每个服务器安装jdk8
- 1:进入oracle官网下载jdk8的tar.gz包:
-
2:将下载好的包上传到每个服务器上:
-
3:查看是否上传成功:
[root@kafka01 ~]# ls
anaconda-ks.cfg jdk-8u333-linux-x64.tar.gz
- 4:创建文件夹:
mkdir -p /usr/java/
- 5:解压刚刚下载好的包并输出到/usr/java目录下:
tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/
[root@kafka02 ~]# ls /usr/java/
jdk1.8.0_333
- 6:配置java环境变量:
vi /etc/profile
在文件中末尾添加如下配置:(需要更改的是JAVA_HOME,根据自己的java目录名来更改)
JAVA_HOME=/usr/java/jdk1.8.0_333
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
- 7:让配置立即生效:
source /etc/profile
- 8:查看JDK是否安装成功:
[root@kafka01 ~]# java -version
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
分别为每个服务器安装Docker
- 1:切换镜像源
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
- 2:查看当前镜像源中支持的docker版本
yum list docker-ce --showduplicates | sort -r
- 3:安装特定版本的docker-ce
yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io
为每个节点的Docker接入阿里云镜像加速器
配置镜像加速器方法。
- 准备工作:
- 1:首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
- 2:点击镜像工具下面的镜像加速器
- 3:拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。
针对Docker客户端版本大于 1.10.0 的用户,可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器
- 第一步:
mkdir -p /etc/docker
- 第二步:
cat <<EOF> /etc/docker/daemon.json
{
"exec-opts": ["native.cgroupdriver=systemd"],
"registry-mirrors": [
"https://u01jo9qv.mirror.aliyuncs.com",
"https://hub-mirror.c.163.com",
"https://mirror.baidubce.com"
],
"live-restore": true,
"log-driver":"json-file",
"log-opts": {"max-size":"500m", "max-file":"3"},
"max-concurrent-downloads": 10,
"max-concurrent-uploads": 5,
"storage-driver": "overlay2"
}
EOF
- 第三步:
sudo systemctl daemon-reload
- 第四步:
sudo systemctl restart docker
最后就接入阿里云容器镜像加速器成功啦。
为每个节点的docker设置开机自动启动
sudo systemctl enable docker
分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
- 1:在zookeeper官网上面下载zookeeper稳定版(当前为3.7.1)的tar.gz包,并上传到每个服务器上:
zookeeper官网
- 2:查看刚刚上传的zookeeper包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep zookeeper
apache-zookeeper-3.7.1-bin.tar.gz
- 3:解压我们的zookeeper包:
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper
cd /usr/local/zookeeper
- 4:配置关于zookeeper的环境变量:
vi /etc/profile
在文件中末尾添加如下配置:(ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置)
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
- 5:让配置立即生效:
source /etc/profile
- 6:创建目录:
cd /usr/local/zookeeper
sudo mkdir data
- 7;添加配置:
cd conf
sudo vi zoo.cfg
内容如下:(dataDir修改成自己的目录,kafka01/02/03是我们在hosts配置的主机名映射,相当于ip)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
initLimit:ZooKeeper集群模式下包含多个zk进程,其中一个进程为leader,余下的进程为follower。
当follower最初与leader建立连接时,它们之间会传输相当多的数据,尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。
syncLimit:配置follower和leader之间发送消息,请求和应答的最大时间长度。
tickTime:tickTime则是上述两个超时配置的基本单位,例如对于initLimit,其配置值为5,说明其超时时间为 2000ms * 5 = 10秒。
server.id=host:port1:port2 :其中id为一个数字,表示zk进程的id,这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址,port1表示follower和leader交换消息所使用的端口,port2表示选举leader所使用的端口。
dataDir:其配置的含义跟单机模式下的含义类似,不同的是集群模式下还有一个myid文件。myid文件的内容只有一行,且内容只能为1 - 255之间的数字,这个数字亦即上面介绍server.id中的id,表示zk进程的id。
- 8:进入data目录:
cd /usr/local/zookeeper/data/
- 9:对每个服务器(kafka01、kafka02、kafka03)配置myid文件:
- 9(1):如果是kafka01服务器,则执行下面这个:(下面的1、2、3就是我们上面指定的server.id,每个zookeeper服务器都要有一个id,并且全局唯一)
echo "1" > myid
- 9(2):如果是kafka02服务器,则执行下面这个:
echo "2" > myid
- 9(3):如果是kafka03服务器,则执行下面这个
echo "3" > myid
- 10:启动zookeeper服务命令:(必须要把全部zookeeper服务器启动之后在执行下一步status命令)
cd /usr/local/zookeeper/bin/
[root@kafka01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
- 11:对全部的zookeeper服务器执行查看zookeeper集群节点状态命令:(看看哪个是leader节点、哪个是follower节点)。Mode就是某一台zookeeper的角色⭐
[root@kafka01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@kafka02 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@kafka03 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
- 1:进入kafka官网:
Kafka官网
- 2(下载方式1):下载当前kafka的Binary稳定版(截止到2022-08-29,稳定版本为3.2.1),下载会十分缓慢,大约要1个小时的时间(假如你的网速很慢,那么这种方式就不推荐了。):
- 2(下载方式2):使用我上传kafka_2.13-3.2.1.zip包(注意这个不是tgz包,而是zip包)(推荐这种方式),下载速度很快:
kafka3.2.1快速下载地址
- 3:解压kafka_2.13-3.2.1.zip包,拿到kafka的tgz包:
- 4:将解压好的kafka的tgz包上传到每个服务器上。
- 5:查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep kafka
kafka_2.13-3.2.1.tgz
- 6:解压kafka.tgz包到/usr/local下:
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
- 7:修改kafka目录:
cd /usr/local/
mv kafka_2.13-3.2.1/ kafka
-
**8:修改每个服务器的kafka配置文件:(注意:对应的机器要执行对应的命令,不是都在一台服务器执行)**⭐
- 8(1):在kafka01服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka01 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka01 local]# vi /usr/local/kafka/config/server.properties
内容如下:
注意下面3个地方:
①每一个kafka的broker.id都不可以一样,并且要为数字(比如0、1、2都是可以的)!
②log.dirs为你当前机器的kafka的日志数据存储目录
③zookeeper.connect:配置连接Zookeeper集群地址,下面的kafka01:2181(kafka01的意思是zk所在的服务器的ip地址,因为我们配置了hosts,所以就直接用主机名更方便;2181就是zk配置文件中的clientPort)
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=1 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
- 8(2):在kafka02服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka02 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka02 local]# vi /usr/local/kafka/config/server.properties
内容如下:
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=2 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
- 8(3):在kafka03服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka03 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka03 local]# vi /usr/local/kafka/config/server.properties
内容如下:
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=3 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
-
9:给每个服务器都配置kafka的环境变量:
sudo vim /etc/profile
在最后面追加的内容如下:
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
- 10:让配置立即生效:
source /etc/profile
- 11:启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。(zk要先启动,然后再启动kafka)⭐
/usr/local/zookeeper/bin/zkServer.sh start
- 12:后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 13:查看kafka是否启动成功:
[root@kafka01 local]# jps
3603 Kafka
3166 QuorumPeerMain
4367 Jps
- 14:关闭kafka集群:(可以暂时不关闭,方便后面继续演示)
- 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。
- 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
kafka-server-stop.sh
- 15:等kafka集群全部关闭之后再关闭zookeeper:(可以暂时不关闭,方便后面继续演示)
zkServer.sh stop
安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
- 1:创建my.cnf文件(也就是mysql的配置文件)
vim /my-sql/mysql-master/conf/my.cnf
将内容粘贴进my.cnf文件
[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8
# 全局唯一id(不允许有相同的)
server_id=201
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=mysql-bin
# 二进制日志格式,因为要整合canal,所以这里必须要是row
binlog_format=row
#指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog(可以配置多个)
binlog-do-db=canal-test-db1
binlog-do-db=canal-test-db2
- 2:运行一个mysql容器实例。作为Master节点
docker run -p 3307:3306 \
-v /my-sql/mysql-master/log:/var/log/mysql \
-v /my-sql/mysql-master/data:/var/lib/mysql \
-v /my-sql/mysql-master/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-master \
-d mysql:5.7
- 3:进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
- 4:创建canal的mysql帐号,使该canal帐号具有MySQL的Slave (从节点)的权限(也就是能够主从复制), 如果已有账户可直接 grant(这几步都是在mysql容器内部进行,也就是登录了mysql帐号后执行的命令)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
- 5:退出mysql容器,并重启容器:
docker restart mysql-master
- 6:再次进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
- 7:查看是否成功开启binlog日志
show variables like '%log_bin%';
案例1:Canal+Kafka实现mysql和redis的数据同步⭐
案例目的:
1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。
2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中;
3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。
必要的环境
- 1:jdk8
- 2:zookeeper
- 3:kafka
- 4:canal.deployer
- 5:Redis
- 6:Lombok
配置canal.deployer
- 1:进入Canal的github仓库:
Canal的github仓库地址
-
2:选择canal.deployer的版本(我们选择的是最新版v1.1.6):
- 2(1)方式1:直接从GitHub上面下载。(下载速度十分慢,不推荐)
- 2(2)方式2:从我的csdn上面下载。(速度很快,推荐!⭐)
canal.deployer快速下载地址
-
3:上传到我们的服务器上(这里我们就拿kafka01服务器作为canal服务器),生产环境可以另外创建一个新的canal服务器。
-
4:查看canal是否上传到我们的服务器上:(只上传到kafka01服务器上)
[root@kafka01 ~]# ls | grep canal
canal.deployer-1.1.6.tar.gz
- 5:解压canal.deployer:
mkdir -p /usr/local/canal-deployer
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployer
cd /usr/local/canal-deployer/conf
- 6:修改canal.properties文件:
vim /usr/local/canal-deployer/conf/canal.properties
- 修改地方1(配置zookeeper集群地址):
- 修改地方2(修改成kafka模式):
- 修改地方3(修改canal数据库用户的账号密码):
- 修改地方4(在conf目录下要有example同名的目录,可以默认不改,意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。):
- 例如要将example改成abc1,则也要在conf目录下创建一个abc1的目录,并在里面创建instance.properties配置文件。
- 修改地方5(配置kafka集群地址):
- 7:配置instance.properties配置文件:(默认是在example目录下)
cd /usr/local/canal-deployer/conf/example
vim instance.properties
- 修改地方1。canal数据库的id,必须要全局唯一(和mysql的id不能设置一样):
- 修改地方2。我们MySQL的master数据库的ip+端口(我们上面设置mysql的是3307端口):
- 修改地方3。在MySQL的master数据库中canal的账号密码:
- 修改地方4。新增一个配置,设置默认同步的数据库名:⭐
- canal.instance.defaultDatabaseName =监控的数据库名
- 例如canal.instance.defaultDatabaseName=canal-test-db1
- canal.instance.defaultDatabaseName =监控的数据库名
- 修改地方5:匹配表名的正则表达式:(指定canal要监控的数据库.表名)很重要⭐
- canal.instance.filter.regex=canal-test-db1.t_config,canal-test-db1.t_user
- 修改地方6:指定用于canal传输消息的kafka的topic名称:(我们指定的topic名称为canal-test-topic)
启动canal.deployer⭐
- 1:跳转目录:
cd /usr/local/canal-deployer/bin/
- 2:执行sh:
./startup.sh
配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
- :修改C:\Windows\System32\drivers\etc路径下的hosts文件:
创建一个SpringBoot项目⭐
项目结构
准备需要同步的数据库表⭐
CREATE DATABASE `canal-test-db1`;
USE `canal-test-db1`;
CREATE TABLE `t_config` (
`config_id` bigint(20) NOT NULL,
`config_info` text,
`datetime` datetime DEFAULT NULL,
`desc` varchar(255) DEFAULT NULL,
PRIMARY KEY (`config_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
pom.xml⭐
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>canal-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- springboot版本-->
<spring-boot.version>2.5.9</spring-boot.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- spring整合kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Redis服务启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<!-- springboot-web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
application.yml⭐
server:
port: 8081
# spring整合kafka配置
spring:
kafka:
# kafka集群地址(可以多个)
bootstrap-servers:
- 192.168.184.201:9092
- 192.168.184.202:9092
- 192.168.184.203:9092
#kafka消费者配置
consumer:
# 指定一个消费者组id
group-id: canal-group1
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#手动提交第1步:开启手动提交offset(true的话就是消费完一条消息自动会提交)
enable-auto-commit: false
# kafka生产者配置
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
#手动提交第2步:ack设置为手动(enable-auto-commit要设置为false)
# manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
#redis配置
redis:
host: 127.0.0.1
# password:
port: 6379
database: 2
RedisTemplateConfig(配置类)
package com.boot.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisTemplateConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
// 解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jsonRedisSerializer.setObjectMapper(om); //如果不设置,存储到redis的对象取出来将无法进行转换
redisTemplate.setDefaultSerializer(jsonRedisSerializer);
return redisTemplate;
}
}
Config.class⭐
package com.boot.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 该实体类对应着数据库表t_config字段
* @author youzhengjie 2022-09-01 16:55:40
*/
//lombok注解简化开发
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //开启链式编程
public class Config implements Serializable {
private Long configId;
private String configInfo;
private String datetime;
private String desc;
}
canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
ConfigCanalBean.class⭐
package com.boot.entity.config_canal;
import com.boot.entity.Config;
import lombok.Data;
import java.util.List;
/**
* 这个类是接收canal发送过来的消息所必须要的
* @author youzhengjie 2022-09-01 16:55:18
*/
@Data
public class ConfigCanalBean {
//config实体类的数据
private List<Config> data;
//数据库名称
private String database;
private long es;
//递增
private int id;
//是否是DDL语句
private boolean isDdl;
//表结构的字段类型
private MysqlType mysqlType;
//UPDATE语句,旧数据
private String old;
//主键名称
private List<String> pkNames;
//sql语句
private String sql;
//暂时没发现什么用,不过也要写上这个属性
private SqlType sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
private String type;
//getter、setter方法
}
MysqlType.class⭐
package com.boot.entity.config_canal;
import lombok.Data;
/**
* 和SqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成String类型即可)
* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
* 注意:这个类的属性全部都要是String类型
* @author youzhengjie 2022-09-01 16:55:25
*/
@Data
public class MysqlType {
private String configId;
private String configInfo;
private String datetime;
private String desc;
}
SqlType.class⭐
package com.boot.entity.config_canal;
import lombok.Data;
/**
* 和MysqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成int类型即可)
* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
* 注意:这个类的属性全部都要是int类型
* @author youzhengjie 2022-09-01 16:55:32
*/
@Data
public class SqlType {
private int configId;
private int configInfo;
private int datetime;
private int desc;
}
ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
package com.boot.comsumer;
import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* kafka消费者(监听名为canal-test-topic的topic),同步Redis
* @author youzhengjie 2022-09-01 16:54:28
*/
@Component
@Slf4j
public class ConfigCanalRedisConsumer {
@Autowired
private RedisTemplate redisTemplate;
//redis的key格式:(数据库.表名_字段的id)
private static final String KEY_PREFIX = "canal-test-db1.t_config_";
//过期时间(单位:小时)
private static final int TIME_OUT = 24;
/**
* @param consumer 接收消费记录(消息)
* @param ack 手动提交消息
*/
@KafkaListener(topics = "canal-test-topic")
public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {
try {
//获取canal的消息
String value = (String) consumer.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
//转换为javaBean
ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
/*
由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
如果canalBean.getTable()获取的表名是t_config,则同步到redis,如果不是则不管。
*/
//System.out.println(canalBean);
if("t_config".equals(canalBean.getTable())){
//获取是否是DDL语句
boolean isDdl = canalBean.isDdl();
//获取当前sql语句的类型(比如INSERT、DELETE等等)
String type = canalBean.getType();
List<Config> configList = canalBean.getData();
//如果不是DDL语句
if (!isDdl) {
//INSERT和UPDATE都是一样的操作
if ("INSERT".equals(type) || "UPDATE".equals(type)) {
//新增语句
for (Config config : configList) {
Long id = config.getConfigId();
//新增到redis中,过期时间是10分钟
redisTemplate.
opsForValue().
set(KEY_PREFIX + id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);
}
}else if("DELETE".equals(type)){
//删除语句
for (Config config : configList) {
Long id = config.getConfigId();
//从redis中删除
redisTemplate.delete(KEY_PREFIX+id);
}
}
}
}
//最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
ack.acknowledge();
}catch (Exception e){
throw new RuntimeException();
}
}
}
创建kafka的topic(我们指定的topic名称为canal-test-topic)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create
开始测试canal同步效果⭐
测试1:给t_config表插入数据