实时采集MySQL数据之轻量工具Maxwell实操

news2024/12/27 11:48:42

文章目录

  • 概述
    • 定义
    • 原理
    • Binlog说明
    • Maxwell和Canal的区别
  • 部署
    • 安装
    • MySQL准备
    • 初始化Maxwell元数据库
    • Maxwell进程启动
      • 命令行参数
      • 配置文件
    • 实时监控Mysql输出Kafka
    • Kafka Topic分区控制
    • 实时监控MySQL指定表
    • 监控MySQL指定表同步全量数据

概述

定义

Maxwell 官网地址 https://maxwells-daemon.io/

Maxwell GitHub源码地址 https://github.com/zendesk/maxwell

Maxwell 是由美国 Zendesk 开源,采用 Java 语言开发的 MySQL 实时抓取工具,通过实时读取MySQL binlog二进制日志并作为生产者生产 JSON 格式消息发送给 Kafka、Kinesis、RabbitMQ、Redis或其他流媒体平台的应用程序。最新版本为1.39.4

Maxwell的操作开销很低,只需要mysql和数据同步目的地,常用场景包括ETL、缓存构建/过期、指标收集、搜索索引和服务间通信。

原理

Maxwell 的工作原理很简单,就是把自己伪装成 MySQL 的一个 slave,然后以 slave的身份从 MySQL master服务器复制数据,需要MySQL的binlog数据格式设置为row模式。

Binlog说明

MySQL开启binlog大概会有 1%的性能损耗,主要用于主从复制和数据恢复。二进制日志包括两类文件

  • 二进制日志索引文件(文件名后缀为.index):用于记录所有 的二进制文件
  • 二进制日志文件(文件名后缀为.00000*):记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

MySQL 生成的 binlog 文件初始大小一定是 154 字节,然后前缀是 log-bin 参数配置的,后缀是默认从.000001,然后依次递增(包括每次重启mysql也会递增)。除了 binlog 文件文件以外,MySQL 还会额外生产一个.index 索引文件用来记录当前使用的 binlog 文件。

Maxwell和Canal的区别

Maxwell最初的设计思想是MySQL+Kafka,用于对MySQL数据采集个人比较推荐Maxwell,当然还有我们前面学过的FlinkCDC。

  • 服务端+客户端一体,轻量级
  • 支持断点还原功能+bootstrap+json,全量同步
  • maxwell社区比canal社区活跃

image-20221219231229027

部署

安装

# 下载最新版本1.39.4的maxwell,注意从github的release历史中可知maxwell从v1.30.0开始就已经不再支持JDK8,支持JDK11
wget https://github.com/zendesk/maxwell/releases/download/v1.39.4/maxwell-1.39.4.tar.gz
# 解压文件
tar -xvf maxwell-1.39.4.tar.gz
# 进入主目录
cd maxwell-1.39.4

MySQL准备

修改mysql的配置文件,开启mysql的binlog设置,vim /etc/my.cnf

# mysql server的id,如果有多台id需要唯一
server_id=1
# 设置生成的二进制文件的前缀
log-bin=mysql-bin
# 设置binlog的二进制文件的日志级别 行级模式
binlog_format=row
# binlog的执行的库 如果不加这个参数那么mysql会对所有的库都生成对应的binlog 即对所有的库尽心binlog监控
# 设置只监控某个或某些数据库
binlog-do-db=my_maxwell_01
binlog-do-db=my_maxwell_02
# 修改后重启MySQL的服务
service mysqld restart

初始化Maxwell元数据库

在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据

# 创建数据库,在我们使用的时候它会自己创建对应的表,这里我们只需建库不需要创表
CREATE DATABASE maxwell;
# 创建用户任意远程访问
CREATE USER 'maxwell'@'%';
# 修改密码
ALTER USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
#给用户授权
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
GRANT ALL PRIVILEGES ON maxwell.* TO "maxwell"@"%";
#刷新权限
FLUSH PRIVILEGES;

Maxwell进程启动

命令行参数

# 启动maxwell
bin/maxwell --user='maxwell' --password='123456' --host='hadoop3' --port=3308 --producer=stdout

详细参数配置可以查阅官网

image-20221219145234235

# 在mysql中创建前面在MySQL的配置文件中binlog数据库
CREATE DATABASE my_maxwell_01;
# 数据表,账号表
use my_maxwell_01;
CREATE TABLE `account` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(4) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 插入数据
INSERT INTO account(name,age) VALUES('张三三',20),('李思',23),('王乐',26);

