Canal整合SpringBoot详解(一)

news2024/11/18 9:45:05

文章目录

    • Canal整合SpringBoot详解(一)
      • 什么是canal
      • 搭建Kafka3.2.1集群⭐
        • Kafka集群机器规划
        • 创建3台虚拟机(centos7系统)
        • 必要的环境准备(3台虚拟机都要执行如下操作)⭐
          • 分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
          • 分别关闭每个服务器的防火墙
          • 分别为每个服务器安装jdk8
          • 分别为每个服务器安装Docker
            • 为每个节点的Docker接入阿里云镜像加速器
            • 为每个节点的docker设置开机自动启动
          • 分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
          • 分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
          • 安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
      • 案例1:Canal+Kafka实现mysql和redis的数据同步⭐
        • 必要的环境
        • 配置canal.deployer
        • 启动canal.deployer⭐
        • 配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
        • 创建一个SpringBoot项目⭐
          • 项目结构
          • 准备需要同步的数据库表⭐
          • pom.xml⭐
          • application.yml⭐
          • RedisTemplateConfig(配置类)
          • Config.class⭐
          • canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
            • ConfigCanalBean.class⭐
            • MysqlType.class⭐
            • SqlType.class⭐
          • ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
        • 创建kafka的topic(我们指定的topic名称为canal-test-topic)
        • 开始测试canal同步效果⭐
          • 测试1:给t_config表插入数据
          • 测试2:修改t_config表数据
          • 测试3:删除t_config表数据

Canal整合SpringBoot详解(一)

什么是canal

  • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
  • canal工作原理:
    • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
  • canal能做什么:
    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护
    • 业务cache(缓存)刷新
    • 带业务逻辑的增量数据处理

搭建Kafka3.2.1集群⭐

Kafka集群机器规划
IP地址主机名需要安装的资源操作系统
192.168.184.201kafka01jdk、Docker、zookeeper、Kafkacentos7.9
192.168.184.202kafka02jdk、Docker、zookeeper、Kafkacentos7.9
192.168.184.203kafka03jdk、Docker、zookeeper、Kafkacentos7.9
创建3台虚拟机(centos7系统)

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

必要的环境准备(3台虚拟机都要执行如下操作)⭐
分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
  • 1:进入hosts文件:
vi /etc/hosts

在最后面追加内容如下:(这个需要根据你自己服务器的ip来配置)

192.168.184.201 kafka01
192.168.184.202 kafka02
192.168.184.203 kafka03
分别关闭每个服务器的防火墙
systemctl stop firewalld
systemctl disable firewalld
分别为每个服务器安装jdk8
  • 1:进入oracle官网下载jdk8的tar.gz包:

  • 2:将下载好的包上传到每个服务器上:

  • 3:查看是否上传成功:

[root@kafka01 ~]# ls
anaconda-ks.cfg  jdk-8u333-linux-x64.tar.gz
  • 4:创建文件夹:
mkdir -p /usr/java/
  • 5:解压刚刚下载好的包并输出到/usr/java目录下:
tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/
[root@kafka02 ~]# ls /usr/java/
jdk1.8.0_333
  • 6:配置java环境变量:
vi /etc/profile

在文件中末尾添加如下配置:(需要更改的是JAVA_HOME,根据自己的java目录名来更改)

JAVA_HOME=/usr/java/jdk1.8.0_333
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
  • 7:让配置立即生效:
source /etc/profile
  • 8:查看JDK是否安装成功:
[root@kafka01 ~]# java -version
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
分别为每个服务器安装Docker
  • 1:切换镜像源
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
  • 2:查看当前镜像源中支持的docker版本
yum list docker-ce --showduplicates | sort -r
  • 3:安装特定版本的docker-ce
yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io
为每个节点的Docker接入阿里云镜像加速器

配置镜像加速器方法。

  • 准备工作:
  • 1:首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
  • 2:点击镜像工具下面的镜像加速器
  • 3:拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。

针对Docker客户端版本大于 1.10.0 的用户,可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器

  • 第一步:
