大数据技术之Maxwell入门案例学习

news2024/11/16 7:47:35

大数据技术之Maxwell入门案例学习

文章目录

  • 大数据技术之Maxwell入门案例学习
    • 1、写在前面
    • 2. Maxwell 使用
      • 2.1 Maxwell 安装部署
      • 2.2 Maxwell 入门案例
        • 2.2.1 监控 Mysql 数据并在控制台打印
        • 2.2.2 监控 Mysql 数据输出到 kafka
        • 2.2.3 监控 Mysql 指定表数据输出控制台
        • 2.2.4 监控 Mysql 指定表全量数据输出控制台,数据初始化


1、写在前面

1.3.0版本开始不支持JDK8,本文是尚硅谷的教学文档,并加以个人学习记录

  • Maxwell版本:Maxwell1.2.9
  • Zookeeper版本:Zookeeper3.5.7
  • Kafka版本:Kafka2.4.1
  • MySQL版本:MySQL5.7

2. Maxwell 使用

2.1 Maxwell 安装部署

见文

2.2 Maxwell 入门案例

2.2.1 监控 Mysql 数据并在控制台打印

  • 实现步骤:

(1)运行 maxwell 来监控 mysql 数据更新

[whybigdata@node01	maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='node01' --producer=stdout

(2)向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出

mysql> insert into test2 values(1,'aaa');

tp

{
"database": "test_maxwell",	--库名
"table": "test",	--表名
"type": "insert",	--数据更新类型
"ts": 1637244821,	--操作时间
"xid": 8714,	    --操作 id
"commit": true, 	--提交成功
"data":  {	 		--数据
"id": 1,
"name": "aaa"
}

(3) 向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台发现了 3 条 json日志,说明 maxwell 是以数据行为单位进行日志的采集的。

mysql> INSERT INTO test2 VALUES(2,'bbb'),(3,'ccc'),(4,'ddd');

{"database":"test_maxwell","table":"test","type":"insert","ts"
:1637245127,"xid":9129,"xoffset":0,"data":{"id":2,"name":"bbb"
}}
{"database":"test_maxwell","table":"test","type":"insert","ts"
:1637245127,"xid":9129,"xoffset":1,"data":{"id":3,"name":"ccc"
}}
{"database":"test_maxwell","table":"test","type":"insert","ts"
:1637245127,"xid":9129,"commit":true,"data":{"id":4,"name":"dd d"}}

mysql> update test2 set name='zaijian' where id =1;

{"database":"test_maxwell","table":"test","type":"update","ts"
:1631618614,"xid":535,"commit":true,"data":{"id":1,"name":"zai jian"},"old":{"name":"nihao"}}

插入多条数据,只有最后一条插入的数据commit状态为true,其他的数据从往后按顺序排列,xoffset作为标识

在这里插入图片描述

JSON数据

tp

(4)修改test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出

mysql> update test2 set name='abc' where id =1;

tp

在这里插入图片描述

(5)删除test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出

mysql> DELETE FROM test2 WHERE id =1;

在这里插入图片描述

表的数据:

tp

JSON数据

在这里插入图片描述

2.2.2 监控 Mysql 数据输出到 kafka

1)实现步骤:

(1)启动 zookeeper 和kafka

[atguigu@hadoop102 bin]$ jpsall
=============== hadoop102 ===============
3511 QuorumPeerMain
4127 Kafka
=============== node02 ===============
1885 Kafka
1342 QuorumPeerMain
=============== node03 ===============
1345 QuorumPeerMain
1886 Kafka

(2) 启动Maxwell 监控binlog

whybigdata@node01 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456'	--host='node01' --producer=kafka -- kafka.bootstrap.servers=node01:9092 --kafka_topic=maxwell

启动结果图

tp

(3)打开 kafka 的控制台的消费者消费 maxwell 主题

[whybigdata@node01 ~]$ kafka-console-consumer.sh --bootstrap-server node01:9092 --topic maxwell

此处直接利用OffsetExplorer工具查看Maxwell监控的结果

未执行上述命令前,OffsetExplorer观察到并没有maxwell主题

在这里插入图片描述

如下图所示,可以看到maxwell主题已经新增

tp

  • 插入数据
mysql> insert into test2 values (5,'eee');

在控制台可以发现有一个error,但是并不影响实验,具体什么原因,还不清楚,如下图所示:

tp

查看maxwell主题Data一栏,出现乱码

tp

为防止key和value值乱码,提前在properties栏设置content types为String,默认是Byte Array

tp

查看结果

tp

查看value值:JSON格式

tp

  • 修改id=5的数据为eef

在这里插入图片描述

JSON数据:

tp

  • 删除id=5的数据
    在这里插入图片描述

在这里插入图片描述

前面都是操作test2表,此次在test表插入一条(id=3,name=dd)的数据:

tp

JSON数据

tp

因为开启maxwell指定的producer为kafkaa且指定的–kafka_topic=maxwell所以,maxwell库中所有表的变化都会出现在maxwell主题中(是在0号分区)

  • 新建库test_maxwell2以及表aaa(id,name),插入新数据(id=1,name=qqq)

tp

Maxwell主题也得到更新,依旧是在0号分区

tp

(4)向 test_maxwell 库的test 表再次插入一条数据

Note:关闭上次启动的Maxwell进程,再次启动Maxwell,然后执行上述SQL插入命令

mysql> insert into test values (5,'eee');

(5)通过 kafka 消费者来查看到了数据,说明数据成功传入 kafka

{"database":"test_maxwell","table":"test","type":"insert","ts"
:1637245889,"xid":10155,"commit":true,"data":{"id":5,"name":"e ee"}}

2)kafka 主题数据的分区控制

