我们先来回顾下6.Kafka系列之设计思想(四)-消息传递语义中的一些内容
1. 消息传递保证
- At most once:最多一次。消息可能会丢失,但永远不会重新传递
- At least once:至少一次。消息永远不会丢失,但可能会重新传递
- Exactly once:这正是人们真正想要的,每条消息只传递一次
1.1 发布消息的持久性保证
1.发布消息时,我们有消息被“提交”到日志的概念。一旦发布的消息被提交,只要复制该消息写入的分区的一个代理保持“活动”状态,它就不会丢失。如果生产者尝试发布消息并遇到网络错误,则无法确定此错误是发生在消息提交之前还是之后。在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已经成功,则消息可能会在重新发送期间再次写入日志
2.从 0.11.0.0 开始,Kafka 生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目。为此,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号对消息进行重复数据删除
3.同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即 要么所有消息都已成功写入,要么都没有
1.2 消费消息时的保证
1.它可以读取消息,然后保存它在日志中的位置,最后处理消息。在这种情况下,消费者进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息还没有被处理。这对应于“至多一次”语义,因为在消费者失败消息的情况下可能不会被处理
2.它可以读取消息、处理消息并最终保存其位置。在这种情况下,消费者进程有可能在处理消息之后但在保存其位置之前崩溃。在这种情况下,当新进程接管它接收到的前几条消息时,它已经被处理过了。这对应于消费者失败情况下的“至少一次”语义。在许多情况下,消息有一个主键,因此更新是幂等的(两次接收相同的消息只是用它自己的另一个副本覆盖记录)
3.那么 exactly once 语义(即你真正想要的东西)呢?从 Kafka 主题消费并生产到另一个主题时(如Kafka Streams 应用程序),我们可以利用上面提到的 0.11.0.0 中的新事务生产者功能。消费者的位置作为消息存储在主题中,因此我们可以在与接收处理数据的输出主题相同的事务中将偏移量写入 Kafka。如果交易被中止,消费者的位置将恢复到它的旧值,并且输出主题的产生的数据将对其他消费者不可见,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,所有消息对消费者都是可见的,即使它们是中止事务的一部分,但在“read_committed”中,消费者将只返回来自已提交事务的消息(以及任何不属于一部分的消息)交易)
2. Kafka幂等性
我们在客户端参数显示设置enable.idempotence=true
就会开启生产者幂等消息传递
- 每个新的生产者实例在初始化时会被分配一个PID(producer id)
- 对于每个PID,消息发送到的每一个分区都有对应的序列号,序列号从0开始单调递增,生产者每发送一条消息就会将<PID,分区>对应的序列号值加1
- broker端会在内存中为每一对<PID,分区>维护一个序列号,对于收到的每一条消息,只有当它的序列号的值(SN_new)正好比broker端中维护的对应序列号的值(SN_old)大1,broker才会接收该消息。如果 SN_new < SN_old + 1,说明消息被重复写入,broker会将该消息丢弃。否则,说明中间有数据尚未写入,暗示可能有消息丢失,对应生产者会抛出 OutOfOrderSequenceException 异常
注意:序列号实现幂等只是针对每一对<PID,分区>,即Kafka的幂等性只能保证单个生产者会话(session)中单分区的幂等
3. Kafka事务
通过事务可以弥补幂等性不能跨多个分区的缺陷,且可以保证对多个分区写入操作的原子性
3.1 创建主题与查看分区
kafka-topics.sh --create --topic my-topic --partitions 10 --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
3.2 编写KafkaTransactionDemo代码
/**
* Kafka事务DEMO
*
* @author shenjian
* @since 2023/5/27
*/
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
// 开启幂等传递
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
}
}
3.3 运行并消费结果
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
# --time -1 表示获取的最新位移值
# --time -2 表示获取的最早的位移值,可能由于最早的数据由于过期被删除,所以最早的位移不一定是0
# 通过两数相减,就可以知道当前分区的数据条数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1
不放心的小伙伴可以去统计要总数量是否一致奥
通过事务,从生产者角度,Kafka可以保证:
- 跨生产者会话的消息幂等发送: transactionId与PID一一对应,如果新的生产者启动,具有相同transactionId的旧生产者会立即失效(每个生产者通过 transactionId获取PID的同时,还会获取一个单调递增的 producer epoch)
- 跨生产者会话的事务恢复: 当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事物要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作(通过 producer epoch判断)
从消费者角度,事务能保证的语义相对偏弱,对于一些特殊的情况,Kafka并不能保证已提交的事务中的所有消息都能被消费:
- 对采用日志压缩策略的主题,事务中的某些消息可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)
- 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失
4.Kafka事务实现原理
为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的
broker节点有一个专门管理事务的内部主题 __transaction_state,TransactionCoodinator 会将事务状态持久化到该主题中
- 查找 TransactionCoordinator:生产者会先向某个broker发送 FindCoordinator 请求,找到 TransactionCoordinator 所在的 broker节点
- 获取PID:生产者会向 TransactionCoordinator 申请获取 PID,TransactionCoordinator 收到请求后,会把 transactionalId 和对应的 PID 以消息的形式保存到主题__transaction_state 中,保证 <transaction_Id,PID>的对应关系被持久化,即使宕机该对应关系也不会丢失
- 开启事务:调用 beginTransaction()后,生产者本地会标记开启了一个新事务
- 发送消息:生产者向用户主题发送消息,过程跟普通消息相同,但第一次发送请求前会先发送请求给TransactionCoordinator 将 transactionalId 和 TopicPartition 的对应关系存储在 __transaction_state 中
- 提交或中止事务:Kafka除了普通消息,还有专门的控制消息(ControlBatch)来标志一个事务的结束,控制消息有两种类型,分别用来表征事务的提交和中止。该阶段本质就是一个两阶段提交过程:
- 将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state
- 将COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets
- 将 COMPLETE_COMMIT 或 COMPLETE_COMMIT_ABORT 消息写入主题 __transaction_state
如此一来,表面当前事务已经结束,此时就可以删除主题 __transaction_state 中所有关于该事务的消息
欢迎关注公众号算法小生