Kafka 事务确保了数据在写入Kafka时的原子性和一致性。
1 幂等
幂等就是对接口的多次调用所产生的结果和调用一次是一致的。
Kafka 生产者在进行重试的时候可能会写入重复的消息,开启幂等性功能后就可以避免这种情况。将生产者客户端参数enable.idempotence设置为true即可。
1.1 实现原理
Kafka 引入了producer id(简称PID)和序列号(sequence number)这两个概念。分别对应v2版的日志格式中RecordBatch的producer id 和first sequence这两个字段。
每个新的生产者实例在初始化时都会由broker分配一个PID(对于用户不可见)。
对于每个PID,消息发送到每个分区都有对应的序列号,序列号从0开始单调递增。生产者每发送一条消息,就会将<PID,分区>对应的序列号的值加1。
1.1.1 序列号值比对
broker端会在内存中为每一对<PID,分区>维护一个序列号(SN_old),对于收到的每一条消息,将比对它的序列号值(SN_new)。以下有三种情况:
SN_new < SN_old+1:消息被重复写入,broker可以直接将其丢弃。
SN_new = SN_old+1:是新消息,且消息没有丢失。
SN_new > SN_old+1:消息可能丢失,生产者会抛出OutOfOlderSequenceException异常。
1.1.2 局限性
Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。
2 事务
事务可以保证跨生产者会话的消息幂等发送及新生产者实例及跨生产者会话的事务恢复。
前者指具有相同事务id的新生产者示例被创建且工作的时候,旧的且拥有相同事务id的生产者实例将不再工作。
后者指某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交,要么被终止。使新的生产者实例从一个正常的状态开始工作。
2.1 实现原理
图 生产者及事务协调器初始化事务到提交事务的流程
开启事务,生产者客户端需要提供唯一的transactionalId(通过客户端参数transactional.id来设置)。并且需要开启幂等特性。
2.1.1 查找事务协调器
KafkaProducer的initTransactions()方法初始化事务。
TransactionCoordinator事务协调器负责分配PID和管理事务。
生产者首先要找到对应的事务协调器所在broker节点。发送FindCoorinatorRequest请求,Kafka根据请求体中的coordinator_key(事务id)来查找节点(具体方式是根据事务id的哈希值计算其在主题__transaction_state中的分区编号),算法如下:
分区编号 = 事务id的哈希值 % transaction_state的分区数
根据分区编号,寻找此分区leader副本所在的broker节点,并将节点信息返回给生产者。
2.1.2 获取PID
生产者在找到事务协调器的节点后,发送InitProducerIdRequest请求(如果未开启事务特性而只开启幂等性,这个请求可以方式给任意broker,否则只会发给事务协调器所在的broker)来获取PID。
事务协调器生成PID后,会把事务id和对应的PID以消息的形式保存到主题__transaction_state中。
该请求还会触发协调器执行以下任务:
1)增加该PID对应的producer_epoch(单调递增)。具有相同PID但producer_epoch小于该值的其他生产者新开启的事务会被拒绝。
producer_epoch 确保了相同事务id任何时刻只有一个生产者实例。
具有相同事务id的新生产者实例被创建且工作的时,旧的且拥有相同事务id的生产者实例将不再工作。
2)恢复(Commit)或终止(Abort)之前生产者未完成的事务。
2.1.3 开启事务
KafkaProducer的beginTransaction()方法开启一个事务。调用该方法后,生产者本地会标记已经开启了一个新的事务。只有在生产者发送第一条消息之后事务协调器才会认为该事务已开启。
2.1.4 发送消息
在生产者给一个新的分区发送数据库之前,它需要先向事务协调器发送AddPartitionsToTxnRequest请求,让事务协调器将<transactionId,TopicPartition>的对应关系存储在主题__transaction_state中。
如果该分区是对应事务中的第一个分区,那么事务协调器还会启动会该事务的计时。
随后生产者通过ProduceRequest请求发送消息(ProducerBatch)到用户自定义主题中。和普通消息不同的是,ProducerBatch会包含实质的PID、producer_epoch和sequence_number。
2.1.5 提交或终止事务
调用KafkaProducer的commitTransaction()或abortTransaction()方法来结束当前的事务。
生产者会向事务协调器发送EndTxnRequest请求。协调器收到请求后会执行如下操作:
- 事务协调器将PREPARE_COMMIT或PREPARE_ABORT消息写入主题__transaction_state。
- 协调器向事务中各个分区的leader节点发送WriteTxnMarkersRequest请求,当leader节点收到请求后,会在相应分区中写入控制消息来标识事务的终结。它和普通消息一样存储在日志文件中。
- 事务协调器将最终的COMPLETE_COMMIT或COMPLETE_ABORT信息写入主题__transaction_state以表明当前事务已结束。
此时可以删除主题__transaction_state中所有关于该事务的消息(将相应的消息设置为墓碑消息即可)。
2.1.6 控制消息ControlBatch
各个分区的leader收到事务协调器的WriteTxnMarkersRequest请求后,会在相应的分区写入控制消息(ControlBatch)。来标识事务的终结。它和普通消息一样存储在日志文件中。不同点在于RecordBatch中某些字段的值。
图 控制消息日志格式
key 中的type表示控制类型:0 ABORT,1 COMMIT。
value中的coordinator_epoch 表示协调器的纪元(版本)。
2.2 消费者与事务
Kafka并不能保证已提交的事务中的所有消息都能被消费:
- 事务中的某些消息可能被清理(压缩或删除)。
- 消费者通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
- 消费者在消费时可能没有分配到事务内的所有分区。
2.2.1 消费者的隔离级别
消费端的参数isolation.level,默认值为“read_uncommitted”表示可以消费未提交的事务,而“read_committed”表示只能消费已提交的事务。
如果隔离级别为“read_committed”,生产者开启事务,并发送3条消息,在生产者执行commitTransaction()或abortTransaction()方法前,KafkaConsumer看不到这些消息,但是其内部会缓存消息,直到生产者执行commitTransaction(),它才会将消息推送给消费端应用;如果生产者执行abortTransaction(),那它就会丢弃这些消息。