文章目录
- 前言
- 1、Kafka 系统架构
- 1.1、Producer 生产者
- 1.2、Consumer 消费者
- 1.3、Consumer Group 消费者群组
- 1.4、Topic 主题
- 1.5、Partition 分区
- 1.6、Log 日志存储
- 1.7、Broker 服务器
- 1.8、Offset 偏移量
- 1.9、Replication 副本
- 1.10、Zookeeper
- 2、Kafka 环境搭建
- 2.1、下载 Kafka
- 2.2、修改 Kafka 配置 config/server.properties
- 2.3、运行(下面有用 AdminClient API 操作)
- 3、Kafka 的 JAVA 实现
- 3.1、AdminClient API
- 3.2、Kafka 生产者 JAVA 代码
- 3.3、Kafka 消费者 JAVA 代码
- 3.4、Spring Boot 实现消费者对 Kafka 的监听
- 4、Kafka 幂等性
- 4.1、幂等性要解决的问题
- 4.2、Kafka 是怎么保证幂等性的
- 4.3、Kafka 幂等性的局限性
- 5、Kafka 常见问题
- 5.1、Kafka 消息丢失
- 5.2、Kafka 消息重复消费
- 5.3、kafka 为什么快/吞吐量大
前言
官网:http://kafka.apache.org
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由Scala和ava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
1、Kafka 系统架构
1.1、Producer 生产者
生产者用于创建消息。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
消息发送
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
// 发后即忘(fire-and-forget)
producer.send(record);
// 同步(sync)
producer.send(record).get();
// 异步(async)
producer.send(record, new Callback(){
```
});
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器、序列化器、和分区器的一系列作用之后才能被真正地发往broker。
生产者架构图
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。消费者需要用反序列化区把从Kafka中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。
分区器
如果消息中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。也可以自定义分区器。
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现ProducerInterceptor接口来使用。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender发送线程:
① 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator)中。消息累加器RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator的缓存大小可以通过buffer.memory
配置。
在RecordAccumulator的内部为每个分区都维护了一个双端队列,主线程发送过来的消息(ProducerRecord)都会被追加到某个双端队列(Dqueue)中,队列中的内容就是ProducerBatch,即Dqueue< ProducerBatch >。一个ProducerBatch 包含多个ProducerRecord。当一条消息(ProducerRecord)流入RecordAccumulator,如果这条消息小于batch.size参数大小则以batch.size参数大小创建ProducerBatch,否则以消息的实际大小创建ProducerBatch。
② Sender发送线程负责从消息累加器RecordAccumulator中获取消息并将其发送到Kafka中。后续Sender从缓存中获取消息,进行转换,发送到broker。在发送前还会保存到InFlightRequests中,作用是缓存已经发送出去但还没有收到响应的请求,缓存数量由max.in.flight.requests.per.connection
参数确定,默认是5,表示每个连接最多缓存5个未响应的请求。
1.2、Consumer 消费者
消费者,消息的订阅者,可以订阅一个或多个主题,并且依据消息生产的顺序读取他们,消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。
订阅主题和分区
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过assign()方法则没有。
消息消费
Kafka中的消息是基于拉模式的。Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。
public ConsumerRecords<K, V> (final Duration timeout)
timeout用于控制poll()方法的阻塞时间,没有消息时会阻塞。
位移提交
Kafka中的每条消息都有唯一的offset,用来标识消息在分区中对应的位置。Kafka默认的消费唯一的提交方式是自动提交,由enable.auto.commit
配置,默认为true。自动提交不是每一条消息提交一次,而是定期提交,周期由auto.commit.interval.ms
配置,默认为5秒。
自动提交可能发生消息重复或者丢失的情况,Kafka还提供了手动提交的方式。enable.auto.commit
配置为false开启手动提交。
指定位移消费
在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset
的配置来决定从何处开始进行消费。默认值为lastest,表示从分区末尾开始消费消息;earliest表示从起始开始消费;none为不进行消费,而是抛出异常。
seek()可以从特定的位移处开始拉去消息,得以追前消费或回溯消费。
public void seek(TopicPartition partition, long offset)
再均衡
再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。再均衡后也可能出现重复消费的情况。所以应尽量避免不必要的再均衡发生。
1.3、Consumer Group 消费者群组
同一个消费者组中保证每个分区只能被一个消费者使用 ,不会出现多个消费者读取同一个分区的情况,通过这种方式,消费者可以消费包含大量消息的主题。而且如果某个消费者失效,群组里的其他消费者可以接管失效悄费者的工作。
1.4、Topic 主题
Kafka中 的消息是根据 Topic 进行分类的,Topic 是支持多订阅的,一个 Topic 可以有多个不同的订阅消息的消费者。Kafka 集群 Topic 的数量没有限制,同一个 Topic 的数据会被划分在同一个目录下,一个 Topic 可以包含 1 至多个分区,所有分区的消息加在一起就是一个 Topic 的所有消息。
1.5、Partition 分区
在Kafka中,每个 Topic 至少有一个 Partition ,一个 topic 可以包含多个 分区partition,topic 消息保存在各个 partition 上,由于一个 topic 能被分到多个分区上,给 kafka 提供给了并行的处理能力,这也正是 kafka 高吞吐的原因之一。
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset):消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表消息的唯一序号。
分区策略
分区策略 | 说明 |
---|---|
轮询策略 | 按顺序轮流将每条数据分配到每个分区中 |
随机策略 | 每次都随机地将消息分配到每个分区 |
按键保存策略 | 生产者发送数据的时候,可以指定一个key,计算这个key的hashCodet值,按照hashCodel的值对不同消息进行存储 |
如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将 partition 数目设为1。
1.6、Log 日志存储
一个分区对应一个日志文件(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,便于消息的维护和清理。Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件
- partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
- 数值大小为64位,20位数据字符长度,没有数字用0填充
消息压缩
一条消息通常不会太大,Kafka是批量消息压缩,通过compression.type
配置默认为producer,还可以配置为gzip、snappy、lz4,uncompressed表示不压缩。
日志索引
Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(log.index.interval.bytes
指定,默认4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。
日志清理
Kafka提供两种日志清理策略:
- 日志删除:按照一定的保留策略(基于时间、日志大小或日志起始偏移量)直接删除不符合条件的日志分段。
- 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
页缓存
页缓存是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,减少对磁盘IO的操作。
零拷贝
所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。对于Linux操作系统而言,底层依赖于sendfile()方法实现。
一般的数据流程:磁盘 -> 内核 -> 应用 -> Socket -> 网卡,数据复制4次,上下文切换4次。
流程步骤:
- 操作系统将数据从磁盘文件中读取到内核空间的页面缓存。
- 应用程序将数据从内核空间读入用户空间缓冲区。
- 应用程序将读到数据写回内核空间并放入socket缓冲区。
- 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。
通过网卡直接去访问系统的内存,就可以实现现绝对的零拷贝了。这样就可以最大程度提高传输性能。通过“零拷贝”技术,我们可以去掉那些没必要的数据复制操作, 同时也会减少上下文切换次数。
通过上图可以看到,零拷贝技术只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。
1.7、Broker 服务器
一个独立的 Kafka 服务器被称为broker, broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
如果broker端配置参数auto.create.topics.enable
设置为true(默认为true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions
(默认为1)、副本因子为default.replication.factor
默认值为1的主题。分区和分区副本都对应一个日志文件,不是分区数越多吞吐量就越大,超过阈值会使Kafka报错或系统崩溃。分区只能增加不能减少。
1.8、Offset 偏移量
消息的唯一标识,是连续的序列号,偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 kafkal 的消息,我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。消息最终还是会被删除的,默认生命周期为1周(7*24小时)。
1.9、Replication 副本
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。 producer 和 consumer 只跟 leader 交互。Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。
AR、ISR、OSR
- 分区中的所有副本统称为AR。
- 所有与leader副本保持一定同步程度的副本组成。
- 与leader副本同步滞后过多的副本组成OSR。
- AR = ISR +OSR。正常情况 应该AR=ISR,OSR集合为空。
Kafka 副本 Leader 选举原理的理解
① Kafka要先从所有Broker中选出唯一的一个Controller。
所有的Broker会尝试在Zookeeper中创建临时节点/controller,谁先创建成功,谁就是Controller。那如果Controller挂掉或者网络出现问题,ZooKeeper上的临时节点就会消失。其他的Broker通过Watch监听到Controller下线的消息后,继续按照先到先得的原则竞选Controller。这个Controller就相当于选举委员会的主席。
② Controller确定以后,就可以开始做分区选主的事情。接下来就是找候选人。显然,每个Replication副本都想推荐自己,但不是所有的副本都有竞选资格。只有在ISR保持心跳同步的副本才有资格参与竞选。就好比是皇帝每天着急皇子们开早会,只有每天来打卡的皇子才能加入ISR。那些请假的、迟到的没有资格参与选举。接下来,就是Leader选举,就相当于要在众多皇子中选出太子。它的选举算法和微软的PacificA算法最相近。大致意思就是,默认是让ISR中第一个Replica变成Leader。比如ISR是1、5、9,优先让1成为Leader。这个跟中国古代皇帝传位是一样的,优先传给皇长子。
1.10、Zookeeper
2、Kafka 环境搭建
2.1、下载 Kafka
你可以在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件。
2.2、修改 Kafka 配置 config/server.properties
修改配置文件:config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id= 0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://127.0.0.1:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect= 127.0.0.1:2181
server.properties核心配置详解:
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个broker都可以用⼀个唯⼀的非负整数id进⾏标识 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使用逗号分隔即可 |
listeners | PLAINTEXT://127.0.0.1:9092 | server接受客户端连接的端⼝,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper连接 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | 自动创建topic的默认副本数量,建议设置为⼤于等于2 |
min.insync.replicas | 1 | producer 发送数据服务端的响应级别 |
delete.topic.enable | false | 是否允许删除主题 |
2.3、运行(下面有用 AdminClient API 操作)
- 启动zookeeper
sh zookeeper-server-start.sh config/zookeeper.properties
- 启动kafka
sh kafka-server-start.sh config/server.properties
- 创建一个topic
sh kafka-topic.sh --create --topic topic_name --zookeeper udp01:2181 --partitions 3 --replication-factor 1
参数解释:
- partition:指定当前创建的kafka topic的分区数量,不指定默认为1。
- replication-factor :知道每个分区的复制因子,不指定默认为1。
创建topic还有一种是自动创建,当你往一个不存在的topic里面输入数据的时候,他会自动创建一个默认配置的topic,这种方式需要在server.properties配置文件中加上auto.create.topics.enable=ture。
- 查看已经创建的topic
sh kafka-topics.sh --list --zookeeper udp01:2181
- 查看某个的topic的信息
sh kafka-topics.sh --describe --zookeeper udp01:2181 --topic topic_name
- 修改toipc信息
sh kafka-topics.sh --zookeeper udp01:2181 --alter --topic topic_name --partitions/config/delete-config
其中 partitions的数据只能比修改之前的大,不能小。
- 删除topic
sh kafka-topics.sh --delete --topic topic_name --zookeeper udp01::2181
删除有2种,一种是标记删除,但实际还是存在的,一种是真的删除。要真的删除也有2种方式:
- 删除本地磁盘以及zk上的相关topic信息。所属目录为/brokers/topics
- 配置server.properties文件中的delete.topic.enable为ture,需要重启kafka才会生效,在执行delete命令则会将topic删掉。
- 向topic中传数据
sh kafka-console-producer.sh --broker-list udp01:9092 --topic topic_name
- 消费topic
sh kafka-console-consumer.sh --bootstrap-server udp01:9092 --topic topic_name --from-beginning
3、Kafka 的 JAVA 实现
3.1、AdminClient API
package com.example.canal.YangKafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
public class AdminClientYang {
public final static String TOPIC_NAME = "yangTest";
public static AdminClient adminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999");
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
public static void main(String[] args) throws Exception {
AdminClient adminClient = AdminClientYang.adminClient();
System.out.println("adminClient : " + adminClient);
createTopic();
}
/**
* 创建Topic实例
*/
public static void createTopic() {
AdminClient adminClient = AdminClientYang.adminClient();
// 副本因子
Short re = 1;
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, re);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("CreateTopicsResult : " + createTopicsResult);
adminClient.close();
}
/**
* 获取Topic列表
*/
public static void topicList() throws Exception {
AdminClient adminClient = adminClient();
// 是否查看internal选项
ListTopicsOptions options = new ListTopicsOptions();
// 设置我们是否应该列出内部topic
options.listInternal(true);
// 列出集群中可用的topic
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
// 返回一个topic名称集合的future(这里是KafkaFuture)
Set<String> names = listTopicsResult.names().get();
// 返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
// 返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
// 打印names
names.stream().forEach(System.out::println);
System.out.println("---------------------------topic列表-------------------------");
// 打印topicListings
topicListings.stream().forEach((topicList) -> {
System.out.println(topicList);
});
System.out.println("---------------------------topic列表-------------------------");
}
/**
* 删除topic
*/
public static void delTopics() throws Exception {
AdminClient adminClient = adminClient();
// 删除一批topic。
// 此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
// DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间,
// listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
/**
* 描述topic
* name: yibo_topic
* desc: (name=yibo_topic,
* internal=false,
* partitions=
* (partition=0,
* leader=192.168.174.128:9092 (id: 0 rack: null),
* replicas=192.168.174.128:9092 (id: 0 rack: null),
* isr=192.168.174.128:9092 (id: 0 rack: null)),
* authorizedOperations=null)
* @throws Exception
*/
public static void describeTopic() throws Exception {
AdminClient adminClient = adminClient();
// 描述集群中的一些topic。
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
System.out.println("----------------------------topic信息-----------------------------");
entries.stream().forEach((entry) -> {
System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue());
});
System.out.println("----------------------------topic信息-----------------------------");
}
/**
* 查询配置信息
*/
public static void describeConfig() throws Exception {
AdminClient adminClient = adminClient();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
// 获取指定资源的配置
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
System.out.println("----------------------------配置信息-----------------------------");
configResourceConfigMap.entrySet().stream().forEach((entry) -> {
System.out.println("configResource : " + entry.getKey() + " , Config : " + entry.getValue());
});
System.out.println("----------------------------配置信息-----------------------------");
}
/**
* 修改配置信息 老版API
*/
public static void alterConfig() throws Exception {
AdminClient adminClient = adminClient();
Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
// 具有配置的资源的类,需要提供type和名称 Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2),
// UNKNOWN((byte) 0)
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
// 包含名称、值和操作类型的更改配置条目的类 ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
AlterConfigOp alterConfigOp =
new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
configMaps.put(configResource, Arrays.asList(alterConfigOp));
// 逐步更新指定资源的配置
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
alterConfigsResult.all().get();
}
/**
* 增加partitions数量
*/
public static void incrPartitions(int partitions) throws Exception {
AdminClient adminClient = adminClient();
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
partitionsMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
createPartitionsResult.all().get();
}
}
3.2、Kafka 生产者 JAVA 代码
package com.example.canal.YangKafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 生产者
*/
public class ProducerdemoYang {
public static void main(String[] args) {
// kafka 配置
Properties properties = new Properties();
/**
* 用于建立与 kafka 集群连接的 host/port
*/
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999");
/**
* producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
* (1)acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应,消息传递过程中有可能丢失,其实就是保证消息不会重复发送或者重复消费,但是速度最快。同时重试配置不会发生作用。
* (2)acks=1:默认值,只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应。
* (3)acks=all:只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
*/
properties.put(ProducerConfig.ACKS_CONFIG, "1");
/**
* 如果请求失败,生产者会自动重试,如果启用重试,则会有重复消息的可能性,本次采用手动重试
*/
// properties.put(ProducerConfig.RETRIES_CONFIG, 3);
/**
* 当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
* 比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
*/
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
/**
* 有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间即使数据没达到16KB,也将这个批次发送出去
*/
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
/**
* 生产者内存缓冲区的大小,如果数据产生速度大于向 broker 发送的速度,将会耗尽这个缓存空间
*/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
/**
* 该配置控制 KafkaProducer's send(),partitionsFor(),inittransaction
* (),sendOffsetsToTransaction(),commitTransaction() 和abortTransaction()方法将阻塞。对于send(),此超时限制了获取元数据和分配缓冲区的总等待时间
*/
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
/**
* 将消息发送到kafka server, 所以肯定需要用到序列化的操作,我们这里发送的消息是string类型的,所以使用string的序列化类
*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
String test = "hello yang";
// 开启幂等
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
// 封装发送的消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>("yangTest", test);
// 封装发送的消息,指定 key,消息将会被同一分区处理
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic", "10086", "test");
// 同步发送消息
// producer.send(record);
// 异步发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic‐" + metadata.topic() + "|partition‐" + metadata.partition() + "|offset‐" + metadata.offset());
}
}
});
// 关闭消息通道,必须关闭,否则消息发送不成功
producer.close();
}
}
3.3、Kafka 消费者 JAVA 代码
package com.example.canal.YangKafka;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* 消费者
*/
public class ConsumerdemoYang {
public static void main(String[] args) {
// kafka 配置
Properties properties = new Properties();
/**
* 用于建立与 kafka 集群连接的 host/port
*/
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999");
/**
* 消费者组
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
/**
* 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
*/
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/**
* 自动提交的时间间隔,自动提交开启时生效
*/
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2000);
/**
* 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
* latest(默认):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
* none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/**
* 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
*/
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
/**
* 序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
*/
// properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
// "org.apache.kafka.common.serialization.StringSerializer");
// properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
// "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("yangTest"));
// 消费指定分区
// consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// 消息回溯消费
// consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// 指定offset消费
// consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
// 从指定时间点开始消费,从1小时前开始消费
// List<PartitionInfo> topicPartitions = consumer.partitionsFor("TOPIC_NAME");
// long fetchDataTime = new Date().getTime() - (1000 * 60 * 60);
// Map<TopicPartition, Long> map = new HashMap<>();
// for (PartitionInfo par : topicPartitions) {
// map.put(new TopicPartition("topicName", par.partition()), fetchDataTime);
// }
// Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
// for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
// TopicPartition key = entry.getKey();
// OffsetAndTimestamp value = entry.getValue();
// if (key == null || value == null)
// continue;
// Long offset = value.offset();
// System.out.println("partition‐" + key.partition() + "|offset‐" + offset);
// System.out.println();
// // 根据消费里的timestamp确定offset
// if (value != null) {
// consumer.assign(Arrays.asList(key));
// consumer.seek(key, offset);
// }
// }
// while (true) {
// ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// System.out.println(consumerRecord.key());
// System.out.println(consumerRecord.value());
// }
// }
while (true) {
/**
* poll() API 是拉取消息的长轮询 比如设置了1000毫秒 并不是在这1秒钟内只拉取一次 而是当没有拉取到数据时 会多次拉取数据 直到拉取到数据 然后继续循环
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
// consumer.commitSync();
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}
}
}
}
3.4、Spring Boot 实现消费者对 Kafka 的监听
application.properties
# kafka
spring.kafka.bootstrap-servers=127.0.0.1:9999
# 消费者组
spring.kafka.consumer.group-id=Yang
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理
spring.kafka.consumer.auto-offset-reset=earliest
# 是否开启自动提交
spring.kafka.consumer.enable-auto-commit=true
# 动提交的时间间隔,自动提交开启时生效
spring.kafka.consumer.auto-commit-interval=100
监听类
package com.example.canal.YangKafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TopicListener {
@KafkaListener(topics = "yangTest")
public void listen(ConsumerRecord<?, ?> record) {
System.out.println("kafka-message: key-->" + record.key() + ",value-->" + record.value().toString());
}
}
4、Kafka 幂等性
kafka默认情况下,提供的是至少一次的可靠性保障。即broker保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致Producer没有收到broker返回的数据ack,则Producer会继续重试发送消息,从而导致消息重复发送。
如果我们禁止Producer的失败重试发送功能,消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过 幂等性(Idempotence)和事务(Transaction) 的机制,提供了这种精确的消息保障。
4.1、幂等性要解决的问题
在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到 数据不丢 ,也就是 at least once,但是在一些情况下,可能会导致数据重复,比如:网络请求延迟等导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。
对于大多数应用而言,数据保证不丢是可以满足其需求的,但是对于一些其他的应用场景(比如支付数据等),它们是要求精确计数的,这时候如果上游数据有重复,下游应用只能在消费数据时进行相应的去重操作,应用在去重时,最常用的手段就是根据唯一 id 键做 check 去重。
在这种场景下,因为上游生产导致的数据重复问题,会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下:如果在发送时,系统就能保证 exactly once,这对下游将是多么大的解脱。这就是幂等性要解决的问题,主要是解决数据重复的问题,正如前面所述,数据重复问题,通用的解决方案就是加唯一 id,然后根据 id 判断数据是否重复,Producer 的幂等性也是这样实现的。
4.2、Kafka 是怎么保证幂等性的
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。
- ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
- SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
开启幂等性配置
props.put("enable.idempotence", ture)
//或者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
4.3、Kafka 幂等性的局限性
开启enable.idempotence后,kafka就会自动帮你做好消息去重的一系列工作。底层具体实现原理很简单,就是用空间换时间的优化思路,即在broker端多存一些字段来标识数据的唯一性。当Producer发送了具有相同字段值的消息后,broker会进行匹配去重,丢弃重复的数据。实际的代码没这么简单,但大致是这么个处理逻辑。
官方的这个幂等实现看似简单高效,但也存在他的局限性。他只能保证单分区上的幂等性,即一个幂等性Producer只能够保证某个topic的一个分区上不出现重复消息,无法实现多分区的幂等。此外,如果Producer重启,也会导致幂等重置。
事务
对于多分区保证幂等的场景,则需要事务特性来处理了。kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:
和幂等Producer一样,开启enable.idempotence = true
设置Producer端参数transctional.id事务Producer的代码稍微也有点不一样,需要调一些事务处理的API。数据的发送需要放在beginTransaction和commitTransaction之间。Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。示例代码:
提供唯一的 transactionalId
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, “transacetionId”);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。
5、Kafka 常见问题
5.1、Kafka 消息丢失
1. 生产过程丢失消息
解决方案:重发就行了。
由于kafka为了提高性能,采用了异步发送消息。我们只有获取到发送结果,才能确保消息发送成功。 有两个方案可以获取发送结果。一种是kafka把发送结果封装在Future对象中,我可以使用Future的get方法同步阻塞获取结果。
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
try {
RecordMetadata recordMetadata = future.get();
if (recordMetadata != null) {
System.out.println("发送成功");
}
} catch (Exception e) {
e.printStackTrace();
}
另一种是使用kafka的callback函数获取返回结果。
producer.send(new ProducerRecord<>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
}
});
如果发送失败了,有两种重试方案:
- 自动重试 kafka支持自动重试,设置参数如下,当集群Leader选举中或者Follower数量不足等原因返回失败时,就可以自动重试。#
设置重试次数为3 retries = 3 # 设置重试间隔为100msretry.backoff.ms = 100 - 一般我们不会用kafka自动重试,因为超过重试次数,还是会返回失败,还需要我们手动重试。手动重试 在catch逻辑或else逻辑中,再调用一次send方法。如果还不成功怎么办? 在数据库中建一张异常消息表,把失败消息存入表中,然后搞个异步任务重试,便于控制重试次数和间隔时间。
2. 服务端持久化过程丢失消息
为了保证性能,kafka采用的是异步刷盘,当我们发送消息成功后,Broker节点在刷盘之前宕机了,就会导致消息丢失。
当然我们也可以设置刷盘频率:
# 设置每1000条消息刷一次盘
flush.messages = 1000
# 设置每秒刷一次盘
flush.ms = 1000
3. 消费过程丢失消息
kafka中有个offset的概念,consumer从partition中拉取消息,consumer本地处理完成后需要commit一下offset,表示消费完成,下次就不会再拉取到这条消息。所以我们需要关闭自动commit offset的配置,防止consumer拉到消息后,服务宕机,导致消息丢失。
enable.auto.commit = false
总结
5.2、Kafka 消息重复消费
1. Kafka消费端重复提交导致消息重复消费
默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。但是Kafka消费端的自动提交,会有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候才提交上一批消费的offset。所以在消费者消费的过程中,如果遇到应用程序被强制kill掉或者宕机的情况,可能会导致Offset没有及时提交,从而产生重复提交的问题。
2. Kafka服务端的Partition再均衡机制导致消息重复消费
在Kafka中有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。消费端会从分配到的Partition里面去消费消息,如果消费者在默认的5分钟内没有处理完这一批消息。就会触发Kafka的Rebalance机制,从而导致offset自动提交失败。而Rebalance之后,消费者还是会从之前没提交的offset位置开始消费,从而导致消息重复消费。
解决方案
① 针对于消费端挂掉等原因造成的重复消费问题
这部分主要集中在消费端的编码层面,需要我们在设计代码时以幂等性的角度进行开发设计,保证同一数据无论进行多少次消费,所造成的结果都一样。处理方式可以在消息体中添加唯一标识(比如将消息生成md5保存到Mysql或者是Redis中,在处理消息前先检查下Mysql/Redis是否已经处理过该消息了),消费端进行确认此唯一标识是否已经消费过,如果消费过,则不进行之后处理。从而尽可能的避免了重复消费。
幂等角度大概两种实现:
- 将唯一标识存入第三方介质(如Redis),要操作数据的时候先判断第三方介质(数据库或者缓存)有没有这个唯一标识。
- 将版本号(offset)存入到数据里面,然后再要操作数据的时候用这个版本号做乐观锁,当版本号大于原先的才能操作。
② 针对于Consumer消费时间过长带来的重复消费问题
- 提高单条消息的处理速度。例如对消息处理中比较耗时的操作可通过异步的方式进行处理、利用多线程处理等。
- 其次,在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。
提高消费端的处理性能避免触发Balance,比如可以用多线程的方式来处理消息,缩短单个消息消费的时长。或者还可以调整消息处理的超时时间,也还可以减少一次性从Broker上拉取数据的条数。
5.3、kafka 为什么快/吞吐量大
- 顺序读写:Kafka每个分区对应一个日志文件,消息写入是追加到日志文件后面、顺序写磁盘的速度快于随机写。
- 批量发送:Kafka发送消息时将消息缓存到本地,达到一定数量或者间隔一定时间再发送,减少了网络请求的次数。
- 批量压缩:发送的时候对数据进行压缩。
- 页面缓存:Kafka大量使用了页面缓存,就是将数据写入磁盘前会先写入系统缓存,然后进行刷盘;读取数据也会先读取缓存,没有再读磁盘。虽然异步刷盘会因单点故障导致数据丢失,但是多副本的机制保障了数据的持久化。
- 零拷贝:Kafka使用了DMA的技术,使Socket缓冲池可以直接读取内核内存的数据,减少了数据拷贝到应用再拷贝到Socket缓冲池的过程,也减少了2次上下文切换。