Kafka高级特性解析之生产者

news2024/10/6 14:35:48

1、消息发送

1.1、数据生产流程解析

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。 

1.2、必要参数配置

broker配置

(1)配置条目的使用方式:

(2) 配置参数:

属性说明重要性
bootstrap.servers生产者客户端与broker集群建立初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写一个,以防该节点宕机的时候不可用。形式为: host1:port1,host2:port2,... .high
key.serializer实现了接口org.apache.kafka.common.serialization.Serializer 的key序化类。high
value.serializer实现了接口org.apache.kafka.common.serialization.Serializer 的value序列化类。high
acks该选项控制着已发送消息的持久性。
acks=0 :生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1。
acks=1 :leader将记录写到它本地日志,就响应客户端确认消息,而不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。
acks=all :leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1]
high
compression.type生产者生成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。字符串类型的值。默认是none。high
retries设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设置max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647]high

1.3、序列化器

  • 由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
  • 序列化器的作用就是用于序列化要发送的消息的。 
  • Kafka使用org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。
package org.apache.kafka.common.serialization;

import java.io.Closeable;
import java.util.Map;

/**
 * 将对象转换为byte数组的接口
 * <p>
 * 该接口的实现类需要提供无参构造器
 *
 * @param <T> 从哪个类型转换
 */
public interface Serializer<T> extends Closeable {
    /**
     * 类的配置信息
     *
     * @param configs key/value pairs
     * @param isKey   key的序列化还是value的序列化
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * 将对象转换为字节数组
     *
     * @param topic 主题名称
     * @param data  需要转换的对象
     * @return 序列化的字节数组
     */
    byte[] serialize(String topic, T data);

    /**
     * 关闭序列化器
     * 该方法需要提供幂等性,因为可能调用多次。
     */
    @Override
    void close();
}

系统提供了该接口的子接口以及实现类:org.apache.kafka.common.serialization.ByteArraySerializer

org.apache.kafka.common.serialization.ByteBufferSerializer 

org.apache.kafka.common.serialization.BytesSerializer 

org.apache.kafka.common.serialization.DoubleSerializer 

org.apache.kafka.common.serialization.FloatSerializer 

org.apache.kafka.common.serialization.IntegerSerializer 

org.apache.kafka.common.serialization.StringSerializer 

org.apache.kafka.common.serialization.LongSerializer 

org.apache.kafka.common.serialization.ShortSerializer 

自定义序列化器 

数据的序列化一般生产中使用avro

        自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serialize 方法。

案例:

(1)实体类

package com.lagou.kafka.demo.entity;

/**
 * 用户自定义的封装消息的实体类
 */
public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

(2)序列化类:

package com.lagou.kafka.demo.serialization;

import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
        // 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            if (data == null) {
                return null;
            } else {
                final Integer userId = data.getUserId();
                final String username = data.getUsername();

                if (userId != null) {
                    if (username != null) {
                        final byte[] bytes = username.getBytes("UTF-8");
                        int length = bytes.length;
                        // 第一个4个字节用于存储userId的值
                        // 第二个4个字节用于存储username字节数组的长度int值
                        // 第三个长度,用于存放username序列化之后的字节数组
                        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
                        // 设置userId
                        buffer.putInt(userId);
                        // 设置username字节数组长度
                        buffer.putInt(length);
                        // 设置username字节数组
                        buffer.put(bytes);
                        // 以字节数组形式返回user对象的值
                        return buffer.array();
                    }
                }
            }
        } catch (Exception e) {
            throw new SerializationException("数据序列化失败");
        }
        return null;
    }

    @Override
    public void close() {
        // do nothing
        // 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
    }
}

(3)生产者:

package com.lagou.kafka.demo.producer;

import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serialization.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置自定义的序列化器
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);

        User user = new User();
