Kafka之【生产消息】

news2025/1/23 2:22:26

消息(Record)

在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:
在这里插入图片描述

在这里插入图片描述

相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而在未指定分区器或者未指定分区得情况下,Key可以用于数据的分区定位


根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。

生产者(Producer)是一个关键组件,负责将消息发送到Kafka集群。Kafka生产者主要由三个核心部分组成:

  • KafkaProducer
  • RecordAccumulator
  • Sender

在这里插入图片描述


数据生产者(KafkaProducer)

作用:

KafkaProducer是生产者客户端的核心接口,为生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者,负责提供向Kafka集群发布消息的功能。

组成:

KafkaProducer由以下关键部分组成:

  • 配置(ProducerConfig):用于初始化和配置生产者客户端的参数。
  • 拦截器(Interceptor):用于在消息发送前后进行自定义处理。如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
  • 序列化器(Serializer):因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。
  • 分区器(Partitioner):计算数据所发送的分区位置。

工作流程:

  • 初始化:根据配置参数初始化客户端,包括序列化器、分区器等。
  • 消息发送:消息经过拦截器处理、序列化、分区选择后,放入RecordAccumulator中。
  • 事务支持:处理事务消息的发送和事务边界(如果配置了事务,如幂等)。

数据收集器(RecordAccumulator)

作用:

RecordAccumulator用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。

组成:

RecordAccumulator由以下部分组成:

  • 内存缓冲区(BufferPool):管理消息缓冲区的内存分配。
  • 消息队列(Deque):每个分区对应一个消息队列,用于存储批次消息。
  • 批次(ProducerBatch):用于存储一批待发送的消息。

内部工作:

  • 默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
  • 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
  • 将消息追加到对应分区的批次中,如果当前批次已满达到时间限制,创建新的批次。
  • 这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送

重要参数:

  • batch.size:每个批次的大小,默认16K。
  • linger.ms:发送前的等待时间限制,默认0s。
  • buffer.memory:内存缓冲区的总大小默认32M。

数据发送器(Sender)

作用:

Sender是一个后台线程,负责从RecordAccumulator中取出消息批次,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中

组成:

Sender由以下部分组成:

  • 网络客户端(NetworkClient):负责与Kafka Broker进行网络通信。
  • 元数据管理(Metadata):获取和更新Kafka集群的元数据信息。
  • 请求管理(ClientRequest/ClientResponse):管理发送的请求和接收的响应。

内部工作:

  • 因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
  • 将组合后的<节点,List<批次>>的数据封装成客户端请求发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
  • Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。

重要参数:

  • retries:重试次数。
  • retry.backoff.ms:重试间隔时间。
  • request.timeout.ms:请求超时时间。

协作机制

  1. 消息发送流程

    • 用户通过KafkaProducer.send()方法发送消息。
    • 消息经过序列化、分区选择和拦截器处理后,进入RecordAccumulator
    • RecordAccumulator将消息存入对应分区的消息队列中,形成批次。
  2. 消息传输流程

    • Sender后台线程不断从RecordAccumulator中取出已准备好的消息批次。
    • Sender通过NetworkClient将消息批次发送到Kafka Broker。
    • 如果发送成功,Sender接收响应并通知KafkaProducer;如果发送失败,根据重试策略进行重试。

通过这种协作机制,Kafka生产者实现了高效、可靠的消息发送。KafkaProducer负责接口和配置管理,RecordAccumulator负责消息缓存和批量处理,Sender负责消息的实际传输和重试逻辑。


生产者代码

// TODO 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
// TODO 创建Kafka生产者对象,建立Kafka连接
//      构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 准备数据,定义泛型
//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
        "test", "key1", "value1"
);
// TODO 生产(发送)数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();

拦截器

生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以可以根据实际的情况自行选择使用。

但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。

在这里插入图片描述

自定义拦截器

要想自定义拦截器,只需要创建一个类,然后实现Kafka提供的分区类接口ProducerInterceptor,接下来重写方法。这里我们只关注onSend方法即可。


