Kafka-客户端使用

news2024/12/28 19:08:40

理解Kafka正确使用方式

Kafka提供了两套客户端API,HighLevel API和LowLevel API。

  • HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。

  • LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。

基础的客户端

引入Maven依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.4.0</version>
</dependency>

消息发送者主流程

public class MyProducer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
        throws ExecutionException, InterruptedException
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++)
        {
            // Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            // Part3:发送消息
            // 单向发送:不关心服务端的应答。
            // producer.send(record);
            // System.out.println("message "+i+" sended");
            
            // 同步发送:获取服务端应答消息前,会阻塞当前线程。
            // RecordMetadata recordMetadata = producer.send(record).get();
            // String topic = recordMetadata.topic();
            // int partition = recordMetadata.partition();
            // long offset = recordMetadata.offset();
            // String message = recordMetadata.toString();
            // System.out.println("message:[" + message + "] sended with topic:" + topic + "; partition:" + partition
            // + ";offset:" + offset);
            
            // 异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback()
            {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e)
                {
                    if (null != e)
                    {
                        System.out.println("消息发送失败," + e.getMessage());
                        e.printStackTrace();
                    }
                    else
                    {
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out
                            .println("message:[" + message + "] sended with topic:" + topic + ";offset:" + offset);
                    }
                    latch.countDown();
                }
            });
        }
        // 消息处理完才停止发送者。
        latch.await();
        producer.close();
    }
}

构建Producer分为三个步骤:

1、设置Producer核心属性:Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。

2、构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。

3、使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

异常信息: Expiring 1 record(s) for disTopic-0:120010 ms has passed since batch creation

修改kafka安装路径的config/server.properties配置,然后重启解决问题

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://你的公网IP:9092

注意:云服务器配置安全组开放端口,虚拟机防火墙状态要关闭

消息消费者主流程

public class MyConsumer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        // 自行调整Offset
        // consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC,0)));
        while (true)
        {
            // PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            // records.partitions().forEach(topicPartition -> {
            // String key = topicPartition.topic()+topicPartition.partition();
            // List<ConsumerRecord<String, String>> partionRecords = records.records(topicPartition);
            // long value = partionRecords.get(partionRecords.size()-1).offset();
            //
            // });
            // PART3:处理消息
            for (ConsumerRecord<String, String> record : records)
            {
                System.out.println("partition = " + record.partition() + "offset = " + record.offset() + ";key = "
                    + record.key() + "; value= " + record.value());
            }
            
            // 提交offset,消息就不会重复推送。
            consumer.commitSync(); // 同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
            // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

Consumer同样是分为三个步骤:

1、设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。

2、拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。

3、处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

kafka官方配置: Apache Kafka

从客户端属性来梳理客户端工作机制

Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐。首先要理解基础模型。

消费者分组消费机制

最重要的一个机制

在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,表示当前Consumer所属的消费者组。

public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

//参数GROUP_INSTANCE_ID_CONFIG:给组成员设置一个固定的instanceId,可以减少Kafka不必要的rebalance

描述翻译:对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。

消费者组作用:生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。

Offset偏移量:表示每个消费者组在每个Partiton中已经消费处理的进度

#查看消费者组Offset记录情况
bin/kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

这个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。

Kafka自动提交:

public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";

如何提高Offset数据的安全性呢?

服务端:在Consumer中,实际上提供了AUTO_OFFSET_RESET_CONFIG参数,来指定消费者组在服务端的Offset不存在时如何进行后续消费。(有可能服务端初始化Consumer Group的Offset失败,也有可能Consumer Group当前的Offset对应的数据文件被过期删除了)

ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG :当Server端没有对应的Offset时如何处理。 可选项:

  • earliest: 自动设置为当前最早的offset

  • latest:自动设置为当前最晚的offset

  • none: 如果消费者组对应的offset找不到,就向Consumer抛异常。

  • 其他选项: 向Consumer抛异常。

客户端:

  • 异步提交:消费者在处理业务的同时,异步向Broker提交Offset。效率高,但是容易丢数据(处理失败但是offset提交了)

  • 同步提交:消费者保证处理完所有业务后,再提交Offset。消息不会因为offset丢失。业务处理失败不提交Offset,还可以重试。坏处是处理变慢,另外还可能重复消费(生产者推送消息给组内的其他消费者)

生产者拦截器机制

允许客户端在生产者消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容

涉及到Producer中指定的一个参数:INTERCEPTOR_CLASSES_CONFIG

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";

按照说明,定义一个自己的拦截器实现类:

public class MyInterceptor implements ProducerInterceptor
{
    // 发送消息时触发
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord)
    {
        System.out.println("prudocerRecord : " + producerRecord.toString());
        return producerRecord;
    }
    
    // 收到服务端响应时触发
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e)
    {
        System.out.println("acknowledgement recordMetadata:" + recordMetadata.toString());
    }
    
    // 连接关闭时触发
    @Override
    public void close()
    {
        System.out.println("producer closed");
    }
    
    // 整理配置项
    @Override
    public void configure(Map<String, ?> map)
    {
        System.out.println("=====config start======");
        for (Map.Entry<String, ?> entry : map.entrySet())
        {
            System.out.println("entry.key:" + entry.getKey() + " === entry.value: " + entry.getValue());
        }
        System.out.println("=====config end======");
    }
}

