maxwell解析mysql的binlog数据并保存到kafka使用

news2024/9/21 16:24:33

通过maxwell来实现binlog的实时解析,实现数据的实时同步

1、mysql创建一个maxwell用户

为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户,

进入mysql客户端,然后执行以下命令,进行授权

mysql -u root  -p  

set global validate_password_policy=LOW;

set global validate_password_length=6;

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

flush privileges;

2:开启mysql的binlog机制

sudo vim /etc/my.cnf

添加或修改以下三行配置

log-bin= /var/lib/mysql/mysql-bin

binlog-format=ROW

server_id=1

数据库的服务器执行以下命令重启mysql服务:

sudo service mysqld restart

验证是否开启binlog

mysql -u root -p

 show variables like '%log_bin%'; 

 3、安装maxwell并启动

安装包地址

https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz

1、解压到指定文件夹

cd /kkb/soft

tar -zxf maxwell-1.21.1.tar.gz -C /kkb/install/

2、修改配置文件 

producer=kafka
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

host=node03
user=maxwell
password=123456

kafka_topic=maxwell_kafka

一定要注意一定要保证我们使用maxwell用户和123456密码能够连接上mysql数据库 

3、启动并创建kafka的topic 

kafka-topics.sh  --create --topic maxwell_kafka --partitions 3 --replication-factor 2 --zookeeper node01:2181

4、启动maxwell 

cd /kkb/install/maxwell-1.21.1

bin/maxwell

5、测试是否成功 

kafka-console-consumer.sh --topic maxwell_kafka --from-beginning --bootstrap-server node01:9092,node02:9092,node03:9092

创建mysql表并插入修改数据

CREATE DATABASE /*!32312 IF NOT EXISTS*/`test` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `test`;


DROP TABLE IF EXISTS `myuser`;

