欢迎光临我的博客查看最新文章: https://river106.cn
1、Maxwell简介
Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。读取 MySQL binlogs 并将修改行字段的更新写入 Kafka, Kinesis, RabbitMQ, Google Cloud Pub/Sub 或 Redis (Pub/Sub or LPUSH) 以作为 JSON 的应用程序。
官网:https://maxwells-daemon.io/
github:https://github.com/zendesk/maxwell
安装版本:maxwell-1.29.2
快速开始:https://maxwells-daemon.io/quickstart/
2、安装mysql开启binlog
创建数据库maxwell 用于存储 Maxwell 的元数据,
新增账号maxwell并授权:
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '12345678';
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@'%';
flush privileges;
创建测试数据库testdb,创建测试表testuser
CREATE TABLE `testuser` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`user_name` varchar(100) NOT NULL COMMENT '用户名',
`pswd` varchar(100) NOT NULL COMMENT '密码',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3、启动maxwell
方法1:
bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' --producer=stdout
方法2:
cp config.properties.example config.properties
修改config.properties:
producer=stdout
host=127.0.0.1
password=12345678
启动:
bin/maxwell --config config.properties
出现如下报错:
java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
at com.zendesk.maxwell.schema.columndef.StringColumnDef.charsetForCharset(StringColumnDef.java:61)
at com.zendesk.maxwell.schema.columndef.StringColumnDef.asJSON(StringColumnDef.java:75)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:112)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:162)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:176)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:486)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:626)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:178)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:255)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:183)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:286)
00:29:21,008 INFO TaskManager - Stopped all tasks
这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。
一种解决方案是,将MySQL降级,重新安装5.5.3以下的版本。
另一种方法则是修改maxwell源码。
解压打开,找到有问题的类:com.zendesk.maxwell.schema.columndef.StringColumnDef,加上能识别utf8mb3的语句,重新打包。
打包好的maxwell-1.19.0.jar百度云盘链接: https://pan.baidu.com/s/1t0s_e6d2G1-Z4dM3pSwVXQ?pwd=cs8m 提取码: cs8m 。
替换maxwell/lib/maxwell-1.19.0.jar ,重启即可。
[root@river106 maxwell-1.29.2]# bin/maxwell --config config.properties
Using kafka version: 1.0.0
20:48:10,306 INFO Maxwell - Starting Maxwell. maxMemory: 837287936 bufferMemoryUsage: 0.25
20:48:10,472 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000001:1772098], lastHeartbeat=1681476466529]
20:48:10,802 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[binlog.000001:987693], lastHeartbeat=1681465703072])
20:48:10,987 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[binlog.000001:931603], lastHeartbeat=0])
20:48:11,026 INFO MysqlSavedSchema - beginning to play deltas…
20:48:11,027 INFO MysqlSavedSchema - played 1 deltas in 1ms
20:48:11,067 INFO BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000001:1772098
20:48:11,099 INFO BinaryLogClient - Connected to localhost:3306 at binlog.000001/1772098 (sid:6379, cid:3825)
20:48:11,099 INFO BinlogConnectorReplicator - Binlog connected.
如上说明启动成功!
binlog变更抓取测试
分别新增数据、更新数据、删除数据,观察控制台输出变化
INSERT INTO testdb.testuser (user_name,pswd,create_time,modify_time) VALUES
('testuser','33333',now(),now());
UPDATE testdb.testuser SET pswd='55555' WHERE user_name = 'testuser';
DELETE FROM testdb.testuser WHERE user_name = 'testuser';
{“database”:“testdb”,“table”:“testuser”,“type”:“insert”,“ts”:1683336811,“xid”:126263,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“33333”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:33:31”}}
{“database”:“testdb”,“table”:“testuser”,“type”:“update”,“ts”:1683336841,“xid”:126348,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“55555”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:34:01”},“old”:{“pswd”:“33333”,“modify_time”:“2023-05-06 09:33:31”}}
{“database”:“testdb”,“table”:“testuser”,“type”:“delete”,“ts”:1683336847,“xid”:126373,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“55555”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:34:01”}}
4、输出到redis
bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' \
--producer=redis --redis_host='127.0.0.1' --redis_port=6380 --redis_auth='123456Ab'
或修改config.properties
redis_host=127.0.0.1
redis_port=6380
redis_auth=123456Ab
redis_database=0
redis_key=maxwell
redis_type=pubsub
bin/maxwell --config config.properties
相关配置文档:https://maxwells-daemon.io/producers/#redis
测试发布订阅模式
设置config.properties的redis_type=pubsub
mysql中新增一条数据,redis订阅频道maxwell
127.0.0.1:6380> SUBSCRIBE maxwell
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "maxwell"
3) (integer) 1
1) "message"
2) "maxwell"
3) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566124,\"xid\":86540,\"commit\":true,\"data\":{\"id\":8,\"user_name\":\"sssss\",\"pswd\":\"123456\",\"create_time\":\"2023-04-14 23:32:35\",\"modify_time\":\"2023-04-15 21:29:38\"}}"
测试list模式
设置config.properties的redis_type=lpush
127.0.0.1:6380> keys *
1) "foo"
2) "maxwell"
127.0.0.1:6380> LRANGE maxwell 0 10
1) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380> LPOP maxwell
"{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380>
通过发布订阅模式,实现数据同步功能,通过list方式可以获取最新的数据的变化和数据变化数量等需求。
5、输出到RabbitMQ
RabbitMQ安装及使用请参考: RabbitMQ安装及简单使用
RabbitMQ中新建exchange:maxwell,类型为:fanout。
修改Maxwell配置config.properties
producer=rabbitmq
rabbitmq_host=127.0.0.1
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=fanout
rabbitmq_exchange_durable=true
Maxwell相关配置文档:https://maxwells-daemon.io/producers/#rabbitmq
启动服务:
bin/maxwell --config config.properties
6、监控
通过 http 方式获取监控指标,修改config.properties配置如下:
metrics_type=http
metrics_jvm=true
http_port=8080
启动服务:
bin/maxwell --config config.properties
打开浏览器,访问:http://ip:8080/metrics,即可获取到监控指标.
5、Maxwell与Canal 工具对比
1、Maxwell没有Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
2、Maxwell有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
3、Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续读取数据。
4、Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
5、Maxwell比Canal更加轻量级。
扩展:
https://toutiao.io/posts/0xfdws/preview