然后在生产者中指定拦截器类(多个拦截器类,用逗号隔开)

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.gao.kfk.basic.MyInterceptor");

拦截器机制一般用得比较少,主要用在一些统一添加时间等类似的业务场景。比如,用Kafka传递一些POJO,就可以用拦截器统一添加时间属性。但是我们平常用Kafka传递的都是String类型的消息,POJO类型的消息,Kafka可以传吗?这就要用到下面的消息序列化机制。

消息序列化机制

Producer指定了两个属性KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG,对于这两个属性,在ProducerConfig中都有配套的说明属性。

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";

通过这两个参数,可以指定消息生产者如何将消息的key和value序列化成二进制数据。

  • key是用来进行分区的可选项。Kafka通过key来判断消息要分发到哪个Partition。

    如果没有填写key,那么Kafka会使Round-robin轮询的方式,自动选择Partition。

    如果填写了key,那么会通过声明的Serializer序列化接口,将key转换成一个byte[]数组,然后对key进行hash,选择Partition。这样可以保证key相同的消息会分配到相同的Partition中。

  • Value是业务上比较关心的消息。Kafka同样需要将Value对象通过Serializer序列化接口,将Key转换成byte[]数组,这样才能比较好的在网络上传输Value信息,以及将Value信息落盘到操作系统的文件当中。

生产者要对消息进行序列化,那么消费者拉取消息时,自然需要进行反序列化。所以,在Consumer中,也有反序列化的两个配置

public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";

在Kafka中,对于常用的一些基础数据类型,都已经提供了对应的实现类。但是,如果需要使用一些自定义的消息格式,比如自己定制的POJO,就需要定制具体的实现类了。

在自己进行序列化机制时,需要考虑的是如何用二进制来描述业务数据:

一种类型是定长的基础类型,比如Integer,Long,Double等。这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组,在反序列化时,只要按照定长去读取二进制数据就可以反序列化了。

另一种是不定长的浮动类型,比如String,或者基于String的JSON类型等。这种浮动类型的基础数据转化成二进制数组,长度都是不一定的。对于这类数据,通常的处理方式都是先往二进制数组中写入一个定长的数据的长度数据(Integer或者Long类型),然后再继续写入数据本身。这样,反序列化时,就可以先读取一个定长的长度,再按照这个长度去读取对应长度的二进制数据,这样就能读取到数据的完整二进制内容。

序列化机制是在高并发场景中非常重要的一个优化机制。高效的序列化实现能够极大的提升分布式系统的网络传输以及数据落盘的能力。例如对于一个User对象,即可以使用JSON字符串这种简单粗暴的序列化方式,也可以选择按照各个字段进行组合序列化的方式。但是显然后者的占用空间比较小,序列化速度也会比较快。而Kafka在文件落盘时,也设计了非常高效的数据序列化实现,这也是Kafka高效运行的一大支撑。

在很多其他业务场景中,也需要我们提供更高效的序列化实现。例如使用MapReduce框架时,就需要自行定义数据的序列化方式。使用Netty框架进行网络调用时,为了防止粘包,也需要定制数据的序列化机制

消息分区路由机制

消息如何进行路由?

  • Producer会根据消息的key选择Partition,具体如何通过key找Partition呢?

  • 一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本,那Consumer节点是不是可以决定自己消费哪些Partition的消息呢?

