需求:继上一篇使用xxljob实现数据的全量同步到es后,当数据库中新增、删除、修改数据时,应该对es中的对应索引库实现增量同步。
本文介绍了2种双写一致性方案,对其中使用MQ的方案进行了实现。
1. 方案设计
1.1 数据一致性问题分析
案例问题分析:
1:管理员调用秒杀服务添加秒杀活动,并给秒杀活动添加商品。
2:秒杀服务调用IgeMonitor服务生成静态页。
3:秒杀服务调用goods服务将商品数量和价格存入Redis缓存。
4:秒杀服务调用goods服务调用Search服务将商品数据写入ES索引库。
上面是秒杀案例需要同步操作调用的服务流程,如果采用同步调用,性能效率比较低,且服务之间耦合度极高。我们需要寻找一种方法,降低seckill服务和其他服务的耦合度,并且实现静态页、缓存数据、索引数据一致性。
1.2 双写一致性设计-TCP模式
在秒杀活动添加中,有这几个流程:
- 静态页生成
- 价格、数量加入缓存
- 数据入索引库
如果我们要求这几个操作强一致性,可以采取如下方案:
我们以上面案例为例,可以采用Canal作为中间数据同步管道:
1:开启需要实现数据一致性操作的数据库Binlog,当执行增删改的时候,会记录日志。
2:管理员操作seckill服务添加秒杀活动和活动商品,并将数据修改到数据库,操作结束。
3:Canal订阅数据库增量数据,并编写一个Java服务获取增量数据。
4:在Java服务中实现对IgeMonitor调用以及Goods服务调用和Search服务调用,从而实现数据一致性。
这种模式适合某些特定场景,对数据一致性要求比较高的时候,可以采取这种模式。这种模式虽然能解决强一致性问题,但Canal和其他服务之间耦合度较高。
1.3 双写一致性-MQ模式
如果我们要求这几个操作(静态页生成,价格、数量加入缓存,数据入索引库)强一致性要求不高,可以采取如下方案:
上图是双写一致性设计,几乎能满足微服务架构下所有服务数据一致性,实现流程如下:
1:开启需要实现数据一致性操作的数据库Binlog,当执行增删改的时候,会记录日志。
2:使用Canal监听数据库变更的Binlog日志,同时将变更数据推送至Kakfa。
3:各个服务为实现数据双写一致性,可以按需订阅Kafka中的数据,实现数据同步到文件服务、ElasticSearch、Redis等。
方案特性:
- 基于MySQL Binlog增量订阅,业务0侵入性。
- 增量数据采用Kafka实现收集,Kafka吞吐量极高,能满足高并发场景下数据一致性需求。
- 数据同步,跨语言、跨系统,灵活度极高,只要能订阅Kafka数据,必能同步。
- 订阅Kafka数据,能解决的数据一致性业务场景丰富:
- 可以实现多消费者实现多服务、多库同步。
- 也可以基于单组消费者消费,实现单服务、单库同步。
无论是哪种模式,都需要用到Canal和MySQL Binlog,所以我们需要掌握着2个知识点。
平时在工作中,2种数据一致性模式往往有不同应用场景,可以根据业务需搭配使用。
2. springboot集成kafka
3. canal
本章小结:
- MySQL Binlog介绍
- 主从复制机制
- Canal工作原理介绍
- Canal安装
3.1 MySQL Binlog日志
MySQL
的二进制日志可以说是MySQL
最重要的日志了,它记录了所有的DDL
和DML
(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL
的二进制日志是事务安全型的。使用Binlog日志一定会有性能损耗,在官方文档记录中,开启二进制日志大概会有1%
的性能损耗。
二进制有两个最重要的使用场景:
#1:主从复制
MySQL Replication在Master端开启binlog,Mster把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
#2:数据恢复
由于binlog日志记录了DDL和DML语句,所以可以实现数据恢复,通过使用mysqlbinlog工具来使恢复数据。
MysqlBinlog三种模式:
1)statement模式:每一条会修改数据的sql都会记录到master的binlog中,slave在复制的时候sql进程会解析成和原来master端执行相同的sql再执行。
2)ROW模式:日志中会记录成每一行数据被修改后的快照,而后在slave端再对相同的数据进行修改,只记录要修改的数据,只有value,不会有sql多表关联的状况。
3)MIXED:混合模式复制,结合Statement和ROW的优势,会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种。
- 如何查看是否开启
binlog
?
#查看是否开启了binlog
mysql> show variables like 'log_%';
+----------------------------------------+--------+
| Variable_name | Value |
+----------------------------------------+--------+
| log_bin | OFF |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| log_builtin_as_identified_by_password | OFF |
+----------------------------------------+--------+
通过上面的语句可以查看是否开启了binlog,很显然bin_log的值为OFF表示关闭。
如何开启binlog?
#开启binlog,这里是基于docker容器下安装canal实现
#1:将容器配置文件mysqld.cnf拷贝到宿主机中
docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf ./
#2:修改mysqld.cnf配置文件
vi mysqld.cnf
#3:添加如下配置,开启binlog,添加到[mysqld]下面
#log-bin=mysql-bin 开启binlog,而mysql-bin是日志文件的前缀
#server_id服务唯一标识
#binlog-format指日志记录格式
server_id=1
binlog-format=ROW
log-bin=mysql-bin
#4:将修改后的文件拷贝到容器中
docker cp ./mysqld.cnf mysql:/etc/mysql/mysql.conf.d/
#5:重启容器
docker restart mysql
#6:查看是否开启了binlog
mysql> show variables like 'log_%';
+----------------------------------------+--------------------------------+
| Variable_name | Value |
+----------------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+----------------------------------------+--------------------------------+
3.2 开启binlog详细步骤
查看是否开启了binlog
[root@192 ~]# docker exec -it mysql /bin/bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 5
Server version: 5.7.36 MySQL Community Server (GPL)
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show variables like 'log_%';
+----------------------------------------+--------+
| Variable_name | Value |
+----------------------------------------+--------+
| log_bin | OFF |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| log_builtin_as_identified_by_password | OFF |
| log_error | stderr |
| log_error_verbosity | 3 |
| log_output | FILE |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_admin_statements | OFF |
| log_slow_slave_statements | OFF |
| log_statements_unsafe_for_binlog | ON |
| log_syslog | OFF |
| log_syslog_facility | daemon |
| log_syslog_include_pid | ON |
| log_syslog_tag | |
| log_throttle_queries_not_using_indexes | 0 |
| log_timestamps | UTC |
| log_warnings | 2 |
+----------------------------------------+--------+
21 rows in set (0.01 sec)
mysql>
开启binlog系列命令
[root@192 ~]# docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf ./
Successfully copied 3.58kB to /root/./
[root@192 ~]# ls
anaconda-ks.cfg images minio mysqld.cnf
[root@192 ~]# vi mysqld.cnf
修改mysqld.cnf
[root@192 ~]# docker cp ./mysqld.cnf mysql:/etc/mysql/mysql.conf.d/
Successfully copied 3.58kB to mysql:/etc/mysql/mysql.conf.d/
[root@192 ~]# docker restart mysql
mysql
[root@192 ~]# docker exec -it mysql bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 5
Server version: 5.7.36-log MySQL Community Server (GPL)
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show variables like 'log_%';
+----------------------------------------+--------------------------------+
| Variable_name | Value |
+----------------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| log_builtin_as_identified_by_password | OFF |
| log_error | stderr |
| log_error_verbosity | 3 |
| log_output | FILE |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_admin_statements | OFF |
| log_slow_slave_statements | OFF |
| log_statements_unsafe_for_binlog | ON |
| log_syslog | OFF |
| log_syslog_facility | daemon |
| log_syslog_include_pid | ON |
| log_syslog_tag | |
| log_throttle_queries_not_using_indexes | 0 |
| log_timestamps | UTC |
| log_warnings | 2 |
+----------------------------------------+--------------------------------+
21 rows in set (0.01 sec)
mysql> exit
Bye
root@4cfb9a43ab13:/# exit
exit
[root@192 ~]#
3.3 canal工作原理
canal
模拟MySQL slave
的交互协议,伪装自己为MySQL slave
,向MySQL master
发送dump
协议MySQL master
收到dump
请求,开始推送binary log
给slave
(即canal
)canal
解析binary log
对象(原始为byte
流)
3.4 canal安装
基于Docker命令安装Canal:
docker pull canal/canal-server:v1.1.5
docker run -p 11111:11111 --name=canal --restart=always -d canal/canal-server:v1.1.5
# Kafka的支持是从1.1.0开始,所以我们使用MQ模式的时候,最佳版本选择1.1.5,支持RabbitMQ、RocketMQ、Kafka。
因为canal伪装自己为slave,因此mysql需要对这个slave库进行授权
数据库授权账号
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
详细步骤
[root@192 ~]# docker exec -it mysql bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.36-log MySQL Community Server (GPL)
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> create user canal@'%' IDENTIFIED by 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)
mysql>
3.3 canal配置
考虑到下面3个问题:
- 集成kafka,canal应该把数据库的数据修改信息推送到哪?–>配置kafka
- 指定监听表:canal监听的应该是我们想让他监听的表,而不是数据库的所有表的修改数据
- 推送规则:canal把信息推送到MQ的哪个topic?
1)Kafka服务信息配置
进入Canal容器,首先要配置Kafka服务信息,主要修改/home/admin/canal-server/conf/canal.properties
文件,配置如下:
#进入Canal容器
docker exec -it canal /bin/bash
#进入配置目录
cd /home/admin/canal-server/conf
#编辑配置文件canal.properties
vi canal.properties
#设置Canal数据流模式为kafka,可选值有tcp, kafka, rocketMQ, rabbitMQ,默认是tcp
canal.serverMode = kafka
#设置Kafka服务地址
kafka.bootstrap.servers = 192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094
详细步骤
[root@192 ~]# docker exec -it canal /bin/bash
[root@ca8714734388 admin]# cd /home/admin/canal-server/conf
[root@ca8714734388 conf]# ls
canal_local.properties canal.properties example logback.xml metrics spring
[root@ca8714734388 conf]# vi canal.properties
2)监听数据库配置
接下来要配置监听的数据源信息,修改/home/admin/canal-server/conf/example/instance.properties
文件,配置如下:
#修改/home/admin/canal-server/conf/example/instance.properties文件
vi /home/admin/canal-server/conf/example/instance.properties
#position info 配置监听数据源,将数据源地址换成指定的数据源
canal.instance.master.address=192.168.211.130:3306
# username/password 授权账号配置,这个账号其实就是我们在开启MySQL Binlog创建的授权账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#table regex 配置监听的数据库表
#所有表:.* or .*\\..*
#canal schema下所有表: canal\\..*
#canal下的以canal打头的表:canal\\.canal.*
#canal schema下的一张表:canal.test1
#多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
#监听demo数据库下的所有表,配置如下:
canal.instance.filter.regex=demo\\..*
3)topic配置
当数据库数据发生变更的时候,我们希望将数据推送至Kafka中,所以需要配置队列,配置队列可以直接将队列名字硬编码写死,但不推荐这种做法,推荐用数据库名_表名
的方式。仍然要配置instance.properties
。
修改如下配置即可:
# mq config
# 固定MQ的名字
#canal.mq.topic=example
#根据表的名字动态创建MQ,例如demo下的tb_user表变化,创建队列demo_tb_user
canal.mq.dynamicTopic=.*\\..*
详细步骤
修改instance.properties
重启canal
docker restart canal
3.4 测试
修改数据库数据
查看kafka的topic发现多了个leadnews_wemedia_wm_news
[root@192 ~]# docker restart canal
canal
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --zookeeper 192.168.200.131:2181 --list
__consumer_offsets
leadnews_wemedia_wm_news
test
user-topic
bash-5.1#
ps:kafka相关命令
kafka-topics.sh --zookeeper 192.168.200.131:2181 --list //查看所有topic
kafka-topics.sh --delete --topic topic_name --zookeeper 192.168.200.131:2181 //删除名为topic_name的topic
kafka-topics.sh --delete --topic leadnews_wemedia_wm_news --zookeeper 192.168.200.131:2181
4. springboot监听topic
kafka配置
spring:
application:
name: leadnews-kafka
cloud:
nacos:
discovery:
server-addr: 192.168.200.131:8848
config:
server-addr: 192.168.200.131:8848
file-extension: yml
kafka:
bootstrap-servers: 192.168.200.131:9092
producer: # producer 生产者
retries: 0 # 重试次数
acks: 0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 # 批量大小
buffer-memory: 33554432 # 生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: # consumer消费者
group-id: xxxgroup # 默认的消费组ID
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Listener
@Component
public class BinlogListener {
@KafkaListener(topics = "leadnews_wemedia_wm_news",groupId = "pay")
public void WmNewsListenerPay(ConsumerRecord<String,String> record){
System.out.println("pay模块");
String msg = record.value();
System.out.println(msg);
}
@KafkaListener(topics = "leadnews_wemedia_wm_news",groupId = "shop")
public void WmNewsListenerShop(ConsumerRecord<String,String> record){
System.out.println("shop模块");
String msg = record.value();
System.out.println(msg);
}
}
修改数据
消费者接收到binlog的修改信息