Maxwell 概述、安装、数据同步【一篇搞定】!

news2025/1/22 22:55:15

文章目录

      • 什么是 Maxwell?
      • Maxwell 输出格式
      • Maxwell 工作原理
      • Maxwell 安装
      • Maxwell 历史数据同步
      • Maxwell 增量数据同步

什么是 Maxwell?

Maxwell 在大数据领域通常指的是一个用于数据同步和数据捕获的开源工具,由美国 Zendesk 开源,用 Java 编写的 MySQL 等关系型数据库的实时抓取软件。

Maxwell 可以监控数据库中的更改,并将这些更改以可消费的方式传递给其他系统。它通常用于实时数据管道、数据仓库、事件驱动架构等场景中,帮助将数据库中的变更数据传送到其他系统,以便进行分析、报告和其他数据处理操作。

Maxwell 的特点包括:

  1. 实时数据捕获:Maxwell 可以实时地捕获数据库中的更改,包括插入、更新和删除操作。

  2. 支持多种数据库:它可以与多种关系型数据库系统(如 MySQL、PostgreSQL)集成。

  3. JSON输出:Maxwell 通常以 JSON 格式输出变更数据,这种格式易于处理和解析。

  4. 可配置性:用户可以根据自己的需求配置 Maxwell,包括选择要捕获的表格、输出目标等。

  5. 高性能:Maxwell 经过优化,可以处理高吞吐量的数据流。

Maxwell 官网地址

Maxwell 输出格式

下面通过一个官方案例来了解 Maxwell 的输出格式。

mysql> update `test`.`maxwell` set mycol = 55, daemon = 'Stanislaw Lem';

maxwell -> kafka: 
  {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "data": { "id":1, "daemon": "Stanislaw Lem", "mycol": 55 },
    "old": { "mycol":, 23, "daemon": "what once was" }
  }

Stage Analysis

  1. 首先,执行了一条 MySQL 的 UPDATE 语句,用于更新名为 test 数据库中的 maxwell 表中的数据。它将 mycol 字段的值更改为 55,将 daemon 字段的值更改为 'Stanislaw Lem'

  2. maxwell -> kafka:这一行表示 Maxwell 捕获到了 MySQL 数据库中的更新操作,并将其传输到 Kafka 消息队列中。

  3. JSON 数据块:以下是 JSON 格式的数据块,其中包含了关于更新操作的详细信息:

    • "database": "test":这是更新操作所在的数据库名称,即 test

    • "table": "maxwell":这是更新操作所在的表格名称,即 maxwell

    • "type": "update":这指示 Maxwell 将此更新操作视为 UPDATE 类型,因为实际上是对表中的数据进行了更新。

    • "ts": 1449786310:这是时间戳,表示更新操作发生的时间。

    • "data":这个部分包含了新的数据,包括 iddaemonmycol 字段的新值,表示更新后的值。

    • "old":这个部分包含了旧的数据,包括 mycoldaemon 字段的旧值,表示更新前的值。在示例中,mycol 字段的旧值为 23daemon 字段的旧值为 'what once was'

这个示例演示了一个 MySQL 数据库中的 UPDATE 操作,Maxwell 捕获了这个操作并将其转化为 JSON 格式的事件,然后将这些事件发送到 Kafka 消息队列,以供其他系统订阅和处理。

Maxwell 工作原理

Maxwell 是一个开源的数据变更捕获工具,它的主要作用是捕获关系型数据库中的数据变更事件,并将这些事件以结构化的方式传送到消息队列(通常是Kafka)或其他目标,以便其他系统可以实时处理这些变更数据。

下面是 Maxwell 的工作原理简要解释:

  1. 数据库 Binlog 解析:Maxwell 通过订阅数据库的二进制日志(Binlog)来实时监控数据库中的变更。Binlog 是数据库引擎记录数据库操作的详细日志,包括插入、更新和删除等操作。

  2. 数据解析:一旦 Maxwell 连接到数据库的 Binlog,它开始解析 Binlog 中的数据变更事件。Maxwell 能够解析这些事件并将其转化为易于理解和处理的数据格式。

  3. 数据重构:Maxwell 将解析后的数据重构为 JSON 格式或其他结构化数据格式,以便后续系统可以轻松处理和消费这些数据。

  4. 数据发布:捕获的数据变更事件被发送到一个消息队列(通常是 Kafka),以实现异步和实时的数据传输。将数据发送到消息队列允许其他系统根据需要消费数据,而不会对原始数据库产生太多负载。

  5. 事件处理:一旦数据变更事件被发送到消息队列,其他系统可以订阅这些事件并处理它们。这些系统可以是数据仓库、实时分析系统、缓存系统、搜索引擎或其他需要实时数据的应用程序。

  6. 可配置性:Maxwell 具有丰富的配置选项,允许用户指定要捕获的数据库、表格、字段,以及如何处理捕获的事件。