import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * TODO 自定义数据拦截器
 *      1. 实现Kafka提供的生产者接口ProducerInterceptor
 *      2. 定义数据泛型 <K, V>
 *      3. 重写方法
 *         onSend
 *         onAcknowledgement
 *         close
 *         configure
 */
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {
    
    @Override
    // 数据发送前,会执行此方法,进行数据发送前的预处理
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override	
    // 数据发送后,获取应答时,会执行此方法
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    // 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作
    public void close() {
    }

    @Override
    // 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。
    public void configure(Map<String, ?> configs) {
    }
}

使用拦截器

// 仅需在配置properties的时候,指定自定义拦截器器即可
configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());

同步发送和异步发送

1. 异步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
在这里插入图片描述

import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerASynTest {
    public static void main(String[] args) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 循环生产数据
        for ( int i = 0; i < 10; i++ ) {
            // TODO 创建数据
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
            // TODO 发送数据
            producer.send(record, new Callback() {
                // TODO 回调对象
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // TODO 当数据发送成功后,会回调此方法
                    System.out.println("数据发送成功:" + recordMetadata.timestamp());
                }
            });
            // TODO 发送当前数据
            System.out.println("发送数据");
        }
        producer.close();
    }
}

2. 同步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。

在这里插入图片描述

import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerASynTest {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 循环生产数据
        for ( int i = 0; i < 10; i++ ) {
            // TODO 创建数据
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
            // TODO 发送数据
            producer.send(record, new Callback() {
                // TODO 回调对象
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // TODO 当数据发送成功后,会回调此方法
                    System.out.println("数据发送成功:" + recordMetadata.timestamp());
                }
            }).get();
            // TODO 发送当前数据
            System.out.println("发送数据");
        }
        producer.close();
    }
}

分区器

  1. 如果指定了分区,直接使用
  2. 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用
  3. 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号。hash(key)%numPartitions = 分区号
  4. 如果未指定数据Key,或不使用Key选择分区,那么Kafka会自动分区

自定义分区器

只需要创建一个类,然后实现Kafka提供的分区类接口Partitioner,接下来重写方法。这里我们只关注partition方法即可,因为此方法的返回结果就是需要的分区编号。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * TODO 自定义分区器实现步骤:
 *      1. 实现Partitioner接口
 *      2. 重写方法
 *         partition : 返回分区编号,从0开始
 *         close
 *         configure
 */
