About Maxwell


Table of Contents

  • What is Maxwell
  • How to use Maxwell

Maxwell is a MySQL binlog parser written in Java. It is quite powerful: it converts binlog events into JSON and sends them to middleware such as Kafka, Redis, and RabbitMQ. I have recently been working out how to use Elasticsearch in an actual project. Elasticsearch itself is easy to pick up (add a dependency and you can use it directly), but how do you keep it updated in sync with the database? That is where Maxwell and RabbitMQ come in, so the discussion below centers on these three components.
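For reference, Maxwell emits each row change as a single JSON document. A sketch of what an INSERT looks like (the field values here are illustrative, not from a real capture):

{"database":"test","table":"users","type":"insert","ts":1680000000,"xid":23396,"commit":true,"data":{"id":1,"name":"alice"}}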

What is Maxwell

As mentioned above, Maxwell is a MySQL binlog parser, and since it can parse the MySQL binlog its application scenarios are broad: besides updating Elasticsearch in real time, it can also be used to maintain caches, collect table-level DML metrics, migrate partitioned data, and implement binlog-based rollback when switching databases.
Are there other options besides Maxwell? Certainly. The other commonly used MySQL binlog parsing tools today are Alibaba's Canal and mysql_streamer; the three tools compare as follows:

[Image: comparison table of Maxwell, Canal, and mysql_streamer]
Canal is developed in Java and split into a server and a client. It has many derivative applications, stable performance, and strong functionality; however, you need to write your own client to consume the data Canal parses.

Maxwell's advantage over Canal is ease of use: it outputs data changes directly as JSON strings, so no client needs to be written.

How to use Maxwell

  1. Before using Maxwell, check whether binlog is enabled on your MySQL server:
vim my.cnf

# MySQL settings
[mysqld]
server_id=1
log-bin=master
binlog_format=row
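After restarting MySQL, you can confirm the settings took effect with standard MySQL statements:

mysql> SHOW VARIABLES LIKE 'log_bin';
mysql> SHOW VARIABLES LIKE 'binlog_format';

log_bin should report ON and binlog_format should report ROW.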
  2. Next, create a MySQL user for Maxwell to connect as:
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
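To double-check the account, SHOW GRANTS (a standard MySQL statement) lists what was granted:

mysql> SHOW GRANTS FOR 'maxwell'@'%';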
  3. Then edit Maxwell's configuration file (config.properties in the Maxwell install directory):
vim config.properties

# Since we will use RabbitMQ later, the RabbitMQ settings below are filled in
# tl;dr config
log_level=info

producer=rabbitmq
#kafka.bootstrap.servers=localhost:9092

# mysql login info
host=localhost
user=maxwell
password=123456


#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#producer=kafka

# set the log level.  note that you can configure things further in log4j2.xml
#log_level=DEBUG # [DEBUG, INFO, WARN, ERROR]

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_

#     *** mysql ***

# mysql host to connect to
#host=hostname

# mysql port to connect to
#port=3306

# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
#user=maxwell

# mysql password
#password=maxwell

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell

# whether to use GTID or not for positioning
#gtid_mode=true

# maxwell will capture an initial "base" schema containing all table and column information,
# and then keep delta-updates on top of that schema.  If you have an inordinate amount of DDL changes,
# the table containing delta changes will grow unbounded (and possibly too large) over time.  If you
# enable this option Maxwell will periodically compact its tables.
#max_schemas=10000

# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info.  Specify that different server here:

#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306

# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:

#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306


#       *** output format ***

# records include binlog position (default false)
#output_binlog_position=true

# records include a gtid string (default false)
#output_gtid_position=true

# records include fields with null values (default true).  If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true

# records include server_id (default false)
#output_server_id=true

# records include thread_id (default false)
#output_thread_id=true

# records include schema_id (default false)
#output_schema_id=true

# records include row query; the binlog option "binlog_rows_query_log_events" must be enabled (default false)
#output_row_query=true

# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true

# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true

# records include commit and xid (default true)
#output_commit_info=true

# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
#output_ddl=true

# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase

#       *** kafka ***

# list of kafka brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
#kafka_topic=maxwell

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0


#           *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]

# specify what fields to partition by when using producer_partition_by=column
# comma-separated list.
#producer_partition_columns=id,foo,bar

# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

#            *** kinesis ***

#kinesis_stream=maxwell

# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true

#            *** sqs ***

#sqs_queue_uri=aws_sqs_queue_uri

# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html

#            *** pub/sub ***

#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

#            *** rabbit-mq ***

rabbitmq_host=localhost
rabbitmq_port=5672
rabbitmq_user=admin
rabbitmq_pass=admin
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell_exchange
rabbitmq_exchange_type=fanout
rabbitmq_exchange_durable=true 
rabbitmq_exchange_autodelete=false
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_message_persistent=false
rabbitmq_declare_exchange=true

#           *** redis ***

#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0

# name of pubsub/list/whatever key to publish to
#redis_key=maxwell

# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub

#redis_type=pubsub

#           *** custom producer ***

# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=

# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

#          *** filtering ***

# filter rows out of Maxwell's output.  Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#  <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#  type    ::= [ "include" | "exclude" | "blacklist" ]
#  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
filter= exclude: *.*, include: test.*

# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file

#       *** encryption ***

# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none

# Specify the secret key to be used
#secret_key=RandomInitVector

#       *** monitoring ***

# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j

# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics

# Enable (dropwizard) JVM metrics, default false
#metrics_jvm=true

# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60

# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
#http_port=8080

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=udp

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes an HTTP diagnostic endpoint that checks the following in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
#http_diagnostic=true # default false

# Diagnostic check timeout in milliseconds, required if diagnostic = true
#http_diagnostic_timeout=10000 # default 10000

#    *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
#output_file=/path/to/file

After that, all that remains is to configure RabbitMQ itself, and you can start using Maxwell.
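With the configuration saved, Maxwell is launched from its install directory with its standard launcher script, pointing at the file edited above:

bin/maxwell --config config.properties

On the consuming side, the following is a minimal Java sketch using the com.rabbitmq:amqp-client library, not a definitive implementation. The queue name maxwell_queue is a hypothetical choice; the host and credentials must match the rabbitmq_* values configured above, and because the exchange type is fanout, the routing key used in the binding is ignored. The Elasticsearch indexing step is left as a comment, since it depends on your project.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class MaxwellRabbitConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");   // must match rabbitmq_host
        factory.setUsername("admin");   // must match rabbitmq_user
        factory.setPassword("admin");   // must match rabbitmq_pass

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare a durable queue and bind it to the exchange Maxwell publishes to.
        // With a fanout exchange, the routing key in the binding is ignored.
        String queue = "maxwell_queue"; // hypothetical queue name
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, "maxwell_exchange", "");

        DeliverCallback onDeliver = (consumerTag, delivery) -> {
            String json = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // Parse the Maxwell event here and index the row into Elasticsearch.
            System.out.println("Received: " + json);
        };
        channel.basicConsume(queue, true, onDeliver, consumerTag -> { });
    }
}

Once a row in the watched database changes, a JSON event like the one shown at the start of this post should arrive at this consumer.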
