发送原理
在消息发送过程中,涉及两个线程,main线程和Sender线程。在main线程中创建了一个双端队列,RecordAccumulator,Sender过程不断从RecordAccumulator中拉取消息发送到Kafka Broker
batch size:只有数据累计到batch.size之后,sender才会发送数据,默认16k。
linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认是0ms,表示没有延迟
应答
应答acks
0:生产者发送过来数据,不需要等数据落盘应答
1:生产者发送过来的数据,leader收到数据后应答
-1:(all):生产者发送过来的数据,leader和ISR队列的所有节点收齐数据后应答。
生产者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需要的broker清单,例如hadoop100:9002,hadoop101:9002,hadoop:102: 9002,可以设置1个或者多个,中间用逗号隔开,注意这里并不是需要所有broker的地址,因为生产者从给定的broker里查找到其他broker信息 |
key.serializer 和 value.serializer | 指定发送消息的key和value的序列化类型,一定要写全类名 |
key.serializer 和 value.serializer | 指定发送消息的key和value的序列化类型,一定要写全类名 |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m |
batch.size | 缓冲区一批数据最大值,默认16K。适当增加该值,可以提高吞吐量,如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值时是0ms,表示没有延迟,生产环境建议该值大小为5-100ms |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,leader收到数据后应答。-1:生产者发送过来的数据,leader+和isr队列里面所有的节点收起数据后应答,默认值是-1,-1和all等价的 |
retries | 当消息发送出现错误的时候,系统会重发消息,retries表示重试次数。默认是int最大值,如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败小 |
retry.backoff.ms | 两次重试之间的间隔,默认是100ms |
enable.idempotence | 是否开启幂等性,默认true,开启幂等性 |
compression.type | 生产者发送的所有数据的压缩方式,默认是none,就是不压身,支持压缩类型:none,gzip,snappy,lz4和zstd |