从日志可以看出创建数据库、数据表和数据插入捕获的binlog由maxwell转换为json的日志,向 mysql 的my_maxwell_01库的account表同时插入 3 条数据,控制台出现了 3 条 json日志,说明 maxwell 是以数据行为单位进行日志的采集的。

image-20221219150217790

通过启动maxwell后我们也可以maxwell元数据库的内容,包含核心几张表数据。

image-20221219163646478

配置文件

# 拷贝maxwell根目录下config.properties.example
cp config.properties.example config.properties
# 修改config.properties配置文件

log_level=info
producer=stdout
# mysql login info
host=hadoop3
port=3308
user=maxwell
password=123456

image-20221219151059520

# 通过指定配置文件启动maxwell
bin/maxwell --config ./config.properties
# 再次插入数据测试
INSERT INTO account(name,age) VALUES('吴三',25);
# 也可以通过jps查看maxwell进程com.zendesk.maxwell.Maxwell
jps -l

image-20221219151742515

接下里看下修改数据和删除数据json数据内容,后续主要就是针对json内容做处理

UPDATE account SET age = 28 WHERE id =4;
UPDATE account SET age = 30 WHERE name ='吴三';
DELETE FROM account WHERE id =4;

image-20221219152520787

json内容分别如下,主要是type类型区分,更新包含old数据的值,ts为秒级时间戳。

# 添加
{
    "database": "my_maxwell_01", 
    "table": "account", 
    "type": "insert", 
    "ts": 1671434231, 
    "xid": 5097, 
    "commit": true, 
    "data": {
        "id": 4, 
        "name": "吴三", 
        "age": 25
    }
}
# 更新
{
    "database": "my_maxwell_01", 
    "table": "account", 
    "type": "update", 
    "ts": 1671434616, 
    "xid": 6302, 
    "commit": true, 
    "data": {
        "id": 4, 
        "name": "吴三", 
        "age": 28
    }, 
    "old": {
        "age": 25
    }
}
# 删除
{
    "database": "my_maxwell_01", 
    "table": "account", 
    "type": "delete", 
    "ts": 1671434700, 
    "xid": 6584, 
    "commit": true, 
    "data": {
        "id": 4, 
        "name": "吴三", 
        "age": 30
    }
}

实时监控Mysql输出Kafka

# 启动 Maxwell 监控 binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop3' --producer=kafka --kafka.bootstrap.servers=kafka1:9092 --kafka_topic=test_topic_1

一旦mysql表有了数据的更新么mysql底层的binlog文件肯定会有变化,binlog变化了则maxwell进程就能捕捉到这个变化,将之解析并转换为json数据写入到kafka里面。使用kafka的图形化工具kafka tool查看数据,点击test_topic_1查看,不更新数据这里的topic是不会被创建的,插入数据后test_topic_1就有相关的消息数据了

# 继续插入数据
INSERT INTO account(name,age) VALUES('刘说',27);

image-20221219155314573

# 官方的命令行消费脚本也可以消费到数据
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test_topic_1

通过 kafka 消费者来查看到了数据,说明数据成功传入 kafka

image-20221219155127513

Kafka Topic分区控制

上面的示例往kafka写入的消息都是是发往一个分区,在实际生产环境中一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,提高并发度主题肯定是多分区的。先创建一个名称test_topic_2,分区为3副本为2的topic。

image-20221219160131248

这次使用配置文件方式,修改config.properties

# 配置生产者是kafka
producer=kafka
# kafka的server
kafka.bootstrap.servers=kafka1:9092
# 指定topoic
kafka_topic=test_topic_2
#按什么分区 [database, table, primary_key, transaction_id, thread_id, column]
producer_partition_by=database

image-20221219161020889

使用配置文件的方式启动maxwell进程

bin/maxwell --config ./config.properties
# 在mysql中创建前面在MySQL的配置文件中binlog数据库
CREATE DATABASE my_maxwell_02;
# 数据表,账号表
use my_maxwell_02;
CREATE TABLE `account` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(4) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 分别往两个数据库的account表插入一条数据
INSERT INTO my_maxwell_01.account(name,age) VALUE('李丹',30);
INSERT INTO my_maxwell_02.account(name,age) VALUE('李丹',30);

再来看看topic,在test_topic_2中两条数据分区分别为0和1,验证不同数据库的数据会发往不同的分区。

image-20221219161527606

实时监控MySQL指定表

主要是通过–filter参数设置exclude排除,include是包含来实现

