大数据工具Maxwell的使用

news2024/11/15 12:30:47

1.Maxwell简介

Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。

官网地址:http://maxwells-daemon.io/

  • Maxwell输出数据格式

在这里插入图片描述

注:Maxwell输出的json字段说明:

字段解释
database变更数据所属的数据库
table表更数据所属的表
type数据变更类型
ts数据变更发生的时间
xid事务id
commit事务提交标志,可用于重新组装事务
data对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据
old对于update类型,表示修改之前的数据,只包含变更字段

2.Maxwell原理

Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。

2.1 MySQL二进制日志

二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。Maxwell的工作原理和主从复制密切相关。

2.2 MySQL主从复制

MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。

  • 主从复制的应用场景如下:
    • 做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。
    • 读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。
  • 主从复制的工作原理如下:
    • Master主库将数据变更记录,写到二进制日志(binary log)中
    • Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)
    • Slave从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库。

在这里插入图片描述

2.3 原理总结

很简单,就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

3.Maxwell部署

3.1 安装Maxwell

  • 下载安装包

    1. 地址:https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

      注:Maxwell-1.30.0及以上版本不再支持JDK1.8

    2. 将安装包上传到hadoop102节点的/opt/software目录

      注:此处使用教学版安装包,教学版对原版进行了改造,增加了自定义Maxwell输出数据中ts时间戳的参数,生产环境请使用原版。

  • 将安装包解压至/opt/module*

[root@hadoop102 maxwell]$ tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/
  • 修改名称
[root@hadoop102 module]$ mv maxwell-1.29.2/ maxwell

3.2 配置MySQL

3.2.1 启用MySQL Binlog

MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启。

  1. 修改MySQL配置文件/etc/my.cnf
  2. 增加如下配置
[mysqld]

#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall

注:MySQL Binlog模式

  • Statement-based:基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。

    优点: 节省空间

    缺点: 有可能造成数据不一致,例如insert语句中包含now()函数。

  • Row-based:基于行,Binlog会记录每次写操作后被操作行记录的变化。

    优点:保持数据的绝对一致性。

    缺点:占用较大空间。

  • mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。

Maxwell要求Binlog采用Row-based模式。

  1. 重启MySQL服务
[root@hadoop102 ~]$ sudo systemctl restart mysqld

3.2.2 创建Maxwell所需数据库和用户

Maxwell需要在MySQL中存储其运行过程中的所需的一些数据,包括binlog同步的断点位置(Maxwell支持断点续传)等等,故需要在MySQL为Maxwell创建数据库及用户。

  • 创建数据库
msyql> CREATE DATABASE maxwell;
  • 调整MySQL数据库密码级别
mysql> set global validate_password_policy=0;
mysql> set global validate_password_length=4;
  • 创建Maxwell用户并赋予其必要权限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

3.3 配置Maxwell

  • 修改Maxwell配置文件名称
[root@hadoop102 maxwell]$ cd /opt/module/maxwell
[root@hadoop102 maxwell]$ cp config.properties.example config.properties
  • 修改Maxwell配置文件
[root@hadoop102 maxwell]$ vim config.properties

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell

#MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

4.Maxwell使用

4.1 启动Kafka集群

若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。

4.2 Maxwell启停

  • 启动Maxwell
[root@hadoop102 ~]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
  • 停止Maxwell
[root@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9
  • Maxwell启停脚本

    • 创建并编辑Maxwell启停脚本
    [root@hadoop102 bin]$ vim mxw.sh
    
    • 脚本内容如下
    #!/bin/bash
    
    MAXWELL_HOME=/opt/module/maxwell
    
    status_maxwell(){
        result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
        return $result
    }
    
    
    start_maxwell(){
        status_maxwell
        if [[ $? -lt 1 ]]; then
            echo "启动Maxwell"
            $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
        else
            echo "Maxwell正在运行"
        fi
    }
    
    
    stop_maxwell(){
        status_maxwell
        if [[ $? -gt 0 ]]; then
            echo "停止Maxwell"
            ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
        else
            echo "Maxwell未在运行"
        fi
    }
    
    
    case $1 in
        start )
            start_maxwell
        ;;
        stop )
            stop_maxwell
        ;;
        restart )
           stop_maxwell
           start_maxwell
        ;;
    esac
    

4.3 增量数据同步

  • 启动Kafka消费者
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
  • 模拟生成数据