Maxwell 的工作原理可以概括为捕获、解析、重构、发布和处理数据库中的数据变更事件。通常用于与流数据处理系统和实时分析工具集成,以支持实时数据分析和应用程序。

Maxwell 安装

在这里插入图片描述

安装之前需要注意,从 v1.30.0 开始,Maxwell 不再支持 JDK1.8,所以安装之前注意 JDK 版本!

官方快速入门文档

官方版本说明与下载

本节使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署。

  1. 解压文件
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/
  1. 添加环境变量
sudo vim /etc/profile.d/my.sh

# 添加如下内容:
#MAXWELL_HOME
export MAXWELL_HOME=/opt/module/maxwell-1.29.2
export PATH=$PATH:$MAXWELL_HOME/bin

刷新环境变量:source /etc/profile.d/my.sh

  1. 配置 MySQL
sudo vim /etc/my.cnf

# 添加如下内容:
[mysqld]
#maxwell 需要指定 Binlog 日志以"行级别"的方式进行记录
binlog_format=row
#MySQL服务器的唯一标识号
server_id=1 
#启用二进制日志 Binlog,指定 "master" 作为 Binlog 文件的前缀名
log-bin=master

配置完成后,重启 MySQL:

sudo systemctl restart mysqld.service
  1. 创建 MySQL 用户并给予权限
CREATE USER 'maxwell'@'%' IDENTIFIED BY '000000';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
  1. 修改配置文件
cd $MAXWELL_HOME

cp config.properties.example config.properties

vim config.properties

############ 添加如下信息 ############

# 指定生产者对象
producer=kafka

# 指定 kafka 目标机器
kafka.bootstrap.servers=hadoop120:9092,hadoop121:9092,hadoop122:9092

# 指定 kafka topic
kafka_topic=maxwell

# 指定 mysql 连接信息
host=hadoop120
user=maxwell
password=000000
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

# 指定数据按照主键分组进入 kafka 不同分区,避免数据倾斜
producer_partition_by=primary_key

在这里插入图片描述

  1. 启动 Maxwell

启动 Maxwell 之前请先启动 MySQL、Zookeeper、Kafka。

cd $MAXWELL_HOME

maxwell --config config.properties --daemon
  • --daemon:告诉 Maxwell 以守护进程模式运行,也就是在后台运行而不会阻塞当前终端。

在这里插入图片描述

  1. 验证 Maxwell 是否启动成功
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
  • ps -ef:列出所有进程信息。

  • grep com.zendesk.maxwell.Maxwell:查找 Maxwell 进程。

  • grep -v grep:排除包含字符串 "grep" 的行。在查找进程时,通常会包含一个与查找本身相关的 "grep" 进程,因此我们需要排除它。

  • wc -l:统计行数。

结果为 1 则表示启动成功了,为 0 则表示服务没有启动成功。

在这里插入图片描述
通过 jps 命令查看:

在这里插入图片描述

  1. 设置自动启停脚本
sudo vim /bin/mxw

添加下列内容:

#! /bin/bash