//        user.setUserId(1001);
//        user.setUsername("张三");
//        user.setUsername("李四");
//        user.setUsername("王五");
        user.setUserId(400);
        user.setUsername("赵四");

        ProducerRecord<String, User> record = new ProducerRecord<String, User>(
                "tp_user_01",   // topic
                user.getUsername(),   // key
                user                  // value
        );


        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("消息发送异常");
                } else {
                    System.out.println("主题:" + metadata.topic() + "\t"
                    + "分区:" + metadata.partition() + "\t"
                    + "生产者偏移量:" + metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();

    }
}

1.4、分区器

默认(DefaultPartitioner)分区计算: 

  • 如果record提供了分区号,则使用record提供的分区号
  • 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
  • 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。
    • 会首先在可用的分区中分配分区号
    • 如果没有可用的分区,则在该主题所有分区中分配分区号。

如果要自定义分区器,则需要 

  1. 首先开发Partitioner接口的实现类
  2. 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class") 

位于org.apache.kafka.clients.producer 中的分区器接口:

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;

import java.io.Closeable;

/**
 * 分区器接口
 */
public interface Partitioner extends Configurable, Closeable {
    /**
     * 为指定的消息记录计算分区值
     *
     * @param topic      主题名称
     * @param key        根据该key的值进行分区计算,如果没有则为null。
     * @param keyBytes   key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
     *                   null
     * @param value      根据value值进行分区计算,如果没有,则为null
     * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
     *                   null
     * @param cluster    当前集群的元数据
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object
            value, byte[] valueBytes, Cluster cluster);

    /**
     * 关闭分区器的时候调用该方法
     */
    public void close();
}

org.apache.kafka.clients.producer.internals 中分区器的默认实现:

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * 默认的分区策略:
 * <p>
 * 如果在记录中指定了分区,则使用指定的分区
 * 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
 * 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
 */
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {
    }

    /**
     * 为指定的消息记录计算分区值
     *
     * @param topic      主题名称
     * @param key        根据该key的值进行分区计算,如果没有则为null。
     * @param keyBytes   key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
     *                   null
     * @param value      根据value值进行分区计算,如果没有,则为null
     * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
     *                   null
     * @param cluster    当前集群的元数据
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object
            value, byte[] valueBytes, Cluster cluster) {
    // 获取指定主题的所有分区信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // 分区的数量
        int numPartitions = partitions.size();
    // 如果没有提供key
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
    // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
    // hash the keyBytes to choose a partition
    // 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

可以实现Partitioner接口自定义分区器: 

然后在生产者中配置: 

1.5、拦截器 

        Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。 

        对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要用于执行一些资源清理工作。

        如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

自定义拦截器:

  1. 实现ProducerInterceptor接口
  2. 在KafkaProducer的设置中设置自定义的拦截器

案例:

(1)设置拦截器类:

package com.lagou.kafka.demo.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class InterceptorOne implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("拦截器1 -- go");


        // 消息发送的时候,经过拦截器,调用该方法

        // 要发送的消息内容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // 拦截器拦下来之后根据原来消息创建的新的消息
        // 此处对原消息没有做任何改动
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // 传递新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器1 -- back");
        // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
        // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        final Object classContent = configs.get("classContent");
        System.out.println(classContent);
    }
}

(2)生产者

package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class MyProducer {
    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
        // 此时可以保证发送消息即使在重试的情况下也是有序的。
        configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
//        configs.put("max.in.flight.requests.per.connection", 1);

//        interceptor.classes
        // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
       /*  configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," +
                "com.lagou.kafka.demo.interceptor.InterceptorTwo," +
                "com.lagou.kafka.demo.interceptor.InterceptorThree"); */
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne");


        configs.put("classContent", "this is lagou's kafka class");

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "tp_inter_01",
                0,
                1001,
                "this is lagou's 1001 message"
        );

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();


    }
}

2、原理剖析

由上图可以看出:KafkaProducer有两个基本线程: 

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
    • 消息收集器RecoderAccumulator为每个分区都维护了一个Deque<ProducerBatch> 类型的双端队列。
    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;
    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
    • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。
  • Sender线程:
    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的式, Node 表示集群的broker节点。
    • 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