首先,在Producer中,可以指定一个Partitioner来对消息进行分配。

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
    "<ul>" +
    "<li>If not set, the default partitioning logic is used. " +
    "This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
    "<ul>" +
    "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
    "<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
    "</ul>" +
    "</li>" +
    "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
    "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
    "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
    "Please check KAFKA-9965 for more detail." +
    "</li>" +
    "</ul>" +
    "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

这里就说明了Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。可以很简单的实现一个自己的分配策略。

在之前的3.2.0版本,Kafka提供了三种默认的Partitioner实现类,RoundRobinPartitioner,DefaultPartitioner和UniformStickyPartitioner。目前后面两种实现已经标记为过期,被替换成了默认的实现机制。

对于生产者,默认的Sticky策略在给一个生产者分配了一个分区后,会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满,或者这个分区的消息已完成 linger.ms(默认0毫秒,表示如果batch.size迟迟没有满后的等待时间)。RoundRobinPartitioner是在各个Partition中进行轮询发送,这种方式没有考虑到消息大小以及各个Broker性能差异,用得比较少。

另外可以自行指定一个Partitioner实现类,定制分区逻辑。在Partitioner接口中,核心要实现的就是partition方法。根据相关信息,选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。

//获取所有的Partition信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

然后,在Consumer中,可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略,决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
    "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
    "partition ownership amongst consumer instances when group management is used. Available options are:" +
    "<ul>" +
    "<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
    "maximally balanced while preserving as many existing partition assignments as possible.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
    "logic, but allows for cooperative rebalancing.</li>" +
    "</ul>" +
    "<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
    "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
    "<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
    "interface allows you to plug in a custom assignment strategy.</p>";

同样,Kafka内置了一些实现方式,在通常情况下也都是最优的选择。可以实现自己的分配策略。

从上面介绍可以看到Kafka默认提供了三种消费者的分区分配策略

  • range策略: 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer,4~6给一个Consumer,7~9给一个Consumer。

  • round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer

  • sticky策略:粘性策略。这个策略有两个原则:

    • 在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。

    • 分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

另外可以通过继承AbstractPartitionAssignor抽象类自定义消费者的订阅方式。

官方默认提供的生产者端的默认分区器以及消费者端的RangeAssignor+CooperativeStickyAssignor分配策略,在大部分场景下都是非常高效的算法。深入理解这些算法,对于你深入理解MQ场景,以及借此去横向对比理解其他的MQ产品,都是非常有帮助的。

那么在哪些场景下我们可以自己来定义分区器呢?例如如果在部署消费者时,如果我们的服务器配置不一样,就可以通过定制消费者分区器,让性能更好的服务器上的消费者消费较多的消息,而其他服务器上的消费者消费较少的消息,这样就能更合理的运用上消费者端的服务器性能,提升消费者的整体消费速度。

生产者消息缓存机制

Kafka生产者为了避免高并发请求对服务端造成过大压力,每次发消息时并不是一条一条发往服务端,而是增加了一个高速缓存,将消息集中到缓存后,批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。

Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件: accumulator 和 sender

//1.记录累加器
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
//2. 数据发送线程
this.sender = newSender(logContext, kafkaClient, this.metadata);

其中RecordAccumulator,就是Kafka生产者的消息累加器。KafkaProducer要发送的消息都会在ReocrdAccumulator中缓存起来,然后再分批发送给kafka broker。

在RecordAccumulator中,会针对每一个Partition,维护一个Deque双端队列,这些Dequeue队列基本上是和Kafka服务端的Topic下的Partition对应的。每个Dequeue里会放入若干个ProducerBatch数据。KafkaProducer每次发送的消息,都会根据key分配到对应的Deque队列中。然后每个消息都会保存在这些队列中的某一个ProducerBatch中。而消息分发的规则,就是由上面的Partitioner组件完成的。

这里主要涉及到两个参数

//RecordAccumulator缓冲区大小
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
                                                    + "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."
                                                    + "<p>"
                                                    + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
                                                    + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
                                                    + "compression is enabled) as well as for maintaining in-flight requests.";

//缓冲区每一个batch的大小
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"
                                                 + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
                                                 + "default batch size in bytes. "
                                                 + "<p>"
                                                 + "No attempt will be made to batch records larger than this size. "
                                                 + "<p>"
                                                 + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
                                                 + "<p>"
                                                 + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
                                                 + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
                                                 + "buffer of the specified batch size in anticipation of additional records."
                                                 + "<p>"
                                                 + "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
                                                 + "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "
                                                 + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
                                                 + "batch size is under this <code>batch.size</code> setting.";

