文章目录
- 一 业务数据采集
- 0 业务数据采集思路
- 1 Maxwell 介绍
- 2 Maxwell工作原理
- (1) MySQL主从复制过程
- (2)Maxwell的工作原理
- 3 MySQL的binlog
- (1)什么是binlog
- (2)binlog的开启
- (3)binlog的分类设置
- 4 MySQL的准备
- (1)创建实时业务数据库
- (2)导入建表数据
- (3)修改/etc/my.cnf文件
- (4)重启MySQL使配置生效
- (5)模拟生成数据
- 5 安装Maxwell
- 6 初始化Maxwell元数据库
- 7 使用Maxwell监控抓取MySQL数据
- 8 maxwell采集服务
- (1)编写启动脚本
- (2)maxwell配置
- (3)开启采集业务
- 9 Maxwell与Canal 工具对比
一 业务数据采集
0 业务数据采集思路
将业务数据从MySQL数据库采集到kafka属于CDC(Change Data Capture)操作,实现CDC有两种方式
-
全表扫描:使用sqoop,会丢失一些数据,只能拿到数据的最终状态。如订单状态:待支付、已支付、待发货、已发货、已收货、评价、完成。使用sqoop只会获取“完成”状态的订单,这种现象称为“状态丢失”。
对于离线统计没有统计订单状态的需求,只得到最终结果即可,但对实时计算来说,订单的状态变化十分重要。
-
基于Binglog获取新增和变化的数据:常见框架有阿里的canal、maxwell。
整体流程如下:
从数据库中读取数据,将数据封装成JSON格式字符串,发送给kafka。
1 Maxwell 介绍
Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。 实时读取MySQL二进制日志Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网地址
2 Maxwell工作原理
(1) MySQL主从复制过程
- Master主库将改变数据的操作(如insert)进行记录,写到二进制日志(binary log)中 。
- Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)。
- Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
具体工作过程如下图:
(2)Maxwell的工作原理
很简单,Maxwell将自己伪装成slave,假装从master复制数据。不同之处在于上图中的步骤4,拿到数据之后,将数据封装成JSON格式字符串,之后再将JSON发送到其他的消息中间件中。
3 MySQL的binlog
(1)什么是binlog
MySQL的二进制日志可以说是MySQL最重要的日志,它记录了所有的DDL(更改数据库的结构,表级操作)和DML(管理数据库中的数据)(除了数据查询语句DQL)语句,以事件形式记录在MySQL的binlog中,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:
- 其一:MySQL Replication 在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
- 其二:数据恢复,通过使用mysql 的binlog工具来使恢复数据。
(2)binlog的开启
-
找到MySQL配置文件的位置
-
Linux: /etc/my.cnf
如果/etc目录下没有,可以通过locate my.cnf查找位置
-
Windows: \my.ini
-
在mysql的配置文件下,修改配置
在[mysqld] 区块,设置/添加
log-bin=mysql-bin
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.123456 的文件,后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
(3)binlog的分类设置
mysql binlog的格式有三种,分别是STATEMENT,MIXED,ROW。
在配置文件中可以选择配置:binlog_format= statement|mixed|row
三种格式的区别:
- statement
语句级,binlog会记录每次一执行写操作的语句。
相对row模式节省空间,但是可能产生不一致性,比如出现时间函数、随机数等情况时,如update tt set create_date=now()
。
如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
- row
行级, binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,只记录执行后的效果。
缺点:占用较大空间。
- mixed
statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题。
默认还是statement,在某些情况下譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时。
在上述情况中会按照 ROW的方式进行处理。
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog监控的情况都不方便。
综合上面对比,Maxwell想做监控分析,选择row格式比较合适
4 MySQL的准备
(1)创建实时业务数据库
(2)导入建表数据
(3)修改/etc/my.cnf文件
sudo vim /etc/my.cnf
# 添加如下内容
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2022
# 如果有多个数据库需要监控,增加几条配置信息即可,如下
binlog-do-db=gmall2021
binlog-do-db=gmall2020
注意:binlog-do-db根据自己的情况进行修改,指定具体要同步的数据库
(4)重启MySQL使配置生效
sudo systemctl restart mysqld
到/var/lib/mysql目录下查看初始文件大小154
sudo ls -l ./
(5)模拟生成数据
将jar包和properties文件上传到/opt/module/rt_dblog目录下。
修改application.properties中数据库连接信息。
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://hadoop101:3306/gmall2022?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#业务日期
mock.date=2022-11-29
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=0
注意:如果生成较慢,可根据配置情况适当调整配置项
运行jar包
java -jar gmall2022-mock-db-2022-11-29.jar
完成后可以再去查看mysql-bin.000001文件,其已经将对数据修改的操作保存了下来。
在测试阶段可以适当将数据量减少,更改文件配置,如下:
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://hadoop101:3306/gmall2022?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#业务日期
mock.date=2022-11-29
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=1
#生成新用户数量
mock.user.count=100
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20
#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=10
#购物车数量
mock.cart.count=10
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源 用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10
#用户下单比例
mock.order.user-rate=35
#用户从购物中购买商品比例
mock.order.sku-rate=20
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=10
#支付比例
mock.payment.rate=30
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10
#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50
#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
再次执行脚本生成数据。
以后测试都使用第一次生成的一百个用户,修改如下配置:
#是否重置用户
mock.clear.user=0
#生成新用户数量
mock.user.count=0
此时,业务数据的第一个流程已经完成,如下图:
5 安装Maxwell
# 将maxwell-1.25.0.tar.gz上传到/opt/software目录下。
# 解压maxwell-1.25.0.tar.gz到/opt/module目录。
tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/
mv config.properties.example config.properties
6 初始化Maxwell元数据库
# 在MySQL中建立一个maxwell库用于存储Maxwell的元数据
mysql -uroot -p123456
CREATE DATABASE maxwell
# 设置安全级别
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
# 分配一个账号可以操作该数据库
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
# 分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
quit;
7 使用Maxwell监控抓取MySQL数据
# 修改配置文件
# 发送到kafka
producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
# 需要添加
kafka_topic=ods_base_db_m
# mysql login info
host=hadoop101
user=maxwell
password=123456
# 需要添加 后续初始化会用
client_id=maxwell_1
关于client_id=maxwell_1
的一些说明:对历史数据的处理,在maxwell读取数据之前,如表中已经存在数据,那么在binlog中不会记录原有数据。想要获得历史数据,maxwell专门提供了处理历史数据的功能maxwell-bootstrap,想要使用maxwell-bootstrap,需要进行以下配置
注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打乱binlog的顺序。如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性
#producer_partition_by=database # [database, table, primary_key, transaction_id, column]
- database:将同一个数据库中的数据放到一个分区里面去;
- table:将一个表的数据放到同一个分区;
- primary_key:按照主键取哈希值,然后对分区数做模运算;
- transaction_id:
- column:按照某一列放到同一个分区。
使用主键策略,如下:
producer_partition_by=primary_key
如果在以上配置中,有多个数据库需要监控,现只想监控配置的某一个数据库,需要在以下配置项中进行设置
# *** filtering ***
# filter rows out of Maxwell's output. Command separated list of filter-rules, evaluated in sequence.
# A filter rule is:
# <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
# type ::= [ "include" | "exclude" | "blacklist" ]
# db ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# tbl ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# col_val ::= "column_name"
# tbl ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"
8 maxwell采集服务
(1)编写启动脚本
vim /home/hzy/bin/maxwell.sh
# 脚本内容
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
(2)maxwell配置
# 启动zookeeper
# 启动kafka
# 启动maxwell
maxwell.sh
启动成功后,其会在hadoop101数据库管理系统maxwell元数据库新建几张表,如下:
关于表的一些说明:
- databases:存储机器上的所有数据库;
- tables:机器上的所有表;
- columns:机器上的所有字段;
- positions:从哪个文件同步数据,mysql-bin.000001;
(3)开启采集业务
# 编写kafka消费者脚本
/opt/module/kafka_2.11-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic $1
# 赋予执行权限
# 启动消费者
kfkcon.sh ods_base_db_m
# 模拟业务数据生成
java -jar gmall2022-mock-db-2022-11-29.jar
# 可以在kafka消费者处查看相应内容,数据格式如下
{"id":1597576252419706882,"user_id":36,"nick_name":null,"head_img":null,"sku_id":6,"spu_id":2,"order_id":28761,"appraise":"1204","comment_txt":"评论内容:59966718727753753271253792219417466226426865391966","create_time":"2022-11-29 21:00:34","operate_time":null}
# 行为日志数据如下
{"common":{"ar":"370000","ba":"Xiaomi","ch":"wandoujia","is_new":"0","md":"Xiaomi 9","mid":"mid_1","os":"Android 11.0","uid":"21","vc":"v2.1.134"},"page":{"during_time":9336,"item":"3","item_type":"sku_ids","last_page_id":"comment","page_id":"trade"},"ts":1669635473000}
至此,业务数据的生成和采集已经完成,第一层ods层的工作也已经完成。
9 Maxwell与Canal 工具对比
- Maxwell没有Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
- Maxwell有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
- Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续读取数据。
- Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
- Maxwell比Canal更加轻量级。