3、生产者参数配置补充

(1)参数设置方式:

(2) 补充参数:

 

参数名称描述
retry.backoff.ms在向一个指定的主题分区重发消息的时候,重试之间的等待时间。
  • 比如3次重试,每次重试之后等待该时间长度,再接着重试。在一些失败的场景,避免了密集循环的重新发送请求。
  • long型值,默认100。可选值:[0,...]
retriesretries重试次数
  • 当消息发送出现错误的时候,系统会重发消息。跟客户端收到错误时重发一样。
  • 如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了
request.timeout.ms

客户端等待请求响应的最大时长。

        如果服务端响应超时,则会重发请求,除非达到重试次数。该设置应该比replica.lag.time.max.ms (a broker configuration)要大,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,...]

interceptor.classes

在生产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进行处理。

  • 要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor 接口。
  • 默认没有拦截器。Map<String, Object> configs中通过List集合配置多个拦截器类名。
acks

当生产者发送消息之后,如何确认消息已经发送成功了。

支持的值:

acks=0:

  • 如果设置为0,表示生产者不会等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
  • 该情形不能保证broker是否真的收到了消息,retries配置也不会生效,因为客户端不需要知道消息是否发送成功。
  • 发送的消息的返回的消息偏移量永远是-1。

acks=1

  • 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。
  • 在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及步该消息,则该消息丢失。

acks=all

  • 首领分区会等待所有的ISR副本分区确认记录。
  • 该处理保证了只要有一个ISR副本分区存货,消息就不会丢失。
  • 这是Kafka最强的可靠性保证,等效于acks=-1 。
batch.size当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处理。批处理提高了客户端和服务器的处理效率。
  • 该配置项以字节为单位控制默认批的大小。
  • 所有的批小于等于该值。
  • 发送给broker的请求将包含多个批次,每个分区一个,并包含可发送的数据。
  • 如果该值设置的比较小,会限制吞吐量(设置为0会完全禁用批处理)。如果设置的很大,又有一点浪费内存,因为Kafka会永远分配这么大的内存来参与到消息的批整合中。
client.id生产者发送请求的时候传递给broker的id字符串。
  • 用于在broker的请求日志中追踪什么应用发送了什么消息。
  • 一般该id是跟业务有关的字符串。
compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

  • 支持的值:none、gzip、snappy和lz4。
  • 压缩是对于整个批来讲的,所以批处理的效率也会影响到压缩的比例。
send.buffer.bytesTCP发送数据的时候使用的缓冲区(SO_SNDBUF)大小。如果设置为0,则使
用操作系统默认的。
buffer.memory

生产者可以用来缓存等待发送到服务器的记录的总内存字节。

        如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms 的时间,此后它将引发异常。此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护运行中的请求。long型数据。默认值:33554432,可选值:[0,...]

connections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000
linger.ms

生产者在发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次
发送。

        一般情况是消息的发送的速度比消息累积的速度慢。有时客户端需要减少请求的次数,即使是在发送负载不大的情况下。该配置设置了一个延迟,生产者不会立即将消息发送到broker,而是等待这么一段时间以累积消息,然后将这段时间之内的消息作为一个批次发送。该设置是批处理的另一个上限:一旦批消息达到了batch.size 指定的值,消息批会立即发送,如果积累的消息字节数达不到batch.size 的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置linger.ms=5 ,则在一个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,...]

max.block.ms

控制KafkaProducer.send() 和KafkaProducer.partitionsFor() 阻塞的时长。

        当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。long型值,默认:60000,可选值:[0,...]

max.request.size

单个请求的最大字节数。

        该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有自己的限制批大小的设置,与该配置可能不一样。int类型值,默认1048576,可选值:[0,...]

partitioner.class

实现了接口org.apache.kafka.clients.producer.Partitioner 的分区器实现类。

默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。
int类型值,默认32768,可选值:[-1,...]
security.protocol

跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

string类型值,默认:PLAINTEXT

max.in.flight.requests.per.connection

单个连接上未确认请求的最大数量。

达到这个数量,客户端阻塞。如果该值大于1,且存在失败的请求,在重试的时候消息顺序不能保证。int类型值,默认5。可选值:[1,...]

reconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。
在计算退避增量之后,添加20%的随机抖动以避免连接风暴。long型值,默认1000,可选值:[0,...]
reconnect.backoff.ms

尝试重连指定主机的基础等待时间。

  • 避免了到该主机的密集重连。
  • 该退避时间应用于该客户端到broker的所有连接。
  • long型值,默认50。可选值:[0,...]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/63788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker桌面版安装与使用(windows)

目录一、Docker概念二、下载安装三、docker镜像安装与操作四、制作自己的python镜像容器五、目录挂载六、多容器通信七、Docker-Compose管理多个容器运行八、发布和部署九、备份数据迁移一、Docker概念 1、Docker 是一个应用打包、分发、部署的工具2、镜像Image、容器Containe…

Windows OpenGL 图像绿幕抠图

目录 一.OpenGL 图像绿幕抠图 1.原始图片2.效果演示 二.OpenGL 图像绿幕抠图源码下载三.猜你喜欢 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录 >> OpenGL ES 基础 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录 >> OpenGL ES 特效 零基础 Open…

[问题解决方案](多人共同合并场景)git已merge到master分支代码且被同事代码覆盖如何回退

git已merge到master分支代码如何回退&#xff08;多人共同合并&#xff09;场景已经被同事代码覆盖的解决方案&#xff08;无需强制合并权限&#xff09;代码revert后又需要重新启用怎么办如果是未受保护分支代码的回退且只有你一人合并的代码 可以直接使用下面的命令即可如果只…

【Unity3D日常开发】Unity3D中实现不规则Button按钮的精准响应

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客QQ群&#xff1a;1040082875 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 在使用Unity3D开发中&#xff0c;可…

全志V853平台Camera模块开发框架详解

Camera 本章节介绍V853平台 Camera 模块的开发。 V853支持并口CSI、MIPI&#xff0c;使用VIN camera驱动框架。 Camera通路框架 VIN支持灵活配置单/双路输入双ISP多通路输出的规格 引入media框架实现pipeline管理 将libisp移植到用户空间解决GPL问题 将统计buffer独立为v…

Web大学生网页作业成品——抗击疫情网站设计与实现(HTML+CSS)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

Vue3 样式绑定

Vue3 样式绑定1.Vue.js class2.class 属性绑定3.Vue.js style(内联样式)4.组件上使用 class 属性1.Vue.js class class 与 style 是 HTML 元素的属性&#xff0c;用于设置元素的样式&#xff0c;我们可以用 v-bind 来设置样式属性。 v-bind 在处理 class 和 style 时&#xf…

微信小程序反编译

本文转载于&#xff1a;https://www.cnblogs.com/one-seven/p/15524457.html 微信小程序反编译 微信文件保存位置\WeChat Files\Applet\小程序id_APP_.wxapkg 现在小程序是处于编码状态 github上下载一个python版的解密工具 https://github.com/superdashu/pc_wxapkg_decr…

【免杀前置课——Windows编程】十四、异步IO——什么是异步IO、API定位问题、APC调用队列

异步IO异步IO异步I/0注意事项:定位问题总解决方案APC调用队列异步IO 当我们读取一个文件时&#xff0c;一般情况下&#xff0c;线程是阻塞的&#xff0c;也就是说&#xff0c;当前线程在等待文件读取操作结束,这种方式叫同步IO。 Windows 在系统底层为用户实现了另外一种高效的…

【软考】-- 操作系统(下)