public class KafkaPartitionerMock implements Partitioner {
    /**
     * 分区算法 - 根据业务自行定义即可
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     * @return 分区编号,从0开始
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用分区器

// 仅需在配置properties的时候,指定自定义分区器即可
configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());

消息可靠性(Acknowledgement)

ACK = 0

当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = 1

当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = -1/all (默认) 最可靠

当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。


数据重试导致的数据重复

由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。

  1. 在某些场景中,数据并不是真正地丢失,比如将ACK应答设置为1,Leader副本将数据写入文件后,Kafka就可以对请求进行响应。
  2. 此时,假设网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了
  3. 所以在这种情况下,kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。
  4. 如果此时发送成功,Kafka就又收到了数据,两条数据一样,也就是说数据的重复。

数据乱序

数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。

  1. 假设需要将编号为1,2,3的三条连续数据发送给Kafka。
  2. 如果在发送过程中,1因为网络原因发送失败,2、3发生成功
  3. 则此时在Broker的缓存中,为消息2、3
  4. 生产者重发消息1
  5. 则此时在Broker的缓存中,为消息2、3、1

这就产生了数据的乱序


幂等

为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。

// 幂等需要手动开启
enable.idempotence配置为true
  1. 开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka中,这个标识采用的是连续的序列号数字sequencenum,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid以及序列号sequencenum

  2. Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。

  3. 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
    在这里插入图片描述

  4. 如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
    在这里插入图片描述

  5. Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
    在这里插入图片描述

  6. 如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,咱们也能明白,这种幂等性还是有缺陷的

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性(也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等)
  • 要想解决这个问题,就需要采用事务功能。

什么是生产者事务

在Kafka中,生产者事务(Producer Transactions)允许生产者以原子方式向一个或多个主题写入消息。事务可以确保消息要么全部成功写入,要么全部失败,从而保证数据的一致性。下面详细解释Kafka中生产者事务的原理:

事务组件

在Kafka中,事务涉及以下几个组件:

  • Transactional Producer(事务生产者):负责在事务中写入消息。
  • Transaction Coordinator(事务协调器):管理事务的生命周期,包括开始事务、提交事务和中止事务。
  • Broker(代理):Kafka集群中的服务器,存储消息并协助协调事务。

事务处理的详细步骤

  1. 初始化事务:事务生产者向事务协调器注册,事务协调器为该生产者分配一个唯一的Transactional IDProducer Epoch
  2. 开始事务:生产者调用beginTransaction时,事务协调器记录事务开始状态。
  3. 发送消息:生产者发送消息时,消息会标记为“待处理”状态并包含事务ID和epoch。
  4. 提交事务:生产者调用commitTransaction时,事务协调器将事务状态更新为“提交中”,然后通知所有相关分区代理提交消息。
  5. 完成提交:所有分区代理确认消息已写入日志后,事务协调器更新事务状态为“已提交”,通知生产者事务完成。
  6. 中止事务:在任何步骤发生错误时,生产者可以调用abortTransaction,事务协调器将事务状态更新为“中止中”,通知相关分区代理丢弃消息,最后更新事务状态为“已中止”。

实现事务的要点

  • 幂等性:确保消息的幂等性,以避免重复写入。Kafka通过Producer IDSequence Number实现幂等性。
  • 协调和日志:事务协调器负责管理事务状态,事务日志记录事务的所有状态变化。
  • 原子提交:确保事务提交的原子性,通过两阶段提交协议(2PC)实现。
    1. 第一阶段:预提交

      • 生产者发送消息时,代理记录Producer ID和Sequence Number。
      • 消息标记为“待提交”。
    2. 第二阶段:提交或中止

      • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
      • 如果事务中止,代理丢弃消息,不进行处理。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

public class ProducerTransactionTest {
    public static void main(String[] args) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // TODO 配置幂等性
        configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // TODO 配置事务ID
        configMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
        // TODO 配置事务超时时间
        configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5);
        // TODO 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 初始化事务
        producer.initTransactions();
        try {
            // TODO 启动事务
            producer.beginTransaction();
            // TODO 生产数据
            for ( int i = 0; i < 10; i++ ) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
                final Future<RecordMetadata> send = producer.send(record);
            }
            // TODO 提交事务
            producer.commitTransaction();
        } catch ( Exception e ) {
            e.printStackTrace();
            // TODO 终止事务
            producer.abortTransaction();
        }
        // TODO 关闭生产者对象
        producer.close();

    }
}

如何确保跨会话中的幂等(生产者崩溃后,事务恢复生产者,即为跨会话)

确保幂等性的过程主要依赖于Kafka的Producer ID、Producer Epoch和Sequence Number机制。以下是详细的过程描述:

1. 初始生产者启动和消息发送

  1. 生产者初始化

    • 生产者在初始化时,Kafka为其分配一个唯一的Producer ID (PID)。
    • Producer ID标识当前生产者实例。
  2. 开始发送消息

    • 生产者发送消息时,每个消息附带一个Sequence Number。
    • Sequence Number在每个分区内是递增的,用于标识消息的顺序。

2. 记录Producer ID和Sequence Number

  • Kafka代理(Broker)会记录每个分区的Producer ID和最新的Sequence Number。
  • 每条消息发送时,代理会检查当前Producer ID和Sequence Number,确保消息的顺序和唯一性。

3. 生产者崩溃和重启

  1. 生产者崩溃

    • 假设生产者在事务过程中崩溃。
  2. 生产者重启

    • 生产者重启后,使用相同的Transactional ID重新初始化。
    • Kafka为重启的生产者分配一个新的Producer ID。
    • 同时,Producer Epoch递增,标识这是生产者的一个新的会话。

4. 恢复未完成的事务

  • 事务协调器:负责管理事务状态,恢复未完成的事务。
  • 当生产者重启并调用initTransactions时,事务协调器会:
    • 检查与Transactional ID相关的未完成事务。
    • 根据事务日志记录,决定是提交还是中止这些事务。

5. 幂等性保障机制

  1. 新的Producer ID和递增的Producer Epoch

    • Kafka代理识别新的Producer ID和递增的Producer Epoch,确保重启后的生产者实例与之前的实例区分开。
  2. 更新记录

    • 代理更新记录新的Producer ID和Sequence Number。
    • 新的Producer ID和递增的Sequence Number确保消息不重复处理。
  3. 消息去重

    • 代理通过检查Producer ID和Sequence Number,确保同一个Producer ID的消息不会被处理两次。
    • 即使生产者在重启后重新发送消息,代理能够识别并忽略重复的消息。

6. 事务性消息处理

  1. 第一阶段:预提交

    • 生产者发送消息时,代理记录Producer ID和Sequence Number。
    • 消息标记为“待提交”。
  2. 第二阶段:提交或中止

    • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
    • 如果事务中止,代理丢弃消息,不进行处理。

总结

通过以上机制,Kafka确保生产者在不同会话中的幂等性:

  • Producer ID和Producer Epoch:标识生产者实例和会话阶段。
  • Sequence Number:确保每个分区中的消息顺序和唯一性。
  • 事务协调器:管理事务状态,确保事务的一致性和原子性。

即使生产者崩溃并重启,通过这些机制,Kafka能够保证消息的幂等性和事务一致性,避免重复处理消息,确保数据可靠性和一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1689388.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第2天 搭建安全拓展_小迪网络安全笔记

1.常见搭建平台脚本使用: 例如 phpstudy IIS Nginx(俗称中间件): 什么是中间件: 中间件是介于应用系统和系统软件之间的一类软件&#xff0c;它使用系统软件所提供的基础服务&#xff08;功能&#xff09;&#xff0c;衔接网络上应用系统的各个部分或不同的应用&#…

论文阅读--ViLD

现在的目标检测数据集&#xff0c;标注的类别都很有限&#xff0c;如图中的base categories&#xff0c;只能检测出toy而不能检测出细分类别&#xff0c;能不能在现有数据集的基础上&#xff0c;不额外打标注&#xff0c;就能直接检测细分物体&#xff1f; &#xff08;a&#…

订餐系统总结、

应用层&#xff1a; SpringBoot:快速构建Spring项目&#xff0c;采用“约定大于配置”的思想&#xff0c;简化Spring项目的配置开发。 SpringMvc&#xff1a;Spring框架的一个模块&#xff0c;springmvc和spring无需通过中间整合层进行整合&#xff0c;可以无缝集成。 Sprin…

深度学习之Python+OpenCV+Tensorflow实时人体检测和计数

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习之PythonOpenCVTensorflow实时人体检测和计数项目简介 一、项目背景与意义 随着科技的不断发展&#xff…

Spring 事件监听

参考&#xff1a;Spring事件监听流程分析【源码浅析】_private void processbean(final string beanname, fi-CSDN博客 一、简介 Spring早期通过实现ApplicationListener接口定义监听事件&#xff0c;Spring 4.2开始通过EventListener注解实现监听事件 FunctionalInterface p…

Excel插入多行VBA实现

我们还可以利用 VBA&#xff08;Visual Basic for Applications&#xff09;宏语言&#xff0c;在 Excel 中写一个 VBA 宏来自动插入多行数据。这种方法可以方便我们自定义需要插入的行数和插入位置。下面是编写 VBA 宏的步骤&#xff1a; 1、按下Alt F11快捷键&#xff0c;打…

python文件名通常以什么结尾

python文件后缀一般有两个&#xff0c;分别是.py和.pyw。视窗用 python.exe 运行 .py&#xff0c;用 pythonw.exe 运行 .pyw 。 这纯粹是因为安装视窗版Python时&#xff0c;扩展名 .py 自动被登记为用 python.exe 运行的文件&#xff0c;而 .pyw 则被登记为用 pythonw.exe 运…

c++ - vector容器常用接口模拟实现

文章目录 一、成员变量二、常用迭代器接口模拟实现三、一些常用接口模拟四、默认成员函数五、功能测试 一、成员变量 我们通过在堆上申请一个数组空间来进行储存数据&#xff0c;我们的成员变量是三个指针变量&#xff0c;分别指向第一个位置、最后储存有效位置的下一个位置以…

OpenMV学习笔记1——IDE安装与起步

目录 一、OpenMV IDE下载 二、OpenMV界面 三、Hello World&#xff01; 四、将代码烧录到OpenMV实现脱机运行 五、插SD卡&#xff08;为什么买的时候没送&#xff1f;&#xff09; 一、OpenMV IDE下载 浏览器搜索OpenMV官网&#xff0c;进入后点击“立即下载”&#xff0…

org.json下载方法

介绍org.json下载的一些方法。 工具/原料 浏览器 方式一 在百度上搜索org.json&#xff0c;点击第一个搜索结果。进入JSON网站后&#xff0c;可以看到有各种语言版本的json工具包&#xff0c;选择JSON-java。 点击JSON-java后页面跳转到GitHub上&#xff0c;在该网页上点击…

吉林大学软件工程易错题

1.【单选题】软件工程方法是&#xff08; &#xff09;。 A、为开发软件提供技术上的解决方法 &#xff08;软件工程方法 &#xff09; B、为支持软件开发、维护、管理而研制的计算机程序系统&#xff08;软件工程工具&#xff09; …

Linux基础(四):Linux系统文件类型与文件权限

各位看官&#xff0c;好久不见&#xff0c;在正式介绍Linux的基本命令之前&#xff0c;我们首先了解一下&#xff0c;关于文件的知识。 目录 一、文件类型 二、文件权限 2.1 文件访问者的分类 2.2 文件权限 2.2.1 文件的基本权限 2.2.2 文件权限值的表示方法 三、修改文…

爬虫实训案例:中国大学排名

近一个月左右的时间学习爬虫&#xff0c;在用所积累的知识爬取了《中国大学排名》这个网站&#xff0c;爬取的内容虽然只是可见的文本&#xff0c;但对于初学者来说是一个很好的练习。在爬取的过程中&#xff0c;通过请求数据、解析内容、提取文本、存储数据等几个重要的内容入…

MT3039 山脉

思路&#xff1a; 往右看能看到山顶&#xff0c;可以看成找第一个比当前元素>的元素&#xff0c;即构造单调递减栈。 例子&#xff1a; 7 5 3 4 1. 7入栈: 7 2. 5入栈: 7 5 ansans1(1是指有1个元素&#xff08;7&#xff09;可以看到5) 3. 3入栈: 7 5 3 ansans2(2是指…

使用神经实现路径表示的文本到向量生成

摘要 矢量图形在数字艺术中得到广泛应用&#xff0c;并受到设计师的青睐&#xff0c;因为它们具有可缩放性和分层特性。然而&#xff0c;创建和编辑矢量图形需要创造力和设计专业知识&#xff0c;使其成为一项耗时的任务。最近在文本到矢量&#xff08;T2V&#xff09;生成方面…

单例模式中的 双判断锁 问题、单例模式的资源问题

》》》Lazy 不存在高并发问题&#xff0c;lazy已经解决了。 CLR 类执行的顺序 静态变量初始化 1次静态构造函数 1次实例变量初始化基类静态变量初始化 1次基类静态构造函数 1次基类实例变量初始化基类实例构造函数实例构造函数 》》》 创建单例模式 好多种 1&#xff0c;静态…

丰田精益生产的模板

丰田精益生产&#xff0c;也被称为丰田生产方式&#xff08;Toyota Production System, TPS&#xff09;&#xff0c;是一套完整的生产和管理系统&#xff0c;其核心目标是最大化效率、消除浪费&#xff0c;并通过持续改进来提升产品质量。 学习优秀企业 学习福特 丰田精益生产…

文件流下载优化:由表单提交方式修改为Ajax请求

如果想直接看怎么写的可以跳转到 解决方法 节&#xff01; 需求描述 目前我们系统导出文件时&#xff0c;都是通过表单提交后&#xff0c;接收文件流自动下载。但由于在表单提交时没有相关调用前和调用后的回调函数&#xff0c;所以我们存在的问题&#xff0c;假如导出数据需…

【数据分析】Numpy和Pandas库基本用法及实例--基于Japyter notebook实现

各位大佬好 &#xff0c;这里是阿川的博客 &#xff0c; 祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 承接上篇的博客 数据分析—技术栈和开发环境搭…

【Django】从零开始学Django(持续更新中)

PyCharm的版本必须为专业版&#xff0c;社区版不具备Web开发功能的。 一. Django建站基础 Django采用MTV的框架模式&#xff0c;即模型(Model)、模板(Template)和视图(Views)&#xff0c;三者之间各自负责不同的职责。 ●模型&#xff1a;数据存取层&#xff0c;处理与数据相关…