在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:

(1) 修改 maxwell 的配置文件,定制化启动maxwell 进程

[whybigdata@node01 maxwell-1.29.2]$ vim config.properties

# tl;dr config log_level=info
producer=kafka kafka.bootstrap.servers=node01:9092
# mysql login info 
host=node01 
user=maxwell 
password=123456


#	*** kafka ***
# list of kafka brokers #kafka.bootstrap.servers=hosta:9092,hostb:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed kafka_topic=maxwell3


#	*** partitioning ***
# What part of the data do we partition by? #producer_partition_by=database # [database, table, primary_key, transaction_id, column] producer_partition_by=database # 控制数据分区模式,可选模式有 库名,表名,主键,列名

# specify what fields to partition by when using producer_partition_by=column
# column separated list. #producer_partition_columns=name
# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist. #producer_partition_by_fallback=database

(2)手动创建一个 3 个分区的 topic,名字就叫做 maxwell3

[whybigdata@node01 maxwell-1.29.2]$ kafka-topics.sh --zookeeper node01:2181,hadoop103:2181,hadoop104:2181/kafka --create -- replication-factor 2 --partitions 3 --topic maxwell3

Note:node01:2181,node02:2181,node03:2181/kafka必须加上kafka在zookeeper上的路径/kafka,同时逗号后面不要多留一个空格,不然会出现以下错误

在这里插入图片描述

(3)利用配置文件启动Maxwell 进程

[whybigdata@node01 maxwell-1.29.2]$ bin/maxwell --config ./config.properties

(4)向 test_maxwell 库的test 表再次插入一条数据

(5) 通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 1 号分区

结果图

tp

(6)向 test 库的 aaa 表插入一条数据

(7)通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 0 号分区,说明库名会对数据进入的分区造成影响。

在这里插入图片描述

(8)再次往test_maxwell库的test2表插入数据,结果是:数据插入到maxwell3主题的1号分区

tp

2.2.3 监控 Mysql 指定表数据输出控制台

(1) 运行 maxwell 来监控 mysql 指定表数据更新

限制可监控的表:exclude排除所有库下的所有表,include只包含(监控)test_maxwell库下的test表

