📑前言
本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️
上文链接:SpringBoot整合Kafka (一)
🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点
目录
文章目录
- 📑前言
- **目录**
- 一、介绍
- 二、主要功能
- 三、Kafka基本概念
- 四、Spring Boot整合Kafka的demo
- 1、构建项目
- 1.1、引入依赖
- 1.2、YML配置
- 1.3、生产者简单生产
- 1.4、消费者简单消费
- 2、消费者
- 2.1、Kafka应答机制
- ACK应答级别
- 2.2、Kafka消息消费确认机制
- 自动提交
- 手动提交
- 2.3、指定消费
- 监听一个主题,指定分区消费消息
- 📑文章末尾
一、介绍
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目
二、主要功能
1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。
三、Kafka基本概念
kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的
四、Spring Boot整合Kafka的demo
1、构建项目
1.1、引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1.2、YML配置
spring:
kafka:
bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
producer: # 消息提供者key和value序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3 # 生产者发送失败时,重试发送的次数
consumer: # 消费端反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");
System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {
@KafkaListener(topics = "test01-topic")
public void readMsg(String msg){
System.out.println("msg = " + msg);
}
}
2、消费者
2.1、Kafka应答机制
在生产者(producer)往Kafka发送数据的进程中,为了确保数据能够发送到指定的topic中,topic中的每一个partition在收到数据后,都需要向生产者发送 ack(ackacknowledgement)。
假设 producer 在必定的时间内收不到应对,那么producer会再次向Kafka发送此条数据。这就类似于写信,假定我们写一封信给或人,然后我们会在一段时间后收到一封回信,但假设超过了一个月我们还没有收到回信,就会猜想是不是信件丢掉了,会将这封信进行从头发送,直到收到回信中止。
ACK应答级别
一、0
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了
二、1
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了
三、-1(all)
介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
数据可靠性分析:可靠
spring:
kafka:
bootstrap-servers: 192.168.***.***:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
producer: # 消息提供者key和value序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3 # 生产者发送失败时,重试发送的次数
properties:
linger.ms: 0 # spring.kafka.producer.properties.linger.ms=0,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
acks: 1 # 修改ACK应答级别,默认是1
2.2、Kafka消息消费确认机制
Kafka消费消息确认机制分为两种:自动确认和手动确认。
(1)自动确认:在自动确认模式下,Kafka消费者消费一条消息后,会自动将消息偏移量提交到服务器端,不需要手动进行确认,从而确保消息被有效处理。此种确认机制的优点是操作简单,但是可能会导致消息重复消费,即当消费者处理消息的过程中出现异常,导致偏移量提交失败,下一次启动时就会重新消费之前已经处理过的消息。
(2)手动确认:在手动确认模式下,消费者需要显式地调用commit()方法,将消息的偏移量提交到服务器端,才会被标记为已处理。手动确认模式下,可以避免重复消费的问题,但是需要开发者自己实现确认逻辑,增加了一定的开发复杂度。
总的来说,自动确认适用于对消息的可靠性要求不高、实时性较高的场景;手动确认适用于对消息的可靠性要求较高、不要求实时性的场景
自动提交
这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。
server:
port: 18082
spring:
kafka:
bootstrap-servers: 192.168.***.***:9092,192.168.***.***:9093,192.168.***.***:9094
consumer: # consumer消费者
group-id: consumergroup # 默认的消费组ID
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 120000
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
设置enable-auto-commit: true,开启自动提交,也就是偏移量不需要我们手动提交,程序会自己提交。
设置auto.commit.interval.ms=120000,也就是消费后,不会立即提交,会在2分钟后提交,只要在这期间服务异常终止,偏移量就无法提交到Broker,再次启动,会重复消费。
手动提交
手动提交模式可以有效确保消息不丢失以及不重复消费
MANUAL:poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。
我们可以先测试一下MANUAL模式,只需要需改配置application.yml即可:
spring:
kafka:
bootstrap-servers: 192.168.***.***:9092,192.***.***.130:9093,192.***.***.130:9094
consumer: # consumer消费者
group-id: consumergroup # 默认的消费组ID
enable-auto-commit: false # 是否自动提交offset
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual # 手动添加偏移量
消费者代码
@KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup")
public void listener(ConsumerRecord<String,String> record, Acknowledgment ack){
//获取消息
String message = record.value();
//消息偏移量
long offset = record.offset();
System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);
//手动提交偏移量
ack.acknowledge();
}
2.3、指定消费
属性解释:
id:消费者ID
groupId:消费组ID
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。
监听一个主题,指定分区消费消息
/**
* 监听一个主题,且指定消费主题的哪些分区。
* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
* @param record
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
},
concurrency = "2"
)
public void consumeByPattern(ConsumerRecord<String, String> record) {
System.out.println("consumeByPattern");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。