1、管理
使用kafka-topics.sh脚本:
选项 | 说明 |
--config <String: name=value> | 为创建的或修改的主题指定配置信息。支持下述配置条目:
|
--create | 创建一个新主题 |
--delete | 删除一个主题 |
--delete-config <String: name> | 删除现有主题的一个主题配置条目。这些条目就是在 --config中给出的配置条目。 |
--alter | 更改主题的分区数量,副本分配和/或配置条目。 |
--describe | 列出给定主题的细节。 |
--disable-rack-aware | 禁用副本分配的机架感知。 |
--force | 抑制控制台提示信息 |
--help | 打印帮助信息 |
--if-exists | 如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执行。 |
--if-not-exists | 在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执行命令。 |
--list | 列出所有可用的主题。 |
--partitions <Integer: # of partitions> | 要创建或修改主题的分区数。 |
--replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2 ,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , ...> | 当创建或修改主题的时候手动指定partition-to-broker的分配关系。 |
--replication-factor <Integer:replication factor> | 要创建的主题分区副本数。1表示只有一个副本,也就是Leader副本。 |
--topic <String: topic> | 要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。 |
--topics-with-overrides | if set when describing topics, only show topics that have overridden configs |
--unavailable-partitions | if set when describing topics, only show partitions whose leader is not available |
--under-replicated-partitions | if set when describing topics, only show under replicated partitions |
--zookeeper <String: urls> | 必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。 |
主题中可以使用的参数定义:
属性 | 默认值 | 服务器默认属性 | 说明 |
cleanup.policy | delete | log.cleanup.policy | 要么是”delete“要么是” compact“; 这个字符串指明了针对旧日志部分的利用方式;默认方式("delete")将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩 |
compression.type | none | producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。 | |
delete.retention.ms | 86400000 (24hours) | log.cleaner.delete.retention.ms | 对于压缩日志保留的最长时间,也是客户端消费消息的最长时间,通log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。此项配置可以在topic创建时的置顶参数覆盖 |
flush.ms | None | log.flush.interval.ms | 此项配置用来置顶强制进行fsync日志到磁盘的时间间隔;例如,如果设置为1000,那么每1000ms就需要进行一次fsync。一般不建议使用这个选项 |
flush.messages | None | log.flush.interval.messages | 此项配置指定时间间隔:强制进行fsync日志。例如,如果这个选项设置为1,那么每条消息之后都需要进行fsync,如果设置为5,则每5条消息就需要进行一次fsync。一般来说,建议你不要设置这个值。此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. |
index.interval.bytes | 4096 | log.index.interval.bytes | 默认设置保证了我们每4096个字节就对消息添加一个索引,更多的索引使得阅读的消息更加靠近,但是索引规模却会由此增大;一般不需要改变这个选项 |
max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最大尺寸。注意如果你增大这个尺寸,你也必须增大你consumer的fetch 尺寸,这样consumer才能fetch到这些最大尺寸的消息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此项配置控制log压缩器试图进行清除日志的频率。默认情况下,将避免清除压缩率超过50%的日志。这个比率避免了最大的空间浪费 |
min.insync.replicas | 1 | min.insync.replicas | 当producer设置request.required.acks为-1时, min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer会产生异常。 |
retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制 |
retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间。 |
segment.bytes | 1GB | log.segment.bytes | kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有关offsets和文件位置之间映射的索引文件的大小;一般不需要修改这个配置 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
segment.ms | 7 days | log.roll.hours | 即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件 |
unclean.leader.election.enable | true | 指明了是否能够使不在ISR中replicas设置用来作为leader |
(1)创建主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
(2)查看主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --list kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
(3)修改主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
(4)删除主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
给主题添加删除的标记:
要过一段时间删除。
2、增加分区
通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
通过--alter修改主题的分区数,增加分区。
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2
3、分区副本的分配-了解
副本分配的三个目标:
- 均衡地将副本分散于各个broker上
- 对于某个broker上分配的分区,它的其他副本在其他broker上
- 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。
在不考虑机架信息的情况下:
- 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
- 其余副本通过增加偏移进行分配。
4、必要参数配置
kafka-topics.sh --config xx=xx --config yy=yy
配置给主题的参数:
属性 | 默认值 | 服务器默认属性 | 说明 |
cleanup.policy | delete | log.cleanup.policy | 要么是”delete“要么是” compact“; 这个字符串指明了针对旧日志部分的利用方式;默认方式("delete")将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩 |
compression.type | none | producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy、 lz4。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。 | |
max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最大字节数。注意如果你增大这个字节数,也必须增大consumer的fetch字节数,这样consumer才能fetch到这些最大字节数的消息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此项配置控制log压缩器试图进行清除日志的频率。默认情况下,将避免清除压缩率超过50%的日志。这个比率避免了最大的空间浪费 |
min.insync.replicas | 1 | min.insync.replicas | 当producer设置request.required.acks为-1时, min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到, producer会产生异常。 |
retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制 |
retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间。 |
segment.bytes | 1GB | log.segment.bytes | kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有关offsets和文件位置之间映射的索引文件的大小;一般不需要修改这个配置 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
segment.ms | 7 days | log.roll.hours | 即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件 |
unclean.leader.election.enable | true | 指明了是否能够使不在ISR中replicas设置用来作为lea |
5、KafkaAdminClient应用
说明
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。
功能与原理介绍
Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):
名称 | api |
创建主题 | createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) |
删除主题 | deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options) |
列出所有主题 | listTopics(final ListTopicsOptions options) |
查询主题 | describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) |
查询集群信息 | describeCluster(DescribeClusterOptions options) |
查询配置信息 | describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) |
修改配置信息 | alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) |
修改副本的日志目录 | alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) |
查询节点的日志目录信息 | describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) |
查询副本的日志目录信息 | describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) |
增加分区 | createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options) |
其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。
用到的参数:
属性 | 说明 | 重要性 |
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。 这个列表仅影响用来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,... 由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。 一般最好两台,以防其中一台宕掉。 | high |
client.id | 生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。 一般该id是跟业务有关的字符串。 | medium |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000 | medium |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认65536,可选值:[-1,...] | medium |
request.timeout.ms | 客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000 | medium |
security.protocol | 跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL. string类型值,默认:PLAINTEXT | medium |
send.buffer.bytes | 用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。int类型值,默认值:131072 | medium |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。 long型值,默认1000,可选值:[0,...] | medium |
reconnect.backoff.ms | 重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。 long型值,默认:50 | low |
retries | The maximum number of times to retry a call before failing it.重试的次数,达到此值,失败。 int类型值,默认5。 | low |
retry.backoff.ms | 在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。该时间的存在避免了密集循环。 long型值,默认值:100。 | low |
主要操作步骤:
- 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
- 客户端发送请求至Kafka Broker。
- Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
- 客户端接收相应的回执并进行解析处理。
- 和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:
- 自定义XXXOptions;
- 自定义XXXResult返回值;
- 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
package com.lagou.kafka.demo;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class MyAdminClient {
private KafkaAdminClient client;
@Before
public void before() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("client.id", "admin_001");
client = (KafkaAdminClient) KafkaAdminClient.create(configs);
}
@After
public void after() {
// 关闭admin客户端
client.close();
}
@Test
public void testListTopics() throws ExecutionException, InterruptedException {
// 列出主题
// final ListTopicsResult listTopicsResult = client.listTopics();
ListTopicsOptions options = new ListTopicsOptions();
// 列出内部主题
options.listInternal(true);
// 设置请求超时时间,单位是毫秒
options.timeoutMs(500);
final ListTopicsResult listTopicsResult = client.listTopics(options);
// final Set<String> strings = listTopicsResult.names().get();
//
// strings.forEach(name -> {
// System.out.println(name);
// });
// 将请求变成同步的请求,直接获取结果
final Collection<TopicListing> topicListings = listTopicsResult.listings().get();
topicListings.forEach(new Consumer<TopicListing>() {
@Override
public void accept(TopicListing topicListing) {
// 该主题是否是内部主题
final boolean internal = topicListing.isInternal();
// 该主题的名字
final String name = topicListing.name();
System.out.println("主题是否是内部主题:" + internal);
System.out.println("主题的名字:" + name);
System.out.println(topicListing);
System.out.println("=====================================");
}
});
}
@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));
final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
= describeLogDirsResult.all().get();
integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
@Override
public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
System.out.println("broker.id = " + integer);
// log.dirs可以设置多个目录
stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
@Override
public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
System.out.println("logdir = " + s);
final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;
replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
@Override
public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
System.out.println("主题分区:" + topicPartition.partition());
System.out.println("主题:" + topicPartition.topic());
// final boolean isFuture = replicaInfo.isFuture;
// final long offsetLag = replicaInfo.offsetLag;
// final long size = replicaInfo.size;
}
});
}
});
}
});
}
}
6、偏移量管理
- Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。
- 早期由zookeeper管理消费组的偏移量。
查询方法:
- 通过原生 kafka 提供的工具脚本进行查询。
- 工具脚本的位置与名称为bin/kafka-consumer-groups.sh
首先运行脚本,查看帮助:
参数 | 说明 |
--all-topics | 将所有关联到指定消费组的主题都划归到reset-offsets 操作范围。 |
--bootstrapserver<String: server to connect to> | 必须:(基于消费组的新的消费者): 要连接的服务器地址。 |
--by-duration <String: duration> | 距离当前时间戳的一个时间段。格式:'PnDTnHnMnS' |
--command-config<String: command config property file> | 指定配置文件,该文件内容传递给Admin Client和消费者。 |
--delete | 传值消费组名称,删除整个消费组与所有主题的各个分区偏移量和所有者关系。 如: --group g1 --group g2。 传值消费组名称和单个主题,仅删除该消费组到指定主题的分区偏移量和所属关系。 如: --group g1 --group g2 --topic t1。 传值一个主题名称,仅删除指定主题与所有消费组分区偏移量以及所属关系。 如: --topic t1 注意:消费组的删除仅对基于ZK保存偏移量的消费组有效,并且要小心使用,仅删除不活跃的消费组。 |
--describe | 描述给定消费组的偏移量差距(有多少消息还没有消费)。 |
--execute | 执行操作。支持的操作: reset-offsets。 |
--export | 导出操作的结果到CSV文件。支持的操作: reset-offsets。 |
--from-file <String: path to CSV file> | 重置偏移量到CSV文件中定义的值。 |
--group <String: consumer group> | 目标消费组。 |
--list | 列出所有消费组。 |
--new-consumer | 使用新的消费者实现。这是默认值。随后的发行版中会删除这一操作。 |
--reset-offsets | 重置消费组的偏移量。当前一次操作只支持一个消费组,并且该消费组应该是不活跃的。
|
--shift-by <Long: number-of-offsets> | 重置偏移量n个。n可以是正值,也可以是负值。 |
--timeout <Long: timeout (ms)> | 对某些操作设置超时时间。 如:对于描述指定消费组信息,指定毫秒值的最大等待时间,以获取正常数据(如刚创建的消费组,或者消费组做了一些更改操作)。默认时间: 5000。 |
--to-current | 重置到当前的偏移量。 |
--to-datetime <String: datetime> | 重置偏移量到指定的时间戳。格式:'YYYY-MM-DDTHH:mm:SS.sss' |
--to-earliest | 重置为最早的偏移量 |
--to-latest | 重置为最新的偏移量 |
--to-offset <Long:offset> | 重置到指定的偏移量。 |
--topic <String: topic> | 指定哪个主题的消费组需要删除,或者指定哪个主题的消费组需要包含到 reset-offsets操作中。对于 reset-offsets操作,还可以指定分区: topic1:0,1,2。其中0,1,2表示要包含到操作中的分区号。重置偏移量的操作支持多个主题一起操作。 |
--zookeeper <String: urls> | 必须,它的值,你懂的。 --zookeeper node1:2181/myKafka。 |
这里我们先编写一个生产者,消费者的例子:
我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询
由于kafka 消费者记录group的消费偏移量有两种方式 :
- kafka 自维护 (新)
- zookpeer 维护 (旧) ,已经逐渐被废弃
所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将--bootstrap-server 换成--zookeeper 即可。
(1)查看有那些 group ID 正在进行消费:
注意:
- 这里面是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。
- 注意: 重名的 group.id 只会显示一次
(2)查看指定group.id 的消费者消费情况
kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
如果消费者停止,查看偏移量信息:
将偏移量设置为最早的:
将偏移量设置为最新的:
分别将指定主题的指定分区的偏移量向前移动10个消息: