文章目录
更多相关内容可查看
Kafka介绍
Apache Kafka是一个开源的分布式事件流平台,由LinkedIn公司开发并于2011年贡献给Apache软件基金会。Kafka设计用于处理大规模实时数据,它能够处理每秒数百万条消息,因此被广泛应用于大数据和实时分析领域。
Kafka的主要特点包括:
-
高吞吐量:Kafka能够处理每秒数百万条消息,满足大规模数据处理的需求。
-
分布式:Kafka通过分布式系统设计,提供数据冗余和容错能力。
-
实时性:Kafka能够实时处理数据,适合需要快速响应的场景。
-
持久性:Kafka将数据存储在磁盘上,即使系统崩溃,数据也不会丢失。
-
可扩展性:Kafka可以通过添加更多的服务器来扩展处理能力。
Kafka使用场景
自媒体用户发布文章成功之后需要进行文章的审核 , 审核通过之后才会发布到APP端供用户查看 , 审核功能因为耗时较久 , 长时间阻塞会影响用户体验 , 而且长时间阻塞会严重影响系统的吞吐量
所以为了实现功能之间的解耦 , 提升用户体验 , 我们可以抽取一个独立的审核服务 , 文章发布成功之后自媒体服务
通过MQ
通知审核服务进行文章审核 , 如下图所示 :
为什么要选择使用Kafka作为消息中间件
- 因为我们后期会使用MQ进行行为数据采集 , 对于消息的吞吐量要求更高
- 因为后期会进行文章的实时推荐 , 会使用到一些实时流计算技术 , Kafka提供这么一个技术 Kafka Stream , 开发成本和运维成本会更低一些
kafka概述和安装
kafka概述
消息中间件对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala(JVM) |
单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 |
时效性 | ms | us | ms | ms级以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
功能特性 | 成熟的产品、较全的文档、各种协议支持好 | 并发能力强、性能好、延迟低 | MQ功能比较完善,扩展性佳 | 只支持主要的MQ功能,主要应用于大数据领域 |
消息中间件对比-选择建议
消息中间件 | 建议 |
---|---|
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
kafka官网:http://kafka.apache.org/
kafka介绍-名词解释
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
kafka安装配置
可查看文章【Linux】Docker安装kafka教程(保姆篇)
kafka快速入门
- 生产者发送消息,多个消费者只能有一个消费者接收到消息
- 生产者发送消息,多个消费者都可以接收到消息
创建项目
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
发送消息
启动引导类
@SpringBootApplication
public class KafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
}
在kafka-producer项目中编写生产者代码发送消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置消息主题
@Configuration
public class KafkaConfig {
@Bean
public NewTopic newTopic(){
return TopicBuilder.name("topic.my-topic1").build();
}
}
发送消息到Kafka
package com.heima.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testSend() throws ExecutionException, InterruptedException {
kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka !");
}
}
接收消息
启动引导类
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
}
在kafka-consumer项目中编写消费者代码接收消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
application:
name: kafka-consumer
kafka:
bootstrap-servers: 118.25.197.221:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建监听器类, 监听kafka消息
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+"));
}
}
kafka生产者详解
发送类型
同步发送: 使用send()
方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功,因为如果发送操作失败,kafkaTemplate.send().get()
会抛出异常,而不会返回SendResult
。如果SendResult被成功返回,那么就意味着消息已经被成功发送到Kafka。
消息偏移量:是Kafka用于标识消息在主题中的位置的一个数字。每个新的消息都会被赋予一个比前一个消息大的偏移量。
@Test
public void testSend() throws ExecutionException, InterruptedException {
同步发送
SendResult result = (SendResult) kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka").get();
//打印发送结果中的消息偏移量。
System.out.println(result.getRecordMetadata().offset());
}
异步发送 :调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
@Test
public void testSend() throws ExecutionException, InterruptedException {
//异步发送
ListenableFuture future = kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka");
future.addCallback(result -> {
//消息发送成功执行
SendResult sendResult = (SendResult) result;
System.out.println(sendResult.getRecordMetadata().offset());
}, throwable -> {
//消息发送失败执行
System.out.println("发送消息出现异常:" + throwable);
});
Thread.sleep(1000);
}
参数详解
签收机制 : acks
代码的配置方式:
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10 # 重试次数
compression-type: gzip # 消息压缩算法
batch-size: 16KB #批量提交的数据大小
acks: all # 消息确认机制 0: 不签收 , 1 : leader签收 , all : leader和follower都签收
参数的选择说明
确认机制 | 说明 |
---|---|
acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
acks=all | 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
追求极致的吞吐量和性能使用 acks=0
追求是数据安全, 消息发送不丢失 , acks=all
既要吞吐量也要可靠性 : acks=1 (折中方案)
重试机制 : retries
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10 # 重试次数
为了提高消息投递的成功率, 可以将重试次数设为一个很大的值 , 例如 : 999999999999999
kafka消费者详解
消息有序性
所谓消息有序性就是保证Kafka消息消费的顺序和发送的顺序保持一致 ,
应用场景 : 比如客户开车出事故了需要保险公司来处理,至少要有以下几个步骤: 报案、查勘定损、立案、收单理算支付、结案等环节,这些环节是严格有序的。保险公司每完成一个环节,需要给中保信(监管保险公司的)推送数据,如果推送顺序有问题,会返回错误,比如上一个环节还没有完成。同样电商行业也是如此,下单、支付、发货都是有序的。
Kafka消息有序性
我们知道Kafka中的每个分区中的数据是有序的,但有序性仅限于当前的分区中。比如我们现在往一个topic
中发送消息 , 这个topic有两个分区 , 默认采用轮询策略
, 那么这个topic分区0中插入数据 1,3,5,然后在分区1中插入数据2,4,6 , 这时如果消费者想要读取这个topic的数据,他就可能随机从分区0和分区1中读取数据,比如读出结果为1,3,2,5,4,6。这时可以看到读到的数据顺序已经不是插入的顺序了。
方法一 : 一个 Topic 只对应一个 Partion
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。每次添加消息到 Partition(分区) 的时候都会采用尾加法,如下图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
所以,我们就有一种很简单的保证消息消费顺序的方法:一个 Topic 只对应一个 Partion , 这种方式影响Kafka效率
方法二 : 按键路由
Kafka 中发送 1 条消息的时候,可以指定 topic
, partition
, key
,data
(数据) 4 个参数。如果你发送消息的时候指定了 partion 的话,所有消息都会被发送到指定的 partion。并且,同一个 key 的消息可以保证只发送到同一个 partition ! 这样我们就可以为需要保证顺序的消息设置同一个Key , 这样就能保证这组消息都发送到同一个分区中 , 从而保证消息顺序性
@Test
public void sendTopic3() {
kafkaTemplate.send("test.topic03", "order_1001", "kafka!");
}
提交和偏移量
Kafka会记录每条消息的offset(偏移量) , 消费者可以使用offset来追踪消息在分区的位置 , 所以在Kafka中消息消费采用的是pull模型, 由消费者主动去Kafka Brocker中拉取消息
之前说过Kafka的消费者再均衡机制 : 如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡 , 例如:
如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
再均衡后不可避免会出现一些问题(消息丢失&消息重复消费)
问题一:消息重复消费
如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
问题二:消息丢失
如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
如果想要解决这些问题,还要知道目前kafka提交偏移量的方式 , 提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
- 自动提交偏移量 :当
enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去 - 手动提交偏移量 : 当
enable.auto.commit
被设置为false , 需要程序员手动提交偏移量
手动提交偏移量 : 同步提交
把enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//同步提交偏移量
consumer.commitSync();
}
手动提交偏移量 : 异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//异步提交偏移量
consumer.commitAsync();
}
手动提交偏移量 : 同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//同步异步, 结合提交
try {
consumer.commitAsync();
} catch (Exception e) {
e.printStackTrace();
consumer.commitSync();
}
}