这里面也提到了几个其他的参数,比如 MAX_BLOCK_MS_CONFIG ,默认60秒

接下来,sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到,每个KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。

Sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。LINGER_MS_CONFIG默认值是0。

然后,Sender对读取出来的消息,会以Broker为key,缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会一一发往Kafka对应的Broker中,直到收到Broker的响应,才会从队列中移除。这些队列也并不会无限缓存,最多缓存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默认值为5)个请求。

生产者缓存机制的主要目的是将消息打包,减少网络IO频率。所以,在Sender的InflightRequest队列中,消息也不是一条一条发送给Broker的,而是一批消息一起往Broker发送。而这就意味着这一批消息是没有固定的先后顺序的。

其中涉及到的几个主要参数如下:

public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
    + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
    + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
    + "of artificial delay&mdash;that is, rather than immediately sending out a record, the producer will wait for up to "
    + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
    + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
    + "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "
    + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
    + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
    + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";



public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
    + " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
    + " message reordering after a failed send due to retries (i.e., if retries are enabled); "
    + " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
    + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
    + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

最后,Sender会通过其中的一个Selector组件完成与Kafka的IO请求,并接收Kafka的响应。

//org.apache.kafka.clients.producer.KafkaProducer#doSend
if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                this.sender.wakeup();
            }

Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如如果你的消息体比较大,那么应该考虑加大batch.size,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection参数,这样能加大消息发送的吞吐量。

发送应答机制

生产者消息幂等性

生产者消息事务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1304324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快宝技术:连接无代码开发,API集成提升电商营销和用户运营效率

无代码开发&#xff1a;创新的启航 快宝技术自2012年成立至今&#xff0c;一直是无代码开发领域的佼佼者。通过无代码开发平台&#xff0c;快宝技术旨在降低技术门槛&#xff0c;并使非技术人员能够轻松创建和部署应用程序。这不仅使得快递末端软件开发变得高效和便捷&#xf…

用Sketch for Mac轻松创作无限可能的矢量绘图

在如今的数码时代&#xff0c;矢量绘图软件成为了许多设计师和创意爱好者的必备工具。而在众多的矢量绘图软件中&#xff0c;Sketch for Mac无疑是最受欢迎的一款。它以其简洁易用的界面和强大的功能&#xff0c;让用户能够轻松创作出无限可能的矢量图形。 首先&#xff0c;Sk…

Qt Desktop Widgets 控件绘图原理逐步分析拆解

Qt 是目前C语言首选的框架库。之所以称为框架库而不单单是GUI库&#xff0c;是因为Qt提供了远远超过GUI的功能封装&#xff0c;即使不使用GUI的后台服务&#xff0c;也可以用Qt大大提高跨平台的能力。 仅就界面来说&#xff0c;Qt 保持各个平台绘图等效果的统一&#xff0c;并…

QX320F28346,TI的TMS320F28346定制的DSP吗?为什么没有模拟外设ADC、DAC等?

QX320F28346&#xff0c;TI的TMS320F28346定制的DSP吗&#xff1f;为什么没有模拟外设ADC、DAC等&#xff1f;

基于Java+vue的音乐网站设计与实现(源码+文档+数据库)

摘 要 在此基础上&#xff0c;提出了一种基于javavue的在线音乐排行榜系统的设计与实现方法。本系统分为两个大的功能&#xff0c;即&#xff1a;前端显示、后端管理。而在前台&#xff0c;则是播放不同的歌曲&#xff0c;让人可以在上面观看不同的歌曲&#xff0c;也可以观看…

Vue--第八天

Vue3 1.优点&#xff1a; 2.创建&#xff1a; 3.文件&#xff1a; 换运行插件&#xff1a; 4.运行&#xff1a; setup函数&#xff1a; setup函数中获取不到this&#xff08;this 在定义的时候是Undefined) reactive()和ref(): 代码&#xff1a; <script setup> // …

springboot listener、filter登录实战

转载自&#xff1a; www.javaman.cn 博客系统访问&#xff1a; http://175.24.198.63:9090/front/index 登录功能 1、前端页面 采用的是layui-admin框架&#xff0c;文中的验证码内容&#xff0c;请参考作者之前的验证码功能 <!DOCTYPE html> <html lang"zh…