# 启动maxwell
bin/maxwell --user='maxwell' --password='123456' --host='hadoop3' --filter 'exclude: *.*, include:my_maxwell_01.product' --port=3308 --producer=stdout

创建新的数据表,分别往两张表插入数据

use my_maxwell_02;
CREATE TABLE `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `type` int(4) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO my_maxwell_01.account(NAME,age) VALUE('李丹',30);
INSERT INTO my_maxwell_01.product(NAME,TYPE) VALUE('iphone13',1);

image-20221219162849650

可以看到只有product表捕获变更数据,此外还可以设置 include:my_maxwell_01.*,通过此种方式来监控 mysql 某个库的所有表,也就是说过滤整个库。

监控MySQL指定表同步全量数据

监控MySQL指定表同步全量数据常用于数据初始化操作,Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增及变化的数据,但是Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据,来对 MySQL 的某张表进行数据初始化,也就是我们常说的全量同步。接下来演示将 my_maxwell_01库下的 account表的5条数据全量导入输出到 maxwell 控制台。

前面我们创建过maxwel的元数据库,这里需求修改 Maxwell 的元数据以触发数据初始化机制,在 mysql 的 maxwell 库中 bootstrap表中插入一条数据,写明需要全量数据的库名和表名

insert into maxwell.bootstrap(database_name,table_name) values('my_maxwell_01','account');

image-20221219164054827

# 启动 maxwell 进程,此时初始化程序会直接打印account表的所有数据
bin/maxwell --user='maxwell' --password='123456' --host='hadoop3' --port=3308 --producer=stdout

image-20221219164159981

当数据全部初始化完成以后,Maxwell 的元数据会变化,is_complete 字段从 0 变为 1,start_at 字段从 null 变为具体时间(数据同步开始时间),complete_at 字段从 null 变为具体时间(数据同步结束时间),在Maxwell 运行过程中继续往maxwell.bootstrap插入数据也会触发处理。

image-20221219164244308

本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/102435.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3D相机获取点云信息的几种方法

在计算机中, 图像由一个个像素点组成。图像数据存储在每一个像素点中,每一个像素点包含了被测物体的信息。除了常见的RGB信息或者灰度信息以外,还可以包含深度信息和坐标等其它信息。在某个坐标系下的点的数据集又被称为点云。点云里的每一个点包含了丰富…

Windows取证——数据恢复(Fat32文件系统和NTFS文件系统)

目录 一、磁盘存储结构 (一)分区表 1.MBR 分区表(主引导记录Master Boot Record , mbr) 小实验:将磁盘改为活动分区 2.GPT 分区表(全局唯一标识分区表GUID PartITion Table,gpt …

《SpringBoot篇》24.SpringBoot整合Freemarker超详细教程

陈老老老板🦸👨‍💻本文专栏:SpringBoot篇(主要讲一些与springboot整合相关的内容)👨‍💻本文简述:本文讲一下SpringBoot整合Freemarker的整合教程超详细教程。&#x1f…

SPL 和 SQL 能不能融合在一起?

文章目录SPL 和 SQL 能不能融合在一起?SPL资料SPL 和 SQL 能不能融合在一起? SQL和SPL都是面向结构化数据的通用处理技术。SQL普及率高受众广,很多用户天生就会用SQL查询数据,如果数据引擎支持SQL就会很容易上手,而且…

【MySQL】数据库约束与聚合查询和联合查询等进阶操作知识汇总

目录1.数据库约束:1.1 约束的类型:1.2 unique:1.3 primary key:1.3.1 分布式系统下,自增主键如何生成唯一id:1.4 foreign key:1.4.1 逻辑删除:2.表的设计/数据库的设计:2.1 数据库是如何设计的?3.进阶插入操作:4.进阶查询:4.1 聚合查询:4.2 group by 列名:4.3 联合查询/多表查…

2508. 添加边使所有节点度数都为偶数 c++

给你一个有 n 个节点的 无向 图,节点编号为 1 到 n 。再给你整数 n 和一个二维整数数组 edges ,其中 edges[i] [ai, bi] 表示节点 ai 和 bi 之间有一条边。图不一定连通。 你可以给图中添加 至多 两条额外的边(也可以一条边都不添加&#x…

C51——定时器控制寄存器

定时器的本质原理: 每经过一个一起周期,就加1 在寄存器里加 当我们想要操作寄存器的时候 就要找到TCON 当它开始数数的时候,会有天花板,会有溢出。 那我们怎么知道他溢出了?有TF0,当TF0 出现变化的时…

【剧前爆米花--爪哇岛寻宝】

作者:困了电视剧 专栏:《JavaSE语法与底层详解》 文章分布:这是一篇关于接口的文章,在本篇文章中我会将接口常用的一些实例进行讲解,以及部分方法在重写中的思想。 目录 Comparable和Comparator接口使用 Object类 t…

JDBC的简单使用与封装

目录 1、JDBC 2、JDBC的常用接口 1.Driver接口 2.Connection接口 3.Statement接口 4.ResultSet接口 3、JDBC的基本使用 1)、简单的增删查改 Ⅰ、查 Ⅱ、增 Ⅲ、改 Ⅳ、删 2)简单封装 1、JDBC 我们先了解JDBC是什么,JDBC的全称是Java数…

2018年全国职业技能大赛中职组“网络安全”赛项—基础题(解析)

2018年全国职业技能大赛中职组“网络安全”赛项—基础题(解析) 任务一、nmap 1、nmap sP -n 192.168.100.133 //进行ping扫描 1.1、nmap -Pn -n 192.168.100.133 //目标禁用 绕过ping 扫描 2、nmap -A -n ip // 对ip进行综合性扫描 3、nmap -O -n…

TiDB-学习笔记

编写这个笔记,希望能记录下学习TiDB时候的知识点。 目录 第一章 1.事务 1.1 SQL-92标准: 1.2 事务的隔离级别 2.在TiDB学习 SQL 语句 第二章 第三章 第一章 1.事务 事务的特性(ACID) atomicity原子性、consistency一致性、i…

网上花店网页代码 html静态花店网页设计制作 dw静态鲜花网页成品模板素材网页 web前端网页设计与制作 div静态网页设计

常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他等网页设计题目, A…

全志V853 在 NPU 转换 YOLO V3 模型

NPU 转换 YOLO V3 模型 YOLO 全称是 You Only Look Once(你只需看一次),从名称上也能看出这种算法速度快的优势,因此在许多边缘设备上,YOLO 算法的使用十分广泛。YOLOV3 是华盛顿大学研究生 Joseph Redmon 所开发&…

matlab学习笔记(九)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 matlab学习笔记(九)一、信号采样二、信号重建一、信号采样 信号的采样原理图如下图所示: 其数学模型表示为: 其中的f(t)为原…

11、Mysql高级之SQL优化

11、Mysql高级之SQL优化 文章目录11、Mysql高级之SQL优化0 SQL优化1 大批量插入数据2 优化insert语句3 优化order by语句3.1 环境准备3.2 两种排序方式3.3 Filesort 的优化4 优化group by 语句5 优化嵌套查询6 优化OR条件7 优化分页查询7.1 优化思路一7.2 优化思路二8 使用SQL提…

【Budibase】搭建低代码开发平台

Budibase介绍 Budibase 是一套开源的低代码开发平台,支持一键数据库及API 接入,支持简单的 JS 关联前后端数据,有细致的权限访问管理,对移动端有良好的支持。它主打企业流程自动化,有完善的自动化流程设计&#xff0c…

NETDMIS5.0界面介绍

1.启动界面介绍 双击桌面快捷方式,打开NET.DMISS启动界面如下图,包含三个菜单是文件、设置语言、帮助。 2.工作界面介绍 默认测量界面包括主菜单,工具条,节点程序界面(显示测量过程),CAD视窗…

(免费分享)基于ssm简易网盘系统

开发工具:IDEA,mysql5.7 Tomcat8.0,jdk1.8 package cn.tangtj.clouddisk.web;import cn.tangtj.clouddisk.entity.User; import cn.tangtj.clouddisk.utils.UserUtil; import org.apache.logging.log4j.LogManager; import org.apache.loggi…

嵌入式应用开发|Linux文件I/O常用的四种访问方式

在Linux系统中&#xff0c;一切都可以看成"文件"<通常包含&#xff1a;普通文件、驱动文件、网络通信文件等等>&#xff0c;系统中所有的操作都可以通过文件I/O实现&#xff0c;因此&#xff0c;掌握文件常用接口很有必要。 0.前言 屏幕前的你如果懂得main函数…

SAP S4 如何快速配置一家公司

之前配置公司采用的方法是copy的方式来配置&#xff0c;后来在做其他配置的时候发现&#xff0c;copy的方法会把很多不需要的配置也copy进来了。所就我从新定义了一个公司来进行学习研究。配置一定空的公司很简单&#xff0c;只需要三步&#xff1a; 1 定义和分配公司 • 公司…