mkdir -p /etc/docker
  • 第二步:
cat <<EOF> /etc/docker/daemon.json
{
  "exec-opts": ["native.cgroupdriver=systemd"],	
  "registry-mirrors": [
    "https://u01jo9qv.mirror.aliyuncs.com",
    "https://hub-mirror.c.163.com",
    "https://mirror.baidubce.com"
  ],
  "live-restore": true,
  "log-driver":"json-file",
  "log-opts": {"max-size":"500m", "max-file":"3"},
  "max-concurrent-downloads": 10,
  "max-concurrent-uploads": 5,
  "storage-driver": "overlay2"
}
EOF
  • 第三步:
sudo systemctl daemon-reload
  • 第四步:
sudo systemctl restart docker

最后就接入阿里云容器镜像加速器成功啦。

为每个节点的docker设置开机自动启动
sudo systemctl enable docker
分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
  • 1:在zookeeper官网上面下载zookeeper稳定版(当前为3.7.1)的tar.gz包,并上传到每个服务器上:

zookeeper官网

在这里插入图片描述在这里插入图片描述

  • 2:查看刚刚上传的zookeeper包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep zookeeper
apache-zookeeper-3.7.1-bin.tar.gz
  • 3:解压我们的zookeeper包:
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper
cd /usr/local/zookeeper
  • 4:配置关于zookeeper的环境变量:
vi /etc/profile

在文件中末尾添加如下配置:(ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置)

export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
  • 5:让配置立即生效:
source /etc/profile
  • 6:创建目录:
cd /usr/local/zookeeper
sudo mkdir data
  • 7;添加配置:
cd conf
sudo vi zoo.cfg

内容如下:(dataDir修改成自己的目录,kafka01/02/03是我们在hosts配置的主机名映射,相当于ip)

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888

initLimit:ZooKeeper集群模式下包含多个zk进程,其中一个进程为leader,余下的进程为follower。
当follower最初与leader建立连接时,它们之间会传输相当多的数据,尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。

syncLimit:配置follower和leader之间发送消息,请求和应答的最大时间长度。

tickTime:tickTime则是上述两个超时配置的基本单位,例如对于initLimit,其配置值为5,说明其超时时间为 2000ms * 5 = 10秒。

server.id=host:port1:port2其中id为一个数字,表示zk进程的id,这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址,port1表示follower和leader交换消息所使用的端口,port2表示选举leader所使用的端口。

dataDir:其配置的含义跟单机模式下的含义类似,不同的是集群模式下还有一个myid文件。myid文件的内容只有一行,且内容只能为1 - 255之间的数字,这个数字亦即上面介绍server.id中的id,表示zk进程的id。

  • 8:进入data目录:
cd /usr/local/zookeeper/data/
  • 9:对每个服务器(kafka01、kafka02、kafka03)配置myid文件:
  • 9(1):如果是kafka01服务器,则执行下面这个:(下面的1、2、3就是我们上面指定的server.id,每个zookeeper服务器都要有一个id,并且全局唯一)
echo "1" > myid
  • 9(2):如果是kafka02服务器,则执行下面这个:
echo "2" > myid
  • 9(3):如果是kafka03服务器,则执行下面这个
echo "3" > myid
  • 10:启动zookeeper服务命令:(必须要把全部zookeeper服务器启动之后在执行下一步status命令)