[whybigdata@node01 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='node01' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout

(2) 向 test_maxwell.test 表插入一条数据,查看 maxwell 的监控

mysql> insert into test_maxwell.test values(7,'ggg');

{
    "database":"test_maxwell",
    "table":"test",
    "type":"insert","ts"
    :1637247760,
    "xid":11818,
    "commit":true,
    "data":{
        "id":7,
        "name":"g gg"
    }
}

(3)向 test_maxwell.test2 表插入一条数据,查看 maxwell 的监控

mysql> insert into test1 values(1,'nihao');

本次没有收到任何信息,说明 include 参数生效,只能监控指定的 mysql 表的信息

Note:还可以设置 include:test_maxwell.*,通过此种方式来监控 mysql 某个库的所有表,也就是说过滤整个库。读者可以自行测试。

2.2.4 监控 Mysql 指定表全量数据输出控制台,数据初始化

初始化(Bootstraping)官网地址:https://maxwells-daemon.io/bootstrapping/

Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增及变化的数据,但是Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据,来对 MySQL 的某张表进行数据初始化,也就是我们常说的全量同步。具体操作步骤如下:

tp

需求:将 test_maxwell 库下的 test2 表的四条数据,全量导入到 maxwell 控制台进行打印。

(1)修改 Maxwell 的元数据,触发数据初始化机制,在 mysql 的 maxwell 库中bootstrap

  • 表中插入一条数据,写明需要全量数据的库名和表名
mysql> insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','test2');

执行上述语句前:

在这里插入图片描述

执行后boostrap表:

tp

(2)启动 maxwell 进程,此时初始化程序会直接打印 test2 表的所有数据

[whybigdata@node01 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='node01' producer=stdout
Using kafka version: 1.0.0
23:15:38,841 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
23:15:39,110 INFO Maxwell - Maxwell v1.22.0 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql- bin.000004:611096], lastHeartbeat=1637248429242] 23:15:39,194 INFO MysqlSavedSchema - Restoring schema id 6 (last modified at Position[BinlogPosition[mysql- bin.000004:517625], lastHeartbeat=1637246435111])
23:15:39,299 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql- bin.000004:158612], lastHeartbeat=0])
23:15:39,342 INFO MysqlSavedSchema - beginning to play deltas...
23:15:39,343 INFO MysqlSavedSchema - played 5 deltas in 1ms
{"database":"test_maxwell","table":"test2","type":"bootstrap- start","ts":1637248539,"data":{}}
23:15:39,367 INFO SynchronousBootstrapper - bootstrapping started for test_maxwell.test2
23:15:39,369 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000004:611096
{"database":"test_maxwell","table":"test2","type":"bootstrap- insert","ts":1637248539,"data":{"id":1,"name":"aa"}}
{"database":"test_maxwell","table":"test2","type":"bootstrap- insert","ts":1637248539,"data":{"id":2,"name":"bb"}}
{"database":"test_maxwell","table":"test2","type":"bootstrap- insert","ts":1637248539,"data":{"id":3,"name":"cc"}}
{"database":"test_maxwell","table":"test2","type":"bootstrap- insert","ts":1637248539,"data":{"id":4,"name":"dd"}}
{"database":"test_maxwell","table":"test2","type":"bootstrap- complete","ts":1637248539,"data":{}}
23:15:39,387 INFO SynchronousBootstrapper - bootstrapping ended for #8 test_maxwell.test2
23:15:39,465 INFO BinaryLogClient - Connected to node01:3306 at mysql-bin.000004/611096 (sid:6379, cid:108) 23:15:39,465 INFO	BinlogConnectorLifecycleListener - Binlog connected.

本人执行结果:

在这里插入图片描述

(3)当数据全部初始化完成以后,Maxwell 的元数据会变化

  • is_complete 字段从 0 变为 1

  • start_at 字段从 null 变为具体时间(数据同步开始时间)

  • complete_at 字段从 null 变为具体时间(数据同步结束时间)

