生产者工作流程
-
首先生产者调用send方法发送消息后,会先经过拦截器,接着进入序列化器。序列化器主要用于对消息的Key和Value进行序列化。接着进入分区器选择消息的分区。
-
上面这几步完成之后,消息会进入到一个名为RecordAccumulator的缓冲队列,这个队列默认32M。当满足以下两个条件的任意一个之,消息由sender线程发送。
条件一:消息累计达到batch.size,默认是16kb。
条件二:等待时间达到linger.ms,默认是0毫秒。
所以在默认情况下,由于等待时间是0毫秒,所以只要消息来一条就会发送一条。 -
Sender线程首先会通过sender读取数据,并创建发送的请求,针对Kafka集群里的每一个Broker,都会有一个InFlightRequests请求队列存放在NetWorkClient中,默认每个InFlightRequests请求队列中缓存5个请求。接着这些请求就会通过Selector发送到Kafka集群中。
-
当请求发送到Kafka集群后,Kafka集群会返回对应的acks信息。生产者可以根据具体的情况选择处理acks信息。
0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。 -
发送成功的话,Deque会将NetworkClient中的缓存删除,然后将这个batch从Deque中删除。发送失败的话,sender会对这个发送请求进行重试(默认重试次数为int最大值,可以自定义)
Kafka生产者
操作kafka导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
异步发送
public static void main(String[] args) {
// 创建Kafka生产者配置对象
Properties props = new Properties();
// 给kafka配置对象添加配置信息
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者 指定配置
Producer<String, String> producer = new KafkaProducer<>(props);
// 循环发送多条消息
for (int i = 0; i < 10; i++) {
// 创建生产者发送的消息和topic
ProducerRecord<String, String> record = new ProducerRecord<>("Hello-Kafka", "hello" + i,
"world" + i);
// 发送消息
producer.send(record);
}
// 关闭生产者
producer.close();
}
这里并没有使用Boot的自动装配,而是使用,通过API来进行操作Kafka的生产者
里面对于配置信息部分,都是可以方式Spring的YAML中的
如何查看消息发送的分区等信息,因为我们在发送完消息,并不知道消息的位置在哪里
在send()方法中进行 Callback中进行获取回调信息
/* 发送消息*/
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println("topic:"+metadata.topic());
System.out.println("partition:"+metadata.partition());
}else{
e.printStackTrace();
}
}
});
同步发送
for (int i = 0; i < 10; i++) {
// 创建生产者发送的消息和topic
ProducerRecord<String, String> record = new ProducerRecord<>("Hello-Kafka", "hello" + i,
"world" + i);
// 同步发送消息 需要处理check异常
try {
// 返回的对象就是 回调函数返回的对象获取回调信息
RecordMetadata metadata = producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 关闭生产者
producer.close();
分区操作
为什么使用分区
消息对象指定分区
在创建消息的时候可以去指定分区
代码展示
// 创建生产者消息
// 指定发送到 1号 分区
ProducerRecord<String, String> record =
new ProducerRecord<>("Hello-Kafka", 1 ,"", "world");
// 创建生产者消息
// 指定发送到 同一个key下。kafka会对Key进行操作来通过key来指定一个分区
// 所以我们可以使用key来进行将 同一组的消息放在一个分区下
ProducerRecord<String, String> record1 =
new ProducerRecord<>("Hello-Kafka" ,"type", "world");
// 创建生产者消息
// 不指定分区和key 采取默认规则进行 多个分区选择传入
ProducerRecord<String, String> record2 =
new ProducerRecord<>("Hello-Kafka", "world");
生产者提高吞吐量
当我们发送的消息过多的时候,我们可以通过调节kafk的配置来提高我们想kafka传入的数据
buffer.memory
设置发送消息的缓冲区,默认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
compression.type
默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。
batch.size
设置merge batch的大小,如果 batch 太小,会导致频繁网络请求,吞吐量下降;
如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下。
linger.ms
这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。
一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。
但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
这些可以通过Boot的yaml配置文件。或者通过编写的指定消费者配置信息时候指定
spring.kafka.proucer.buffer.memory
spring.kafka.proucer.batch.size
spring.kafka.proucer.linger.ms
spring.kafka.proucer.compression.type
数据丢失问题
ACK应答级别
spring.kafka.producer.ack =
## 0
## 1
## -1/all
0的时候,数据发过来还没落盘就应答,结果leader挂了导致了数据丢失。
1的时候,数据发送过来,leader落盘后就会应答,生产者收到ack应答认为信息已经发送成功,随后就会清除掉队列中的消息,但是此时follwer可能还没完成同步,这个时候leader挂掉,就会有一个follwer成为新的leader,可是生产者已经认为信息发送成功从队列中清除了消息,这就导致了数据的丢失。
-1(all):leader收到消息,并且所有follwer都完成消息同步后返回ack应答
最后一种 应答模式看似安全其实也是存在问题
可靠性总结:
- acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,
- acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
SpringBoot整合Kafka
上面操作通过Kafka整合的java的api同样可以在SpringBoot中使用
但是SpringBoot自动装配之后进行了封装 KafkaTemplate<> 使用
前面的配置信息很多可以通过YAML进行添加配置信息
kafka配置类
// 配置类
@Configuration
public class KafkaConfig {
/**
* 使用Spring配置类注解 将yaml中的信息引入到SpringBoot的kafka配置信息类中
*/
@Bean
@ConfigurationProperties(prefix = "spring.kafka")
@Primary
public KafkaProperties KafkaListenerKafkaProperties(){
return new KafkaProperties();
}
/**
* 创建kafka的工厂类
*/
@Bean
@Primary
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
//kafka 集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//应答级别
//acks=0 把消息发送到kafka就认为发送成功
//acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
//acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG, "all");
//Key 序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//Value 序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//消息压缩:none、lz4、gzip、snappy,默认为 none。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return new DefaultKafkaProducerFactory<>(props);
}
/**
* kafkaTemplate 通过SpringBoot中的 kafka生产者工厂 和 kafka配置项
* 来创建 生产者的
*/
@Bean("kafkaTemplate")
@Primary
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(KafkaListenerKafkaProperties().buildAdminProperties()));
}
}
SpringBoot的Kafka生产者
/**
* kafka 生产服务
*
* @author Leo
* @create 2020/12/31 16:06
**/
@Slf4j
@Service
public class KafkaProducerService {
@Qualifier("kafkaTemplate")
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Qualifier("kafkaTemplateWithTransaction")
@Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
/**
* 发送消息(同步)
*
* @param topic 主题
* @param key 键
* @param message 值
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
//可以指定最长等待时间,也可以不指定
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
//指定key,kafka根据key进行hash,决定存入哪个partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
//存入指定partition
// kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
}
/**
* 发送消息并获取结果
*
* @param topic
* @param message
* @throws ExecutionException
* @throws InterruptedException
*/
public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
}
/**
* 发送消息(异步)
*
* @param topic 主题
* @param message 消息内容
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
//添加回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
}
});
}
/**
* 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
*
* @param topic
* @param key
* @param message
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// 组装消息
Message msg = MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX, "kafka_")
.build();
//同步发送
kafkaTemplate.send(msg).get();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
}
afkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX, “kafka_”)
.build();
//同步发送
kafkaTemplate.send(msg).get();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
}