如何通过VNC实现公网远程控制macOS设备

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

Pinia无废话,快速上手

Pinia无废话&#xff0c;快速上手 Vue3 状态管理 - Pinia 1. 什么是Pinia Pinia 是 Vue 的专属的最新状态管理库 &#xff0c;是 Vuex 状态管理工具的替代品 2. 手动添加Pinia到Vue项目 后面在实际开发项目的时候&#xff0c;Pinia可以在项目创建时自动添加&#xff0c;现…

PDI/Kettle-9.4.0.0-343源码下载及编译

目录 &#x1f351;一、概要&#x1f34a;最新版本10.x&#xff08;2023-11-30&#xff09; &#x1f351;二、下载&#x1f351;三、编译&#x1f34a;3.1、导入开发工具&#x1f34a;3.2、开始编译&#x1f34a;3.3、编译报错&#x1f34a;3.4、报错原因&#xff1a;jdk版本低…

if - else 实现点击展开 / 折叠

在前端开发过程中&#xff0c;我们经常需要使用到点击展开/折叠的按钮。 此案例是一个数组嵌套数组的效果展示&#xff0c;使用的是v-if else 来实现的展开效果。 一、实现方法 if...else&#xff1a;当指定条件为真&#xff0c;if 语句会执行一段语句。如果条件为假&#x…

2023/12/11 作业

1.思维导图 2.作业 成果&#xff1a; 第一个头文件 #ifndef TEST3GET_H #define TEST3GET_H #include <QWidget> #include<QMessageBox> QT_BEGIN_NAMESPACE namespace Ui { class test3get; } QT_END_NAMESPACE class test3get : public QWidget { Q_OBJE…

用docker创建jmeter容器,如何实现性能测试?

用 docker 创建 jmeter 容器, 实现性能测试 我们都知道&#xff0c;jmeter可以做接口测试&#xff0c;也可以用于性能测试&#xff0c;现在企业中性能测试也大多使用jmeter。docker是最近这些年流行起来的容器部署工具&#xff0c;可以创建一个容器&#xff0c;然后把项目放到…

chrome浏览器使用flash player

今天用chrome打开学校校园网&#xff0c;显示不出来成绩单提示如下&#xff1a; 结果下了也没用。 Chrome浏览器在2020年12月已经停止支持Flash Player插件&#xff0c;所以无法在Chrome浏览器上使用Flash Player。 使用其他浏览器 如果之前安装了Flash Player插件的小伙伴&…

多维时序 | MATLAB实现SAO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测

多维时序 | MATLAB实现SAO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现SAO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现SAO-CNN-B…

关于引用unpkg.com的mars3d相关依赖文件报错无法请求的说明

问题来源&#xff1a; 1.关于引用unpkg.com的mars3d相关依赖文件报错无法请求的说明 说明&#xff1a; 1.最近npm、unpkeg都访问异常&#xff0c;可能是unpkg.com等国外的服务器不稳定导致的请求未响应。 解决方案&#xff1a; 1.请切换静态文件引入的方式请求相关资源。参…

Science Robotics 挖掘机升级智能机器人,充分使用当地材料自主搭建石墙和土墙

建筑业对人类生产力至关重要&#xff0c;但需要实质性创新来满足不断增长的需求并减少其对环境的严重影响。建筑业是世界上最大的经济部门之一&#xff0c;占全球国内生产总值的13%。推而广之&#xff0c;它几乎是所有其他行业的重要组成部分&#xff1a;建筑业负责运输和农业基…

定时补偿方案

1&#xff1a;需求描述 支持NVR升级后通道数变更&#xff0c;完成升级后&#xff0c;设备SDK上报通道数量给A平台&#xff0c;A平台将NVR通道数量同步给B平台&#xff0c;B平台自动调用C平台接口&#xff0c;同步通道数量给C平台&#xff0c;C平台重新生成通道序列号&#xff…

PHP 之道(PHP The Right Way 中文版)

PHP 之道&#xff08;PHP The Right Way 中文版&#xff09;

HeartBeat监控Mysql状态

目录 一、概述 二、 安装部署 三、配置 四、启动服务 五、查看数据 一、概述 使用heartbeat可以实现在kibana界面对 Mysql 服务存活状态进行观察&#xff0c;如有必要&#xff0c;也可在服务宕机后立即向相关人员发送邮件通知 二、 安装部署 参照章节&#xff1a;监控组件…