[root@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-01-22.jar
  • 观察Kafka消费者
{
    "database": "gmall",
    "table": "comment_info",
    "type": "insert",
    "ts": 1634023510,
    "xid": 1653373,
    "xoffset": 11998,
    "data": {
        "id": 1447825655672463369,
        "user_id": 289,
        "nick_name": null,
        "head_img": null,
        "sku_id": 11,
        "spu_id": 3,
        "order_id": 18440,
        "appraise": "1204",
        "comment_txt": "评论内容:12897688728191593794966121429786132276125164551411",
        "create_time": "2020-06-16 15:25:09",
        "operate_time": null
    }
}
{
    "database": "gmall",
    "table": "comment_info",
    "type": "insert",
    "ts": 1634023510,
    "xid": 1653373,
    "xoffset": 11999,
    "data": {
        "id": 1447825655672463370,
        "user_id": 774,
        "nick_name": null,
        "head_img": null,
        "sku_id": 25,
        "spu_id": 8,
        "order_id": 18441,
        "appraise": "1204",
        "comment_txt": "评论内容:67552221621263422568447438734865327666683661982185",
        "create_time": "2020-06-16 15:25:09",
        "operate_time": null
    }
}

4.4 历史数据全量同步

我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

把全部的数据输入到Kafka

4.4.1 Maxwell-bootstrap

Maxwell提供了bootstrap功能来进行历史数据的全量同步,命令如下:

指定数据库名,指定表名,指定配置文件,一般上线的时候用一次就行了

[root@hadoop102 maxwell]$ /opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

4.4.2 boostrap数据格式

采用bootstrap方式同步的输出数据格式如下:

{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-start",
    "ts": 1450557744,
    "data": {}
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "hello"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "bootstrap!"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-complete",
    "ts": 1450557744,
    "data": {}
}

注意事项:

  • 第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
  • 一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/341188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch7.8.0版本进阶——路由计算

目录一、路由计算1.1、路由计算的前提理解1.2、路由计算的概述1.3、路由计算的概述一、路由计算 1.1、路由计算的前提理解 当索引一个文档的时候,文档会被存储到一个主分片中。Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?当我们创建文档时…

mycat连接mysql 简单配置

mycat三个配置文件位于conf下 可通过Notepad操作 首先配置service.xml中的user标签&#xff0c;设置用户名&#xff0c;密码&#xff0c;查询权限&#xff0c;是否只读等 只是设置了root用户&#xff0c;有所有权限 配置schema.xml <?xml version"1.0"?&g…

FPGA开发软件(vivado + modelsim)环境搭建(附详细步骤)

本文详细介绍了vivado软件和modelsim软件的安装&#xff0c;以及vivado中配置modelsim仿真设置&#xff0c;每一步都加文字说明和图片。一、软件安装包下载1、vivado vivado版本很多&#xff0c;目前最新的已更新到vivado2022.2&#xff0c;版本越高&#xff0c;安装包越大&…

PTA甲级-1010 Radix c++

文章目录Input Specification:Output Specification:Sample Input 1:Sample Output 1:Sample Input 2:Sample Output 2:一、题干大意![在这里插入图片描述](https://img-blog.csdnimg.cn/68d84d3ea86e4aaab002152ae8472e05.png#pic_center)二、题解要点三、具体实现总结Given a…

【呕心沥血】整理全栈自动化测试技术(三):如何编写技术方案

前面两篇笔记我介绍了自动化测试前期调研注意事项和前置准备阶段切入点&#xff0c;有同学在后台提问&#xff1a; “做完前期的调研和准备工作&#xff0c;领导要求写一个落地方案并评审&#xff0c;自动化测试的落地方案该怎么写”&#xff1f; 首先这个要求我觉得挺正常&a…

2023.02.12 学习周报

文章目录摘要文献阅读1.题目2.摘要3.介绍4.本文贡献5.相关工作5.1 Temporal Recommendation5.2 Sequential Recommendation6.方法6.1 Problem Formulation6.2 Input Embedding6.3 Self-Attention Structure6.4 Model Training7.实验7.1 数据集7.2 实验结果7.3 多时间嵌入的效果…

Vulkan教程(12): Graphics pipeline Introduction (图形管线概要)

Vulkan官方英文原文&#xff1a;https://vulkan-tutorial.com/Drawing_a_triangle/Graphics_pipeline_basics/Introduction对应的Vulkan技术规格说明书版本&#xff1a; Vulkan 1.3.2Over the course of the next few chapters, well be setting up a graphics pipeline that i…

你知道MySQL中like 关键字也能用索引嘛?

上篇文章中&#xff0c;我和大家分享了索引的两个使用规则&#xff1a; 索引上不要使用函数运算。使用覆盖索引避免回表。 当然&#xff0c;凡事有个度&#xff0c;用哪一种策略也要结合具体的项目来定&#xff0c;不能为了 SQL 优化而抛弃了业务。 在前文的基础上&#xff0…

Mac安装Homebrew排坑大全..

官网&#xff1a;https://brew.sh/BACKGROUND: 安装Homebrew嘎嘎报错&#xff01;question one网络不通&#xff0c;需要配置一下github.com的hostError&#xff1a;fatal: unable to access https://github.com/Homebrew/homebrew-core/: error:02FFF036:system library:func(…

电子电气架构——基于CANoe实现两路CAN线间网关仿真

基于CANoe实现两路CAN线间网关仿真 魔都的天气也很魔性,白天酷热无比,现在晚上九点又狂风大作,凉爽宜人。 天气的不可预见性与个人生活极其相似,都有随机、不可控的成分,自己能做的就是不懊悔昨天已发生,不寄托于未来未发生,只想过好当下这时光。 老规矩,分享一段喜…

【不知道是啥】浅保存哈

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…

Linux系统安全之iptables防火墙

目录 一.iptables防火墙基本介绍 二.iptables的四表五链 三.iptables的配置 1.iptables的安装 2.iptables防火墙的配置方法 四.添加、查看、删除规则 1.查看(fliter)表中的所有链 iptables -L 2.使用数字形式(fliter)表所有链 查看输出结果 iptables -nL 3.清空表中所…

算法刷题打卡第88天:字母板上的路径

字母板上的路径 难度&#xff1a;中等 我们从一块字母板上的位置 (0, 0) 出发&#xff0c;该坐标对应的字符为 board[0][0]。 在本题里&#xff0c;字母板为board ["abcde", "fghij", "klmno", "pqrst", "uvwxy", "…

【Java|golang】1138. 字母板上的路径

我们从一块字母板上的位置 (0, 0) 出发&#xff0c;该坐标对应的字符为 board[0][0]。 在本题里&#xff0c;字母板为board [“abcde”, “fghij”, “klmno”, “pqrst”, “uvwxy”, “z”]&#xff0c;如下所示。 我们可以按下面的指令规则行动&#xff1a; 如果方格存…

【计组】理解Disruptor--《计算机组成原理》(十五)

Disruptor 的开发语言&#xff0c;并不是很多人心目中最容易做到性能极限的 C/C&#xff0c;而是性能受限于 JVM 的 Java。其实只要通晓硬件层面的原理&#xff0c;即使是像 Java 这样的高级语言&#xff0c;也能够把 CPU 的性能发挥到极限。 一、Padding Cache Line&#xff…

mysql中mvcc实现机制和原理

目录 1.什么是mvcc? 2.mvcc中的快照读和当前读有什么区别和联系&#xff1f; 3.mvcc的作用是什么&#xff1f; 4.mvcc的实现机制和原理是什么&#xff1f; 1.什么是mvcc? mvcc全称是(Multi-Version Concurrency Control) 多版本并发控制,是数据库管理过程中的一种并发控制…

keras+IMDB情感分析

目录简介IDMB数据集数据预处理数据加载数据清洗保存经过清洗后的数据训练测试数据集分割文字编码词嵌入模型构建模型训练训练效果模型评分模型预测及混淆矩阵查看F1 Score、召回率等信息预测新的影评总结本博客参考&#xff1a; 【python自然语言处理 周元哲著】 【keras中文文…

数据库索引篇(二叉树/B-Tree)对比结构讲解

我们可以先看一下 二叉树的一个结构 简单将数据分成左右两侧 左侧小于36 右侧大于36 在下面再以这种方式继续划分 但二叉树的结构就有一个非常大的弊端 如果我们后续插入的数据全部小于 或 大于36 他就会 变成这样 一个链表 查询效率大大降低 因为 比如 你想找什么数据 都会…

岁月闲思——时间给我地思考

岁月闲思——时间给我地思考 2022年6月10日&#xff0c;明天又一个周末&#xff0c;成人地时间总是让人感觉一天很慢&#xff0c;一周以及一年反而很快。 下班到家&#xff0c;吃过长辈做的手工凉皮&#xff0c;得空坐在电脑面前敲击点文字&#xff0c;记录下时间留给自己地印…

Docker--consul

目录 前言 一、Consul 简介 1.1、 consul 概述 1.2 、consul 的两种模式 1.3、consul 提供的一些关键特性 二、Consul 容器服务更新与发现 三、consul 部署 3.2、查看集群信息 四、registrator服务器 consul-template 五、consul 多节点 前言 服务注册与发现是微服…