tp

本人执行结果

在这里插入图片描述

关闭maxwell,再次启动就不会再次初始化了(boostraping),如果需要再次初始化,需要再次执行sql:

insert into maxwell.bootstrap(database_name,table_name) values(‘test_maxwell’,‘test’);

tp

boostrap表:

tp

结束!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式监控系统——Zabbix(4)可视化、模板和用户参数

文章目录分布式监控系统——Zabbix(4)可视化、模板和用户参数一、zabbix可视化1.简介2.自定义图形(Graphs)3.聚合图形(Screens)4.幻灯片演示(Slide shows)5.定义拓扑图(M…

企业电销如何获得高质量精准流量,大数据获客直击精准客流

随时社会的发展,时代的进步,各行各业都在崛起,竞争力愈演愈烈,那么为了使自己的品牌得到更多人的关注,自己的产品可以更好的销售,无数的老板在想各种缤纷的办法让自己赚钱,百度竞价,…

图像处理│一张自拍即可实现变老变年轻,带你感受时光流逝之美

✨ 目录🎈 项目效果🎈 环境搭建🎈 下载PaddleGAN🎈 安装飞浆🎈 生成图片潜码🎈 老龄化处理🎈 年轻化处理🎈 项目效果 飞浆是一个由百度推出的深度学习开发平台,为开发者…

Ep_MySQL基础-存储引擎

show engines; //查看mysql所支持的存储引擎,以及从中得到mysql默认的存储引擎 MyIsam存储引擎:(非聚集索引)->数据索引不在一起 Innodb存储引擎:(聚集索引)->数据索引在一起 MyISAM 每个MyISAM数据表,皆由存储在硬盘上的3个文件所组成, .f…

光学设计软件Ansys的Lumerical 2023版本下载与安装使用

文章目录前言一、许可管理工具安装二、许可管理器配置三、Lumerical安装四、工具使用配置总结前言 Lumerical是一款功能强大的软件,用于设计和分析从组件到系统阶段的光子学和电磁学。这个版本的Lumerical改进了电子和光子学设计工具,用于复杂光子学&am…

【JAVA程序设计】【C00108】基于Springboot+Vue前后端分离的民宿短租预约管理系统

基于SpringbootVue前后端分离的民宿短租预约管理系统项目简介项目获取开发环境项目技术运行截图项目简介 基于Springbootvue开发的民宿短租预订管理系统,共分为三种权限:系统管理员、商家、用户 管理员角色包含以下功能: 个人中心、修改密码…

C++——优先级队列(priority_queue)的使用及实现

目录 一.priority_queue的使用 1.1、基本介绍 1.2、优先级队列的定义 1.3、基本操作(常见接口的使用) 1.4、重写仿函数支持自定义数据类型 二.priority_queue的模拟实现 2.1、构造&&重要的调整算法 2.2、常见接口的实现 push() pop() top() empt…

2023爬虫学习笔记 -- 多线程操作

一、定义一个程序开始时间程序开始时间time.time()二、创建几个网址,模拟目标网站网址列表[http://www.baidu.com,http://www.sogou.com,http://www.163.com]三、创建一个函数访问网站,模拟爬取数据操作(耗时操作)头{ "User-…

【C++】讲的最通透最易懂的关于结构体内存对齐的问题

目录1. 内存对齐规则2. 简单易懂的内存对齐示例2.1 简单结构体2.2 含位域的结构体2.3 空类的大小2.4 嵌套结构体3. 为什么需要内存对齐?4. 类型在不同系统下所占字节数1. 内存对齐规则 第一个成员在与结构体变量偏移量为0的位置处。其他成员变量要对齐到某个数字&a…

学了很久python却什么都做不了?这个方法一定要试试

很多人学了两三个月的python却什么都做不了,但有的人只学了不到一个月的时间,就可以开始自己做项目或者接私活,这是为什么? 作为20年码龄的老程序员,龙叔我觉得除了内在原因外,学习资源占据着大头。拥有好的…

NOC2021年测试卷2

一、单选题(共25题,每题2分,共50分) 1. 执行下面程序,屏幕上最多会看到多少个苹果?( ) A、10个B、11个C、1个D、无法确定2. 关于下面程序,说法正确的是 ?( ) 3. “角色1”要在“角色2”说完话后才能开始动作,如果“角色2”思考2秒钟,然后说3秒钟,那么“角…

【干货】又是一年跳槽季!Nginx 10道核心面试题及解析

Nginx是一款轻量级的高性能Web服务器和反向代理服务器,由俄罗斯的Igor Sysoev开发。它具有占用资源少、高并发、稳定性高等优点,被广泛应用于互联网领域。在Nginx的面试过程中,面试官通常会提出一些核心问题,本文将介绍一些常见的…

【手把手教会数据类型的存储】

数据类型介绍整型在内存中的存储:原码、反码、补码大小端字节序介绍及判断浮点型在内存中的存储解析1.数据类型介绍我们已经学习过了基本的数据类型:整型:shor —— 短整型int —— 整型long —— 长整型long long —— 更长的整型浮点型&…

机器学习 | 线性回归(单变量)

前文回顾:机器学习概述📚线性回归概念我们要使用一个数据集,数据集包含俄勒冈州波特兰市的住房价格。在这里,我要根据不同房屋尺寸所售出的价格,画出我的数据集。比方说,如果你朋友的房子是 1250 平方尺大小…

数据仓库之建模理论以及仓库设计思想

1、数据仓库 1.1、数据仓库概述 数据仓库是一个为数据分析而设计的企业级数据管理系统。数据仓库可集中、整合多个信息源的大量数据,借助数据仓库的分析能力,企业可从数据中获得宝贵的信息进而改进决策。同时,随着时间的推移,数…

【计算机组成原理】1、浮点数的二进制表示、科学计数法、IEEE754标准

文章目录什么是浮点数浮点数表示数字浮点数的二进制表示浮点数的「IEEE754标准」二进制表示背景特殊约定示例浮点数为什么有精度损失浮点数的范围和精度有多大参考资料用定点数表示数字时,会约定小数点的位置固定不变,整数部分和小数部分分别转换为二进制…

中原银行使用 XSKY星辰天合对象一体机解决核心系统备份管理问题

中原银行使用星辰天合 X3000 对象存储一体机在生产中心和灾备中心分别搭建分布式存储集群,通过 S3 协议与 NBU 备份平台对接,提供海量存储服务,实现备份平台架构转型。 近年来,金融机构随着业务规模及数据量的持续增长&#xff0c…

JavaSE19-常见类

文章目录一、Object1.概述2.常用方法二、String1.概述2.对象创建2.1 直接使用字面值2.2 使用构造方法3.常用方法三、包装类1.概述2.创建对象2.1 直接使用字面值2.2 使用构造方法2.3 使用静态方法valueOf3.常用方法4.自动装箱与自动拆箱4.1 自动装箱4.2 自动拆箱4.3 原理四、Str…

各类特殊开关电源问题解决方案

一、提高DCDC芯片电流 使用大功率三极管代替芯片内部开关管提高过流能力 二、BUCK电路实现负电压 将buck的地作为-Vout输出,原输出接地。 注:不要用LM2596跟LM2576 三、FLY-BUCK电路 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20808c03b126…

成功解决xshell7会话窗口只能显示一个的问题

文章目录前言一. 问题复现二. 问题解决方法一方法二三. 拓展3.1 自定义快捷键3.2 将当前shell中的代码内容复制到记事本中3.3 xshell配置密钥登录3.3.1 生成密钥3.3.2 将密钥上传到服务器并设置3.3.3 用xshell密钥登录服务器总结前言 重点强调: 本文是解决xshell的…