CREATE TABLE `myuser` (
  `id` int(12) NOT NULL,
  `name` varchar(32) DEFAULT NULL,
  `age` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert  into `myuser`(`id`,`name`,`age`) values (1,'zhangsan',NULL),(2,'xxx',NULL),(3,'ggg',NULL),(5,'xxxx',NULL),(8,'skldjlskdf',NULL),(10,'ggggg',NULL),(99,'ttttt',NULL),(114,NULL,NULL),(121,'xxx',NULL);

kafka中消费的json数据如下 

 4、插入到kafka的不同分区

由于maxwell(使用的分区策略是 dbname.hashCode %  numPartitions )采用的是不同库的hashcode的值,对分区数目取模,得到数据存放的分区,这就很容易造成数据倾斜的问题,如一个库中数据很多,只分到同一个分区,所以我这里采用对主键取模的方式来均衡数据。

创建的配置文件如下

cd /kkb/install/maxwell-1.21.1

vim travel.properties

producer_partition_by=primary_key  分区策略,依据数据的主键分割

filter=exclude:*.*, include: 过滤数据,exclude:*.*表示排除所有的库表,include:表示只包含这后面的库表

log_level=INFO

producer=kafka

kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

host=node03.kaikeba.com

user=maxwell

password=123456

producer_ack_timeout = 600000

port=3306

######### output format stuff ###############

output_binlog_position=ture

output_server_id=true

output_thread_id=ture

output_commit_info=true

output_row_query=true

output_ddl=false

output_nulls=true

output_xoffset=true

output_schema_id=true

######### output format stuff ###############

kafka_topic= veche

kafka_partition_hash=murmur3

kafka_key_format=hash

kafka.compression.type=snappy

kafka.retries=5

kafka.acks=all

producer_partition_by=primary_key

############ kafka stuff #############

############## misc stuff ###########

bootstrapper=async

############## misc stuff ##########

############## filter ###############

filter=exclude:*.*, include: travel.order_info_201904,include: travel.order_info_201905,include: travel.order_info_201906,include: travel.order_info_201907,include: travel.order_info_201908,include: travel.order_info_201906,include: travel.order_info_201910,include: travel.order_info_201911,include: travel.order_info_201912,include: travel.renter_info,include: travel.driver_info ,include: travel.opt_alliance_business

############## filter ###############

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/196646.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA操作git commit后(push项目失败:Access token is expired),撤销commit,恢复到提交前的状态

1. 在IDEA操作push代码报错 remote: [session-e6423190] Oauth: Access token is expired 原因:这个问题其实就是因为你的本地电脑上安全中心存储Gitee密码过期导致的。 解决此问题可以参考以下链接:本以为修改下IDEA的settings下的Gitee账号密码就可以了…

若依框架文档开发手册----开发中常用功能模块

目录 前端 add.html 时间框 大文本框 Ajax校验 自定义校验 回显选中图片 JS对添加下拉列元素 edit.html 下拉列 回显时间 list.html 搜索栏 时间框 mapper.xml Table表格 格式化时间 前端 表格匹配字典值 表格增加.减少功能项 全局 其他 关闭标签页 输入框…

前端使用vue-pdf、pdf-lib、canvas 给PDF文件添加水印,并预览与下载

前端使用vue-pdf、pdf-lib 给pdf添加水印,并预览与下载效果预览使用第三方插件安装依赖插件import 导入依赖预览添加水印的pdf下载添加水印的pdf预览及下载总结完整代码效果预览 使用第三方插件 安装依赖插件 npm i vue-pdf --save npm i pdf-lib --save npm inst…

java之面向对象基础

1.类和对象1.1什么是对象万物皆对象,只要是客观存在的事物都是对象1.2什么是面向对象1.3什么是类类是对现实生活中一类具有共同属性和行为的事物的抽象类的特点:类是对象的数据类型类是具有相同属性和行为的一组对象的集合1.4什么是对象的属性属性&#…

微信小程序——使用npm包,安装 Vant weapp 组件库安装教程及使用vant组件

一.小程序对 npm 的支持与限制目前,小程序中已经支持使用 npm 安装第三方包,从而来提高小程序的开发效率。但是,在小程序中使用 npm 包有如下3个限制:📜不支持依赖于 Node . js 内置库的包📜不支持依赖于浏…

【软件测试】2023年的软件测试咋样?见鬼,我到底该如何进阶?

目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 一谈到进阶&#xf…

Sitecore本地安装详细介绍

一、简介 Sitecore 是一种 CMS(内容管理系统,位于 Web 前端和后端办公系统或流程之间的软件系统),本文已当前最新的 10.2.0 版本为例,介绍如何安装部署。 二、环境准备 Sitecore 依赖于 IIS、SQL Server,在后续 Sitecore 安装之前,这两依赖需要提前安装完成 2.1 II…

【CTF】ctf中用到的php伪协议总结及例题(持续更)

目录 前言 关于文件包含漏洞 php伪协议总结 关于php://协议 参考自: 前言 本篇文章使用的靶场是buuctf上的web题目:[BSidesCF 2020]Had a bad day 进行点击选项得到一个这样的url 这里猜测存在sql注入,没测出来。或者可能有php伪协议读…

excel函数应用:如何写出IF函数多级嵌套公式

说到函数就不得不提起函数中最受欢迎的三大家族:求和家族、查找引用家族、逻辑家族!!!没错!今天我们要介绍的就是三大家族之一逻辑函数家族的领头人:IF函数——很多人难以理解IF函数的多级嵌套使用。其实&a…

shell 函数详解

目录 函数 一,什么是函数 二, 函数的返回值 三,函数语法 示例1: 示例2: 四,函数的调用 示例1: 示例2: 五,函数库文件 六, 递归函数 示例1&#xf…

Node.js 全局对象介绍

在学习 Javascript 之初,会接触一个概念:JS 由三部分组成,DOM BOM ECMAScript。其中前两者是宿主环境,也就是浏览器所提供的能力。后者才是 JS 语言本身的标准。 在上篇文章《Node.js入门(1)&#xff1a…

SpringMVC之响应

目录 一:环境准备 二:响应页面[了解] 三:返回文本数据[了解] 四:响应JSON数据 SpringMVC接收到请求和数据后,进行一些了的处理,当然这个处理可以是转发给Service,Service层再调用Dao层完成的…

1月更新!EasyOps® 28+新功能“狂飙”上线~

2023节后,我们就要“搞事情”! 一波新功能已上线,快看是不是你需要的! 持续升级优化全平台产品, 只为成为你数字化变革最值得信赖的合作伙伴! 优维EasyOps全平台28新功能来了! ↓↓↓ 1、H…

Spring的后处理器之BeanFactoryPostProcessor

Spring的后处理器 Spring的后处理器是Spring对外开放的重要扩展点,允许我们介入到Bean的整个实例化流程中来,以达到动态注册BeanDefinition,动态修改BeanDefinition,以及动态修改Bean的作用。Spring主要有两种后处理器&#xff1…

【车载开发系列】StatusOfDTC的解析

【车载开发系列】StatusOfDTC的解析 StatusOfDTC的解析【车载开发系列】StatusOfDTC的解析StatusOfDTC概念StatusOfDTC列表StatusOfDTC状态掩码Bit 0: TestFailedBit 1: testFailedThisOperationCycleBit 2: pendingDTCBit 3: confirmedDTCBit 4: testNotCompletedSinceLastCle…

【操作系统】3、内存管理

文章目录三、内存管理3.1 内存基础3.1.1 内存管理概念3.1.2 程序装入与链接3.1.3 内存保护3.2 内存空间的分配与回收3.2.1 连续分配管理方式3.2.1.1 单一连续分配3.2.1.2 固定分区分配3.2.1.3 动态分区分配3.2.2 动态分区分配算法3.2.2.1 首次适应算法3.2.2.2 最佳适用算法3.2.…

【数据库原理与SQL Server应用】Part05——表和表数据操作

【数据库原理与SQL Server应用】Part05——表和表数据操作一、表概念1.1 表结构1.2 表类型1.3 数据类型二、创建表2.1 管理工具界面方式创建表2.2 命令行方式创建表三、修改表3.1 管理工具界面方式修改表3.2 命令行方式修改表四、删除表五、表数据操作5.1 管理工具界面方式操作…

怎么快速选择出色的香港服务器

相信一些大规模企业或站长都不满足于普通的香港VPS,虽然它也拥有很不错的性能与速度,但远远比不上香港服务器。但是,对于初次使用香港服务器的用户来说,选择起来肯定是要经过一番考虑的,那么,有没有什么简单…

电子词典项目

16. 电子词典项目需求 项目要求: 登录注册功能,不能重复登录,重复注册单词查询功能历史记录功能,存储单词,意思,以及查询时间基于TCP,支持多客户端连接采用数据库保存用户信息与历史记录将dic…

VS2022离线安装教程

官方教程下载和安装步骤 https://docs.microsoft.com/zh-cn/visualstudio/install/create-an-offline-installation-of-visual-studio?viewvs-2022 使用命令行创建本地布局 下载所需的 Visual Studio 版本的引导程序,并将其复制到要用作本地布局源位置的目录中。…