操作系统&#xff08;下&#xff09;第五节 文件管理&#x1f355;一、文件管理的基本概念1️⃣文件2️⃣文件目录3️⃣目录结构:&#x1f354;二、文件路径&#x1f35f;三、文件命名规则&#x1f32d;四、文件的基本操作&#x1f37f;五、文件类型与扩展名&#x1f9c2;六、系…

Docker中安装Kibana

Kibana是一个免费且开放的用户界面,能够让你对Elasticsearch 数据进行可视化,并让你在Elastic Stack中进行导航。你可以进行各种操作,从跟踪查询负载,到理解请求如何流经你的整个应用,都能轻松完成。 在Docker Hub中选择最新版本的Kibaba镜像(选择版本为8.5.1),如下图…

bigquant选股模型主要有哪些?

bigquant选股模型一般常见的有七种&#xff0c;即多因子模型、风格轮动模型、行业轮动模型、资金流模型、动量反转模型、一致预期模型、趋势追踪模型等方面。不过要想样样都学会精通也是需要花费时间&#xff0c;以及精力等&#xff0c;那么&#xff0c;小编就从最基本的多因子…

DPDK Mempool

mempool是DPDK提供的内存池&#xff0c;其用处有&#xff1a; 由于DPDK使用UIO让DMA将网卡中的数据直接拷贝至用户态&#xff0c;因此需要一块固定的区域提供给DMA重复利用内存&#xff0c;提高效率 结构 mempool的主要结构如下图所示。 mempool为每个注册的lcore都分配了一…

Node.js - nvm管理node.js版本

使用nvm来管理node.js的版本真的很方便&#xff0c;这样就可以根据自己的需要来回切换node.js版本&#xff01; 一、卸载本地安装的node.js版本 略 二、安装nvm管理工具 2.1、下载 https://github.com/coreybutler/nvm-windows/releases 2.2 安装 (1) 鼠标双击nvm-setup.exe文件…

【从零开始学习深度学习】7.自己动手实现softmax回归的训练与预测

基于上一篇文章读取fashion-minist数据集的基础&#xff0c;本文自己动手实现一个softmax模型对其进行训练与预测。 目录1. 自己动手实现softmax回归1.1 读取数据1.2 初始化模型参数1.3 实现softmax运算1.4 定义模型1.5 定义损失函数1.6 计算分类准确率1.7 训练模型1.8 预测完整…

面试碰壁15次!作为一个已经27岁的测试工程师,未来在何方....

3年测试经验原来什么都不是&#xff0c;只是给你的简历上画了一笔&#xff0c;一直觉得经验多&#xff0c;无论在哪都能找到满意的工作&#xff0c;但是现实却是给我打了一个大巴掌&#xff01;事后也不会给糖的那种... 先说一下自己的个人情况&#xff0c;普通二本计算机专业…

LabVIEW编程LabVIEW开发SMP10辐射表例程与相关资料

LabVIEW编程LabVIEW开发SMP10辐射表例程与相关资料 ​​SMP10辐射表是荷兰Kipp&Zonen公司的一种用于测量短波辐射的产品&#xff0c;配有只能型接口&#xff0c;能够提供标准输出&#xff0c;能耗低。 作为一款副基准总辐射表,SMP10结合了CMP 11的传感器技术、SMP 11的智…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java基于自组网的空地一体化信息系统mf392

面对老师五花八门的设计要求&#xff0c;首先自己要明确好自己的题目方向&#xff0c;并且与老师多多沟通&#xff0c;用什么编程语言&#xff0c;使用到什么数据库&#xff0c;确定好了&#xff0c;在开始着手毕业设计。 1&#xff1a;选择课题的第一选择就是尽量选择指导老师…

[附源码]计算机毕业设计JAVA疫情期间回乡人员管理系统

[附源码]计算机毕业设计JAVA疫情期间回乡人员管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM…

基于Java的课程管理系统

摘 要 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括课程管理系统的网络应用&#xff0c;在外国课程管理已经是很普遍的方式&#xff0c;不过国内的课程管理可能还处于起步阶段。课程管理系统具有下载课件功能。课程管理系…