if [[ $# -ne 1 ]]; then
	echo "参数有误,请重新输入!"
elif [[ "$1" = "start" ]]; then
	echo "-----------------$host MAXWELL START-----------------"
	maxwell -config $MAXWELL_HOME/config.properties --daemon
elif [[ "$1" = "stop" ]]; then
	echo "-----------------$host MAXWELL STOP-----------------"
	ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
elif [[ "$1" = "restart" ]]; then
	echo "-----------------$host MAXWELL RESTART-----------------"
	ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
	maxwell -config $MAXWELL_HOME/config.properties --daemon
elif [[ "$1" = "status" ]]; then
	echo "-----------------$host MAXWELL STATUS-----------------"
	ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
fi

给予权限:

sudo chmod +755 /bin/mxw

Maxwell 历史数据同步

当前在 MySQL 中创建了 finance_result 库,其中存储了许多表,现在对该库中的表进行全量历史数据同步。

在这里插入图片描述

创建 Kakfa 消费者

kafka-console-consumer.sh --bootstrap-server hadoop120:9092 --topic maxwell

注意,这里创建的 topic 要和 Maxwell 配置文件中设置的 topic 主题保持一致。

启动 Maxwell 历史数据同步,指定库名和表名。

cd $MAXWELL_HOME

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

程序正常启动后 Kafka 消费端将会收到 Maxwell 发送过来的数据

在这里插入图片描述

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{
	"database": "finance_result",
	"table": "industry",
	"type": "bootstrap-start",
	"ts": 1694748250,
	"data": {}
}

{
	"database": "finance_result",
	"table": "industry",
	"type": "bootstrap-insert",
	"ts": 1694748250,
	"data": {
		"id": 1,
		"create_time": "2022-08-19 00:00:00.000000",
		"update_time": "2022-08-19 00:00:00.000000",
		"industry_level": 1,
		"industry_name": "工程建设",
		"superior_industry_id": null
	}
} {
	"database": "finance_result",
	"table": "industry",
	"type": "bootstrap-insert",
	"ts": 1694748250,
	"data": {
		"id": 2,
		"create_time": "2022-08-19 00:00:00.000000",
		"update_time": "2022-08-19 00:00:00.000000",
		"industry_level": 1,
		"industry_name": "轻工",
		"superior_industry_id": null
	}
} {
	"database": "finance_result",
	"table": "industry",
	"type": "bootstrap-insert",
	"ts": 1694748250,
	"data": {
		"id": 3,
		"create_time": "2022-08-19 00:00:00.000000",
		"update_time": "2022-08-19 00:00:00.000000",
		"industry_level": 2,
		"industry_name": "土木",
		"superior_industry_id": 1
	}
}
......

{
	"database": "finance_result",
	"table": "industry",
	"type": "bootstrap-complete",
	"ts": 1694748250,
	"data": {}
}

Maxwell 增量数据同步

增量数据同步就是对 MySQL 中的所有库表进行实时监听,你对库表执行任何的操作都会发送到 Kakfa 消费者中。

先创建 Kakfa 消费者

kafka-console-consumer.sh --bootstrap-server hadoop120:9092 --topic maxwell

注意,这里创建的 topic 要和 Maxwell 配置文件中设置的 topic 主题保持一致。

启动 Maxwell 增量数据同步

cd $MAXWELL_HOME

maxwell --config config.properties --daemon

修改所监听 MySQL 库中任意表的一条数据,进行测试:

update finance_result.industry set industry_name="纱线行业" where id = 11;

finance_result.industry 表中 id11industry_name 修改为 "纱线行业"

maxwell -> kafka: 
{
	"database": "finance_result",
	"table": "industry",
	"type": "update",
	"ts": 1694751386,
	"xid": 21025,
	"commit": true,
	"data": {
		"id": 11,
		"create_time": "2022-08-19 00:00:00.000000",
		"update_time": "2022-08-19 00:00:00.000000",
		"industry_level": 3,
		"industry_name": "纱线行业",
		"superior_industry_id": 5
	},
	"old": {
		"industry_name": "纱线行业233"
	}
}

修改完成后 Kafka 消费端将会收到 Maxwell 发送过来的增量数据信息,其中包括数据的相关更新信息以及历史数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1012090.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

千巡翼X1协调转弯功能

近年来,随着技术的飞速发展,无人机航测已成为现代测绘领域的一项重要应用。 无人机的出现极大地提高了航测的效率和精度,极大地减少了人力资源的投入。通过搭载各种高精度的航测仪器和传感器,无人机可以在短时间内完成大面积的航…

使用vscode以16进制方式查看bin文件内容

简介 方便对bin文件内容进行分析。 使用 VSCODE:插件下载 Hex Editor,下载完后使用vscode打开bin文件。 使用快捷键CtrlShiftP, 并在上方命令框输入>hex 选择 结果如下

微信小程序隐私授权

微信开发者平台新公告:2023年9月15之后,隐私协议将被启用,所以以后的小程序都要加上隐私协议的内容提示用户, 首先设置好隐私协议的内容,登录小程序的开发者后台,在设置--》服务内容声明--》用户隐私保护指…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS基础(一)

ﻌﻌﻌﻌ♡‎ﻌﻌﻌﻌ♡‎‎ﻌﻌﻌﻌ♡‎ﻌﻌﻌﻌ♡ﻌﻌﻌﻌ…

OPC HDA扫盲

目录 1 基本概念 1.1 历史数据服务器类型 1.2 数据源 1.3 对象和接口概述 1.4 所需接口定义 1.5 可选接口定义 1.6 定义 1.7 边界值和时域 2 HDA聚合 2.1 生成间隔 2.2 数据类型 2.3 数据质量 3 聚合示例 3.1 示例数据 3.2 内插(INTERPOLATIVE&#x…

构造与析构

在类的声明中,构造函数和析构函数是一类特殊的函数:由系统自动执行,在程序中不可显示地调用它们。 构造函数 作用:建立对象时对对象的数据成员进行初始化 特点: 构造函数是与类同名的特殊成员函数,没有…

Xamarin.Android实现App内版本更新

目录 1、具体的效果2、代码实现2.1 基本原理2.2 开发环境2.3 具体代码2.3.1 基本设置2.3.2 系统的权限授予2.3.3 进度条的layout文件2.3.4 核心的升级文件 3、代码下载4、知识点5、参考文献 1、具体的效果 有事需要在程序内集成自动更新的功能,网上找了下&#xff…

【ARM AMBA AXI 入门 11 - AXI 总线 AWCACHE 和 ARCACHE 介绍】

文章目录 1.1 AXI 传输事务属性1.1.1 slave type1.1.2 系统级缓存 1.2 Memory Attributes1.2.1 Bufferable,AxCACHE[0]1.2.2 Modifiable, AxCACHE[1]1.2.3 cache-allocate 1.3 Memory types 转自:https://zhuanlan.zhihu.com/p/148813963 如有侵权请联系…

学习记忆——英语篇

文章目录 英语字母形象起源右脑记忆单词的原则四大步骤第一步:摄取信息第二步:处理信息第三步:储存信息第四步:提取信息 训练例子字母形象训练 右脑记忆单词5大方法字源法编码法字母编码法字母组合编码法 拼音法全拼法拼音组合 熟…

前K个高频单词-c++实现

692. 前K个高频单词 - 力扣(LeetCode) 给定一个单词列表 words 和一个整数 k ,返回前 k 个出现次数最多的单词。 返回的答案应该按单词出现频率由高到低排序。如果不同的单词有相同出现频率, 按字典顺序 排序。 示例 1&#xff…

关于Linux服务器.sh文件启动问题

问题描述 在linux服务器上使用文本编辑(并非vim操作)对.sh脚本文件进行修改后无法启动,显示’\r’识别错误等。 错误如下: 错误原因 因为.sh文件在经过这种编辑后格式产生了错误,由unix转为了doc格式,需…

Ae 效果:CC Particle Systems II

模拟/CC Particle Systems II Simulation/CC Particle Systems II CC Particle Systems II(CC 粒子系统 II)可用于生成和模拟各种类型的粒子系统,包括火焰、雨、雪、爆炸、烟雾等等。 与 CC Particle World 效果相比有许多类似的属性。最大的…

华为云云耀云服务器L实例评测|部署功能强大的监控和可视化工具Grafana

应用场景 Grafana介绍 Grafana是一个功能强大的监控和可视化工具,适用于各种行业和应用场景,如IT运维监控、网络监控、能源管理、金融市场分析等。它提供了灵活的数据源支持、强大的可视化功能和告警机制,以及注释和过滤功能,使…

阿里云服务器全方位介绍_优势_使用场景_限制说明

阿里云服务器是什么?云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务,云服务器可以降低IT成本提升运维效率,免去企业或个人前期采购IT硬件的成本,阿里云服务器让用户像使用水、电、天然气等公共资源一样便捷、高效地使用服务器…

使用cpolar配合Plex打造个人媒体站,畅享私人影音娱乐空间

文章目录 1.前言2. Plex网站搭建2.1 Plex下载和安装2.2 Plex网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 用手机或者平板电脑看视频,已经算是生活中稀松平常的场景了,特别是各…

langchain+GPT+neo4j 图数据库

neo4j版本是5.11.0,langchain的版本 0.0.288下载apoc插件 https://neo4j.com/docs/apoc/current/installation/ neo4j.conf文件把apoc.*添加到dbms.security.procedures.unrestricted配置项 使用return apoc.version()来查看是否安装成功 pip install neo4j图 参考官网&…

以太网的简单概念、MAC地址与IP地址的关系

以太网 DIX Ethernet V2标准的局域网------以太网。 IEEE 802.3标准和DIX Ethernet V2标准很相似,只有些许区别,不严格的来说,802.3局域网也叫做以太网。以太网是一个局域网,信息以广播的形式发送。 IEEE 802标准定义的局域网参…

哈工大校园网显示IP地址错误连接不上

您当前获取到的IP地址有误,请重新开关无线获取IP地址(注:电脑端还可以通过cmd窗口,输入ipconfig /release、ipconfig /renew命令)。如未解决此问题请联系网络安全和信息化办公室处理。 当校园网登录时会出现如上情况,并且当你按照他的方法尝试…

vue2使用vuedraggable实现拖拽删除添加重置功能

需求:要输入xx阶段,之后输入后显示但是要可以自己手动排序和删除,以免写错了,并且做了判断,如果重复输入的话会提示,不会让他添加,点击重置功能后一键清空所有输入 1.效果 2.下载插件 我直接下…

【Linux】自动化构建工具:make/Makefile

​👻内容专栏: Linux操作系统基础 🐨本文概括: 工具使用的背景、理解make/makefile工具、探索工作原理(文件修改时间的对比)、.PHONY伪目标、特性等。 🐼本文作者: 阿四啊 🐸发布时间&#xff1…