kafka官方文档
参考Kafka三款监控工具比较
1 查看kafka的版本
进入kafka所在目录,通过查看libs目录下的jar包。
2.11是scala的版本,2.0.0是kafka的版本。
测试环境
#systemctl start zookeeper
#systemctl start kafkka
2 kafka的常用配置
Kafka使用属性文件格式的键值对进行配置。这些值可以从文件或以编程方式提供。
2.1 Broker配置
*************************
*****Importance: high
advertised.listeners null
auto.create.topics.enable true
auto.leader.rebalance.enable true
background.threads 10
broker.id -1
compression.type producer
control.plane.listener.name null
controller.listener.names null
controller.quorum.election.backoff.max.ms 1000
controller.quorum.election.timeout.ms 1000
controller.quorum.fetch.timeout.ms 2000
controller.quorum.voters ""
delete.topic.enable true
early.start.listeners null
leader.imbalance.check.interval.seconds 300
leader.imbalance.per.broker.percentage 10
listeners PLAINTEXT://:9092
log.dir /tmp/kafka-logs
log.dirs null
log.flush.interval.messages 9223372036854775807
log.flush.interval.ms null
log.flush.offset.checkpoint.interval.ms 60000
log.flush.scheduler.interval.ms 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms 60000
log.retention.bytes -1
log.retention.hours 168
log.retention.minutes null
log.retention.ms null
log.roll.hours 168
log.roll.jitter.hours 0
log.roll.jitter.ms null
log.roll.ms null
log.segment.bytes 1073741824 (1 gibibyte)
log.segment.delete.delay.ms 60000 (1 minute)
message.max.bytes 1048588
metadata.log.dir null
metadata.log.max.record.bytes.between.snapshots 20971520
metadata.log.segment.bytes 1073741824 (1 gibibyte)
metadata.log.segment.ms 604800000 (7 days)
metadata.max.retention.bytes -1
metadata.max.retention.ms 604800000 (7 days)
min.insync.replicas 1
node.id -1
num.io.threads 8
num.network.threads 3
num.recovery.threads.per.data.dir 1
num.replica.alter.log.dirs.threads null
num.replica.fetchers 1
offset.metadata.max.bytes 4096 (4 kibibytes)
offsets.commit.required.acks -1
offsets.commit.timeout.ms 5000 (5 seconds)
offsets.load.buffer.size 5242880
offsets.retention.check.interval.ms 600000 (10 minutes)
offsets.retention.minutes 10080
offsets.topic.compression.codec 0
offsets.topic.num.partitions 50
offsets.topic.replication.factor 3
offsets.topic.segment.bytes 104857600 (100 mebibytes)
process.roles ""
queued.max.requests 500
replica.fetch.min.bytes 1
replica.fetch.wait.max.ms 500
replica.high.watermark.checkpoint.interval.ms 5000 (5 seconds)
replica.lag.time.max.ms 30000 (30 seconds)
replica.socket.receive.buffer.bytes 65536 (64 kibibytes)
replica.socket.timeout.ms 30000 (30 seconds)
request.timeout.ms 30000 (30 seconds)
sasl.mechanism.controller.protocol GSSAPI
socket.receive.buffer.bytes 102400 (100 kibibytes)
socket.request.max.bytes 104857600 (100 mebibytes)
socket.send.buffer.bytes 102400 (100 kibibytes)
transaction.max.timeout.ms 900000 (15 minutes)
transaction.state.log.load.buffer.size 5242880
transaction.state.log.min.isr 2
transaction.state.log.num.partitions 50
transaction.state.log.replication.factor 3
transaction.state.log.segment.bytes 104857600 (100 mebibytes)
transactional.id.expiration.ms 604800000 (7 days)
unclean.leader.election.enable false
zookeeper.connect null
zookeeper.connection.timeout.ms null
zookeeper.max.in.flight.requests 10
zookeeper.session.timeout.ms 18000 (18 seconds)
zookeeper.set.acl false
*************************
*****Importance: medium
broker.heartbeat.interval.ms 2000 (2 seconds)
broker.id.generation.enable true
broker.rack null
broker.session.timeout.ms 9000 (9 seconds)
connections.max.idle.ms 600000 (10 minutes)
connections.max.reauth.ms 0
controlled.shutdown.enable true
controlled.shutdown.max.retries 3
controlled.shutdown.retry.backoff.ms 5000 (5 seconds)
controller.quorum.append.linger.ms 25
controller.quorum.request.timeout.ms 2000 (2 seconds)
controller.socket.timeout.ms 30000 (30 seconds)
default.replication.factor 1
delegation.token.expiry.time.ms 86400000 (1 day)
delegation.token.master.key null
delegation.token.max.lifetime.ms 604800000 (7 days)
delegation.token.secret.key null
delete.records.purgatory.purge.interval.requests 1
fetch.max.bytes 57671680 (55 mebibytes)
fetch.purgatory.purge.interval.requests 1000
group.initial.rebalance.delay.ms 3000 (3 seconds)
group.max.session.timeout.ms 1800000 (30 minutes)
group.max.size 2147483647
group.min.session.timeout.ms 6000 (6 seconds)
initial.broker.registration.timeout.ms 60000 (1 minute)
inter.broker.listener.name null
inter.broker.protocol.version
log.cleaner.backoff.ms 15000 (15 seconds)
log.cleaner.dedupe.buffer.size 134217728
log.cleaner.delete.retention.ms 86400000 (1 day)
log.cleaner.enable true
log.cleaner.io.buffer.load.factor 0.9
log.cleaner.io.buffer.size 524288
log.cleaner.io.max.bytes.per.second 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms 9223372036854775807
log.cleaner.min.cleanable.ratio 0.5
log.cleaner.min.compaction.lag.ms 0
log.cleaner.threads 1
log.cleanup.policy delete
log.index.interval.bytes 4096 (4 kibibytes)
log.index.size.max.bytes 10485760 (10 mebibytes)
log.message.format.version
log.message.timestamp.difference.max.ms 9223372036854775807
log.message.timestamp.type CreateTime
log.preallocate false
log.retention.check.interval.ms 300000 (5 minutes)
max.connection.creation.rate 2147483647
max.connections 2147483647
max.connections.per.ip 2147483647
max.connections.per.ip.overrides ""
max.incremental.fetch.session.cache.slots 1000
num.partitions 1
password.encoder.old.secret null
password.encoder.secret null
principal.builder.class
producer.purgatory.purge.interval.requests 1000
queued.max.request.bytes -1
replica.fetch.backoff.ms 1000 (1 second)
replica.fetch.max.bytes 1048576 (1 mebibyte)
replica.fetch.response.max.bytes 10485760 (10 mebibytes)
replica.selector.class null
reserved.broker.max.id 1000
sasl.client.callback.handler.class null
sasl.enabled.mechanisms GSSAPI
sasl.jaas.config null
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.principal.to.local.rules DEFAULT
sasl.kerberos.service.name null
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.callback.handler.class null
sasl.login.class null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.mechanism.inter.broker.protocol GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
sasl.server.callback.handler.class null
sasl.server.max.receive.size 524288
security.inter.broker.protocol PLAINTEXT
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
socket.listen.backlog.size 50
ssl.cipher.suites ""
ssl.client.auth none
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.key.password null
ssl.keymanager.algorithm SunX509
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.trustmanager.algorithm PKIX
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
ssl.truststore.type JKS
zookeeper.clientCnxnSocket null
zookeeper.ssl.client.enable false
zookeeper.ssl.keystore.location null
zookeeper.ssl.keystore.password null
zookeeper.ssl.keystore.type null
zookeeper.ssl.truststore.location null
zookeeper.ssl.truststore.password null
zookeeper.ssl.truststore.type null
*************************
*****Importance: low
alter.config.policy.class.name null
alter.log.dirs.replication.quota.window.num 11
alter.log.dirs.replication.quota.window.size.seconds 1
authorizer.class.name ""
client.quota.callback.class null
connection.failed.authentication.delay.ms 100
controller.quorum.retry.backoff.ms 20
controller.quota.window.num 11
controller.quota.window.size.seconds 1
create.topic.policy.class.name null
delegation.token.expiry.check.interval.ms 3600000 (1 hour)
kafka.metrics.polling.interval.secs 10
kafka.metrics.reporters ""
listener.security.protocol.map PLAINTEXT:PLAINTEXT
log.message.downconversion.enable true
metadata.max.idle.interval.ms 500
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
password.encoder.cipher.algorithm AES/CBC/PKCS5Padding
password.encoder.iterations 4096
password.encoder.key.length 128
password.encoder.keyfactory.algorithm null
quota.window.num 11
quota.window.size.seconds 1
replication.quota.window.num 11
replication.quota.window.size.seconds 1
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.principal.mapping.rules DEFAULT
ssl.secure.random.implementation null
transaction.abort.timed.out.transaction.cleanup.interval.ms 10000 (10 seconds)
transaction.remove.expired.transaction.cleanup.interval.ms 3600000 (1 hour)
zookeeper.ssl.cipher.suites null
zookeeper.ssl.crl.enable false
zookeeper.ssl.enabled.protocols null
zookeeper.ssl.endpoint.identification.algorithm HTTPS
zookeeper.ssl.ocsp.enable false
zookeeper.ssl.protocol TLSv1.2
2.2 Topic配置
*****Importance: medium
cleanup.policy delete
compression.type producer
delete.retention.ms 86400000 (1 day)
file.delete.delay.ms 60000 (1 minute)
flush.messages 9223372036854775807
flush.ms 9223372036854775807
follower.replication.throttled.replicas ""
index.interval.bytes 4096 (4 kibibytes)
leader.replication.throttled.replicas ""
max.compaction.lag.ms 9223372036854775807
max.message.bytes 1048588
message.format.version 3.0-IV1
message.timestamp.difference.max.ms 9223372036854775807
message.timestamp.type CreateTime
min.cleanable.dirty.ratio 0.5
min.compaction.lag.ms 0
min.insync.replicas 1
preallocate false
retention.bytes -1
retention.ms 604800000 (7 days)
segment.bytes 1073741824 (1 gibibyte)
segment.index.bytes 10485760 (10 mebibytes)
segment.jitter.ms 0
segment.ms 604800000 (7 days)
unclean.leader.election.enable false
*****Importance: low
message.downconversion.enable true
2.3 Producer配置
*************************
*****Importance: high
key.serializer
value.serializer
bootstrap.servers ""
buffer.memory 33554432
compression.type none
retries 2147483647
ssl.key.password null
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
*************************
*****Importance: medium
batch.size 16384
client.dns.lookup use_all_dns_ips
client.id ""
connections.max.idle.ms 540000 (9 minutes)
delivery.timeout.ms 120000 (2 minutes)
linger.ms 0
max.block.ms 60000 (1 minute)
max.request.size 1048576
partitioner.class null
partitioner.ignore.keys false
receive.buffer.bytes 32768 (32 kibibytes)
request.timeout.ms 30000 (30 seconds)
sasl.client.callback.handler.class null
sasl.jaas.config null
sasl.kerberos.service.name null
sasl.login.callback.handler.class null
sasl.login.class null
sasl.mechanism GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
security.protocol PLAINTEXT
send.buffer.bytes 131072 (128 kibibytes)
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.truststore.type JKS
*************************
*****Importance: low
acks all
enable.idempotence true
interceptor.classes ""
max.in.flight.requests.per.connection 5
metadata.max.age.ms 300000 (5 minutes)
metadata.max.idle.ms 300000 (5 minutes)
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
partitioner.adaptive.partitioning.enable true
partitioner.availability.timeout.ms 0
reconnect.backoff.max.ms 1000 (1 second)
reconnect.backoff.ms 50
retry.backoff.ms 100
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.cipher.suites null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.keymanager.algorithm SunX509
ssl.secure.random.implementation null
ssl.trustmanager.algorithm PKIX
transaction.timeout.ms 60000 (1 minute)
transactional.id null
2.4 Consumer配置
*************************
*****Importance: high
key.deserializer
value.deserializer
bootstrap.servers ""
fetch.min.bytes 1
group.id null
heartbeat.interval.ms 3000 (3 seconds)
max.partition.fetch.bytes 1048576 (1 mebibyte)
session.timeout.ms 45000 (45 seconds)
ssl.key.password null
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
*************************
*****Importance: medium
allow.auto.create.topics true
auto.offset.reset latest
client.dns.lookup use_all_dns_ips
connections.max.idle.ms 60000 (1 minute)
enable.auto.commit true
exclude.internal.topics true
fetch.max.bytes 52428800 (50 mebibytes)
group.instance.id null
isolation.level read_uncommitted
max.poll.interval.ms 300000 (5 minutes)
max.poll.records 500
partition.assignment.strategy
receive.buffer.bytes 65536 (64 kibibytes)
request.timeout.ms 30000 (30 seconds)
sasl.client.callback.handler.class null
sasl.jaas.config null
sasl.kerberos.service.name null
sasl.login.callback.handler.class null
sasl.login.class null
sasl.mechanism GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
security.protocol PLAINTEXT
send.buffer.bytes 131072 (128 kibibytes)
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.truststore.type JKS
*************************
*****Importance: low
auto.commit.interval.ms 5000 (5 seconds)
check.crcs true
client.id ""
client.rack ""
fetch.max.wait.ms 500
interceptor.classes ""
metadata.max.age.ms 300000 (5 minutes)
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
reconnect.backoff.max.ms 1000 (1 second)
reconnect.backoff.ms 50
retry.backoff.ms 100
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.cipher.suites null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.keymanager.algorithm SunX509
ssl.secure.random.implementation null
ssl.trustmanager.algorithm PKIX
2.5 Kafka Connect配置
2.6 Kafka Streams配置
2.7 AdminClient配置
3 KafkaOffsetMonitor
KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。
KafkaOffsetMonitor,它是由Kafka开源社区提供的一款Web管理界面,是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,通过KafkaOffsetMonitor,我们可以很直观的知道,每个Partition的Message是否消费掉,有没有阻塞等。
通过web界面,可以方便的得知以下信息:
(1)对Consumer的消费监控,并列出每个Consumer的Offset数据。
(2)保护消费者组列表信息。
(3)每个Topic的所有Partition列表包含:Topic、Pid、Offset、LogSize、Lag以及Owner等。
(4)浏览查阅Topic的历史消费信息。
3.1 下载
KafkaOffsetMonitor托管在Github上,可以通过Github下载。
下载地址:https://github.com/gunsid/KafkaOffsetMonitor.git。
3.2 启动
将下载下来的KafkaOffsetMonitor jar包上传到linux上,可以新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.1.jar进入到KafkaMonitor目录下,通过java编译命令来运行这个jar包:
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181
--port 8088
--refresh 5.seconds
--retain 1.days
如果没有指定端口,则默认会开启一个随机端口。
zk :zookeeper主机地址,如果有多个,用逗号隔开
port :应用程序端口
refresh :应用程序在数据库中刷新和存储点的频率
retain :在db中保留多长时间
dbName :保存的数据库文件名,默认为offsetapp
编辑启动脚本
#vi kafka-monitor-start.sh
#chmod a+x kafka-monitor-start.sh
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8089 \
--zk 10.23.241.202:2181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;
#nohup /root/mm/kafka-monitor-start.sh & 启动
#netstat -lnp | grep 8089 查看端口占用情况
3.3 web UI
在游览器中输入:http://ip:port即可以查看KafkaOffsetMonitor Web UI。
http://10.23.241.202:8089/。