cd /usr/local/zookeeper/bin/
[root@kafka01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
  • 11:对全部的zookeeper服务器执行查看zookeeper集群节点状态命令:(看看哪个是leader节点、哪个是follower节点)。Mode就是某一台zookeeper的角色
[root@kafka01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@kafka02 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@kafka03 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
  • 1:进入kafka官网:

Kafka官网

  • 2(下载方式1):下载当前kafka的Binary稳定版(截止到2022-08-29,稳定版本为3.2.1),下载会十分缓慢,大约要1个小时的时间(假如你的网速很慢,那么这种方式就不推荐了。):

在这里插入图片描述

  • 2(下载方式2):使用我上传kafka_2.13-3.2.1.zip包(注意这个不是tgz包,而是zip包)(推荐这种方式),下载速度很快:

kafka3.2.1快速下载地址

在这里插入图片描述

  • 3:解压kafka_2.13-3.2.1.zip包,拿到kafka的tgz包:

在这里插入图片描述

  • 4:将解压好的kafka的tgz包上传到每个服务器上。
  • 5:查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep kafka
kafka_2.13-3.2.1.tgz
  • 6:解压kafka.tgz包到/usr/local下:
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
  • 7:修改kafka目录:
cd /usr/local/
mv kafka_2.13-3.2.1/ kafka
  • **8:修改每个服务器的kafka配置文件:(注意:对应的机器要执行对应的命令,不是都在一台服务器执行)**⭐

    • 8(1):在kafka01服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka01 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka01 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    注意下面3个地方:

    ①每一个kafka的broker.id都不可以一样,并且要为数字(比如0、1、2都是可以的)!

    ②log.dirs为你当前机器的kafka的日志数据存储目录

    ③zookeeper.connect:配置连接Zookeeper集群地址,下面的kafka01:2181(kafka01的意思是zk所在的服务器的ip地址,因为我们配置了hosts,所以就直接用主机名更方便;2181就是zk配置文件中的clientPort)

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=1
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
    • 8(2):在kafka02服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka02 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka02 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=2
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
    • 8(3):在kafka03服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka03 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka03 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=3
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
  • 9:给每个服务器都配置kafka的环境变量:

sudo vim /etc/profile

在最后面追加的内容如下:

export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  • 10:让配置立即生效:
source /etc/profile
  • 11:启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。(zk要先启动,然后再启动kafka)⭐
/usr/local/zookeeper/bin/zkServer.sh start
  • 12:后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
  • 13:查看kafka是否启动成功:
[root@kafka01 local]# jps
3603 Kafka
3166 QuorumPeerMain
4367 Jps
  • 14:关闭kafka集群:(可以暂时不关闭,方便后面继续演示)
    • 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
      集群。
kafka-server-stop.sh
  • 15:等kafka集群全部关闭之后再关闭zookeeper:(可以暂时不关闭,方便后面继续演示)
zkServer.sh stop
安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
  • 1:创建my.cnf文件(也就是mysql的配置文件)
vim /my-sql/mysql-master/conf/my.cnf

将内容粘贴进my.cnf文件

[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8

# 全局唯一id(不允许有相同的)
server_id=201
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=mysql-bin
# 二进制日志格式,因为要整合canal,所以这里必须要是row
binlog_format=row
#指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog(可以配置多个)
binlog-do-db=canal-test-db1
binlog-do-db=canal-test-db2
  • 2:运行一个mysql容器实例。作为Master节点
docker run -p 3307:3306 \
-v /my-sql/mysql-master/log:/var/log/mysql \
-v /my-sql/mysql-master/data:/var/lib/mysql \
-v /my-sql/mysql-master/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-master \
-d mysql:5.7
  • 3:进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
  • 4:创建canal的mysql帐号,使该canal帐号具有MySQL的Slave (从节点)的权限(也就是能够主从复制), 如果已有账户可直接 grant(这几步都是在mysql容器内部进行,也就是登录了mysql帐号后执行的命令)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
  • 5:退出mysql容器,并重启容器:
docker restart mysql-master
  • 6:再次进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
  • 7:查看是否成功开启binlog日志
show variables like '%log_bin%';

案例1:Canal+Kafka实现mysql和redis的数据同步⭐

案例目的:

1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中;

3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

必要的环境
  • 1:jdk8
  • 2:zookeeper
  • 3:kafka
  • 4:canal.deployer
  • 5:Redis
  • 6:Lombok
配置canal.deployer
  • 1:进入Canal的github仓库:

Canal的github仓库地址

  • 2:选择canal.deployer的版本(我们选择的是最新版v1.1.6):

    • 2(1)方式1:直接从GitHub上面下载。(下载速度十分慢,不推荐)

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

  • 2(2)方式2:从我的csdn上面下载。(速度很快,推荐!⭐)

canal.deployer快速下载地址

在这里插入图片描述

  • 3:上传到我们的服务器上(这里我们就拿kafka01服务器作为canal服务器),生产环境可以另外创建一个新的canal服务器。

  • 4:查看canal是否上传到我们的服务器上:(只上传到kafka01服务器上)

[root@kafka01 ~]# ls | grep canal
canal.deployer-1.1.6.tar.gz
  • 5:解压canal.deployer:
mkdir -p /usr/local/canal-deployer
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployer
cd /usr/local/canal-deployer/conf
  • 6:修改canal.properties文件:
vim /usr/local/canal-deployer/conf/canal.properties
  • 修改地方1(配置zookeeper集群地址):

在这里插入图片描述

  • 修改地方2(修改成kafka模式):

在这里插入图片描述

  • 修改地方3(修改canal数据库用户的账号密码):

在这里插入图片描述

  • 修改地方4(在conf目录下要有example同名的目录,可以默认不改,意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。):
    • 例如要将example改成abc1,则也要在conf目录下创建一个abc1的目录,并在里面创建instance.properties配置文件。

在这里插入图片描述

  • 修改地方5(配置kafka集群地址):

在这里插入图片描述

  • 7:配置instance.properties配置文件:(默认是在example目录下)
cd /usr/local/canal-deployer/conf/example
vim instance.properties
  • 修改地方1。canal数据库的id,必须要全局唯一(和mysql的id不能设置一样):

在这里插入图片描述

  • 修改地方2。我们MySQL的master数据库的ip+端口(我们上面设置mysql的是3307端口):

在这里插入图片描述

  • 修改地方3。在MySQL的master数据库中canal的账号密码:

在这里插入图片描述

  • 修改地方4。新增一个配置,设置默认同步的数据库名:⭐
    • canal.instance.defaultDatabaseName =监控的数据库名
      • 例如canal.instance.defaultDatabaseName=canal-test-db1

在这里插入图片描述

  • 修改地方5:匹配表名的正则表达式:(指定canal要监控的数据库.表名)很重要⭐
    • canal.instance.filter.regex=canal-test-db1.t_config,canal-test-db1.t_user

在这里插入图片描述

  • 修改地方6:指定用于canal传输消息的kafka的topic名称:(我们指定的topic名称为canal-test-topic)

在这里插入图片描述

启动canal.deployer⭐
  • 1:跳转目录:
cd /usr/local/canal-deployer/bin/
  • 2:执行sh:
./startup.sh
配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
  • :修改C:\Windows\System32\drivers\etc路径下的hosts文件:

在这里插入图片描述

创建一个SpringBoot项目⭐
项目结构

在这里插入图片描述

准备需要同步的数据库表⭐
CREATE DATABASE `canal-test-db1`;

USE `canal-test-db1`;

CREATE TABLE `t_config` (
  `config_id` bigint(20) NOT NULL,
  `config_info` text,
  `datetime` datetime DEFAULT NULL,
  `desc` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`config_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
pom.xml⭐
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>canal-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!--        springboot版本-->
        <spring-boot.version>2.5.9</spring-boot.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

<!--        spring整合kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Redis服务启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>

        <!--        springboot-web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--        lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
application.yml⭐
server:
  port: 8081
# spring整合kafka配置
spring:
  kafka:
    # kafka集群地址(可以多个)
    bootstrap-servers:
      - 192.168.184.201:9092
      - 192.168.184.202:9092
      - 192.168.184.203:9092
    #kafka消费者配置
    consumer:
      # 指定一个消费者组id
      group-id: canal-group1
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #手动提交第1步:开启手动提交offset(true的话就是消费完一条消息自动会提交)
      enable-auto-commit: false
    # kafka生产者配置
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      #手动提交第2步:ack设置为手动(enable-auto-commit要设置为false)
      # manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
  #redis配置
  redis:
    host: 127.0.0.1
    #    password:
    port: 6379
    database: 2
RedisTemplateConfig(配置类)
package com.boot.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisTemplateConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(jsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jsonRedisSerializer);
        // 解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jsonRedisSerializer.setObjectMapper(om); //如果不设置,存储到redis的对象取出来将无法进行转换
        redisTemplate.setDefaultSerializer(jsonRedisSerializer);
        return redisTemplate;
    }
}
Config.class⭐
package com.boot.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;

/**
 * 该实体类对应着数据库表t_config字段
 * @author youzhengjie 2022-09-01 16:55:40
 */

//lombok注解简化开发
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //开启链式编程
public class Config implements Serializable {
    private Long configId;
    private String configInfo;
    private String datetime;
    private String desc;
}
canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
ConfigCanalBean.class⭐
package com.boot.entity.config_canal;

import com.boot.entity.Config;
import lombok.Data;

import java.util.List;

/**
 * 这个类是接收canal发送过来的消息所必须要的
 * @author youzhengjie 2022-09-01 16:55:18
 */
@Data
public class ConfigCanalBean {
    //config实体类的数据
    private List<Config> data;
    //数据库名称
    private String database;
    private long es;
    //递增
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    //暂时没发现什么用,不过也要写上这个属性
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
MysqlType.class⭐
package com.boot.entity.config_canal;

import lombok.Data;

/**
 * 和SqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成String类型即可)
 * 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
 * 注意:这个类的属性全部都要是String类型
 * @author youzhengjie 2022-09-01 16:55:25
 */
@Data
public class MysqlType {

    private String configId;

    private String configInfo;

    private String datetime;

    private String desc;
}
SqlType.class⭐
package com.boot.entity.config_canal;

import lombok.Data;

/**
 * 和MysqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成int类型即可)
 * 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
 * 注意:这个类的属性全部都要是int类型
 * @author youzhengjie 2022-09-01 16:55:32
 */
@Data
public class SqlType {

    private int configId;

    private int configInfo;

    private int datetime;

    private int desc;

}
ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
package com.boot.comsumer;

import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * kafka消费者(监听名为canal-test-topic的topic),同步Redis
 * @author youzhengjie 2022-09-01 16:54:28
 */
@Component
@Slf4j
public class ConfigCanalRedisConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    //redis的key格式:(数据库.表名_字段的id)
    private static final String KEY_PREFIX = "canal-test-db1.t_config_";

    //过期时间(单位:小时)
    private static final int TIME_OUT = 24;


    /**
     * @param consumer 接收消费记录(消息)
     * @param ack 手动提交消息
     */
    @KafkaListener(topics = "canal-test-topic")
    public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {

        try {
            //获取canal的消息
            String value = (String) consumer.value();
            log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);

            //转换为javaBean
            ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
            /*
             由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
            所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
            如果canalBean.getTable()获取的表名是t_config,则同步到redis,如果不是则不管。
             */
            //System.out.println(canalBean);
            if("t_config".equals(canalBean.getTable())){
                //获取是否是DDL语句
                boolean isDdl = canalBean.isDdl();
                //获取当前sql语句的类型(比如INSERT、DELETE等等)
                String type = canalBean.getType();
                List<Config> configList = canalBean.getData();
                //如果不是DDL语句
                if (!isDdl) {
                    //INSERT和UPDATE都是一样的操作
                    if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                        //新增语句
                        for (Config config : configList) {
                            Long id = config.getConfigId();
                            //新增到redis中,过期时间是10分钟
                            redisTemplate.
                                    opsForValue().
                                    set(KEY_PREFIX + id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);
                        }
                    }else if("DELETE".equals(type)){
                        //删除语句
                        for (Config config : configList) {
                            Long id = config.getConfigId();
                            //从redis中删除
                            redisTemplate.delete(KEY_PREFIX+id);
                        }
                    }

                }
            }
            //最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
            ack.acknowledge();
        }catch (Exception e){
            throw new RuntimeException();
        }
    }

}
创建kafka的topic(我们指定的topic名称为canal-test-topic)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create
开始测试canal同步效果⭐
测试1:给t_config表插入数据

在这里插入图片描述
在这里插入图片描述

测试2:修改t_config表数据

在这里插入图片描述在这里插入图片描述

测试3:删除t_config表数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1142216.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【2021集创赛】Robei杯一等奖:基于Robei EDA工具的隔离病房看护机器人设计

本作品参与极术社区组织的有奖征集|秀出你的集创赛作品风采,免费电子产品等你拿~活动。 团队介绍 参赛单位&#xff1a;重庆交通大学 队伍名称&#xff1a;一丘之貉 指导老师&#xff1a;毕波 李艾星 参赛队员&#xff1a;郁航 张坤 秦衡 总决赛奖项&#xff1a;Robei杯一等奖…

iOS开发-CoreNFC实现NFC标签Tag读取功能

iOS开发-CoreNFC实现NFC标签Tag读取功能 一、NFC近场通信 近场通信&#xff08;NFC&#xff09;是一种无线通信技术&#xff0c;它使设备能够在不使用互联网的情况下相互通信。它首先识别附近配备NFC的设备。NFC常用于智能手机和平板电脑。 二、实现NFC标签Tag读取功能 在…

Linux | 进程终止与进程等待

目录 前言 一、进程终止 1、进程终止的几种可能 2、exit 与 _exit 二、进程等待 1、为什么要进程等待 2、如何进行进程等待 &#xff08;1&#xff09;wait函数 &#xff08;2&#xff09;waitpid函数 3、再次深刻理解进程等待 前言 我们前面介绍进程时说子进程退出…

Canal整合SpringBoot详解(二)

文章目录 Canal整合SpringBoot详解&#xff08;二&#xff09;什么是canal案例2&#xff1a;CanalKafka实现mysql和elasticsearch的数据同步⭐Docker搭建elasticsearch7.8.0&#xff08;单机版本&#xff09;⭐Docker安装elasticsearch-head5⭐解决es-head 406错误问题直接修改…

实用篇-Eureka注册中心

一、提供者与消费者 服务提供者&#xff1a;一次业务中&#xff0c;被其他微服务调用的服务。(提供接口给其他微服务) 服务消费者&#xff1a;一次业务中&#xff0c;调用其他微服务的服务。(调用其他微服务提供的接口) 例如前面的案例中&#xff0c;order-service微服务是服…

系列七、动态代理

一、概述 二、Jdk动态代理案例 2.1、Star /*** Author : 一叶浮萍归大海* Date: 2023/10/27 17:16* Description:*/ public interface Star {/*** 唱歌* param name 歌曲名字* return*/String sing(String name);/*** 跳舞*/void dance(); } 2.2、BigStar /*** Author : 一叶…

AcWing 1.2.1 最长上升子序列模型 + 动态规划 + 图解(详细)

&#xff08;1&#xff09;acwing 4557. 最长上升子序列 4557. 最长上升子序列 - AcWing题库 给定一个长度为 N 的整数序列 a1,a2,…,aN。请你计算该序列的最长上升子序列的长度。上升子序列是指数值严格单调递增的子序列 输入格式 第一行包含整数 N第二行包含 N个整数 a1,a…

LLM系列 | 23:多模态大模型:浦语·灵笔InternLM-XComposer解读、实战和思考

引言 ​简介 模型解读 模型架构 训练 实战 环境准备 本地实测 服务部署 总结 引言 谁念西风独自凉&#xff0c;萧萧黄叶闭疏窗&#xff0c;沉思往事立残阳。 Created by DALLE 3 小伙伴们好&#xff0c;我是《小窗幽记机器学习》的小编&#xff1a;卖热干面的小女孩…

在Golang中理解错误处理

处理Golang中临时错误和最终错误的策略和示例 作为一名精通Golang的开发人员&#xff0c;您了解有效的错误处理是编写健壮可靠软件的关键因素。在复杂系统中&#xff0c;错误可能采取各种形式&#xff0c;包括临时故障和最终失败。在本文中&#xff0c;我们将探讨处理Golang中…

源码解析SpringMVC之RequestMapping注解原理

1、启动初始化 核心&#xff1a;得到应用上下文中存在的全部bean后依次遍历&#xff0c;分析每一个目标handler & 目标方法存在的注解RequestMapping&#xff0c;将其相关属性封装为实例RequestMappingInfo。最终将 uri & handler 之间的映射关系维护在类AbstractHand…

Java入门篇 之 数据类型(简单介绍)

博主回归学习状态的第三篇文章&#xff0c;希望对大家有所帮助 今日份励志文案:你若决定灿烂&#xff0c;山无遮&#xff0c;海无拦 加油&#xff01; Java中一共存在2种数据类型 1 . 基本数据类型,基本数据类型四种和八种之说(具体看下图) 四种说的是&#xff0c;整数型&…

vscode打开settings.json方法

cmd shift p&#xff0c;输入setting Open Workspace Settings 也会打开UI设置界面&#xff1b; Open User Settings (JSON) 会打开用户设置 settings.json 文件&#xff1b; Open Workspace Settings (JSON) 会打开工作区设置 settings.json 文件 vscode存在两种设置 sett…

损失函数和目标函数|知识补充

这张图中&#xff0c;横坐标size表示房屋的大小&#xff0c;纵坐标price表示房屋的价格&#xff0c;现在需要建立模型来表示两者之间的关系。 对于给定的输入x&#xff0c;模型会有一个输出f(x)&#xff0c;用一个函数来度量拟合的程度&#xff0c;也就是真实值和预测值之间的…

前端工程化面试题及答案【集合】

前言&#xff1a; 欢迎浏览和关注本专栏《 前端就业宝典 》&#xff0c; 不管是扭螺丝还是造火箭&#xff0c; 多学点知识总没错。 这个专栏是扭螺丝之上要造火箭级别的知识&#xff0c;会给前端工作学习的小伙伴带来意想不到的帮助。 本专栏将前端知识拆整为零&#xff0c;主要…

工业相机常见的工作模式、触发方式

参考&#xff1a;机器视觉——工业相机的触发应用(1) - 知乎 工业相机常见的工作模式一般分为&#xff1a; 触发模式连续模式同步模式授时同步模式 触发模式&#xff1a;相机收到外部的触发命令后&#xff0c;开始按照约定时长进行曝光&#xff0c;曝光结束后输出一帧图像。…

子集生成算法:给定一个集合,枚举所有可能的子集

给定一个集合&#xff0c;枚举所有可能的子集。 &#xff08;为简单起见&#xff0c;本文讨论的集合中没有重复元素&#xff09; 1、方法一&#xff1a;增量构造法 第一种思路是一次选出一个元素放到集合中&#xff0c;程序如下&#xff1a; void print_subset(int n, int …

C++系列之list的模拟实现

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; list的节点类 template struct list_Node { public: list_Node* _prev; list_…

Tomcat服务部署和优化

目录 一、Tomcat&#xff1a; 1、Tomcat作用&#xff1a; 2、Tomcat的核心组件&#xff1a; 3、servlet作用&#xff1a; 4、Tomcat的核心功能&#xff1a; 二、tomcat配置 一、Tomcat&#xff1a; 是一个开源的web应用服务器&#xff0c;nginx主要处理静态页面&#xff…

不再受害:如何预防和应对.mallab勒索病毒攻击

导言&#xff1a; 我们的数据成了我们的珍宝&#xff0c;但也成了黑客们追逐的目标。其中&#xff0c;.mallab勒索病毒就是一个充满阴谋和神秘的数字威胁&#xff0c;它采用高度复杂的方法将您的数据锁在数字牢笼中。本文91数据恢复将深入探讨.mallab勒索病毒的起源、工作方式…

【RabbitMQ 实战】12 镜像队列

一、镜像队列的概念 RabbitMQ的镜像队列是将消息副本存储在一组节点上&#xff0c;以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上&#xff0c;并使这些节点上的队列保持同步。当一个节点失败时&#xff0c;其他节点上的队列不受影响&#xff0c;因…