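1. Download and install
Official download page: Apache Kafka
Download the release archive that matches your environment, upload it to the server, and extract it: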
tar -xzf kafka_2.13-3.7.0.tgz
The extracted directory structure looks like this:
├── bin
│ └── windows
├── config
│ └── kraft
├── libs
├── licenses
└── site-docs
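Official documentation: Apache Kafka
Kafka can be started in one of two modes, ZooKeeper or KRaft. This guide uses KRaft, with the configuration files under the config/kraft directory: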
config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│ ├── broker.properties
│ ├── controller.properties
│ └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
└── zookeeper.properties
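Edit config/kraft/server.properties. The key settings are:
process.roles: the role of this node in KRaft mode, one of:
- broker
- controller
- broker,controller
node.id: the node ID; every node in the cluster needs its own unique ID
controller.quorum.voters: the controller quorum voters, written as a comma-separated list of id@host:port entries
log.dirs: the directories where Kafka stores its log data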
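The controller.quorum.voters value is easy to mistype when a cluster has several controllers. As a minimal sketch, the helper below parses a value such as 1@localhost:9093 into node IDs and addresses so that it can be checked before Kafka is started. The class name QuorumVoters and its parse method are hypothetical names used for illustration; they are not part of Kafka.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka): validates a controller.quorum.voters value.
final class QuorumVoters {

    // Parses "id@host:port,id@host:port" into node id -> address, keeping the listed order.
    static Map<Integer, InetSocketAddress> parse(String voters) {
        Map<Integer, InetSocketAddress> result = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String trimmed = entry.trim();
            int at = trimmed.indexOf('@');
            int colon = trimmed.lastIndexOf(':');
            // Require a non-empty id, host, and port.
            if (at <= 0 || colon <= at + 1 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + trimmed);
            }
            int nodeId = Integer.parseInt(trimmed.substring(0, at));
            String host = trimmed.substring(at + 1, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; only the syntax is checked here.
            if (result.put(nodeId, InetSocketAddress.createUnresolved(host, port)) != null) {
                throw new IllegalArgumentException("Duplicate node id: " + nodeId);
            }
        }
        return result;
    }
}
```

For example, QuorumVoters.parse("1@localhost:9093") returns a single entry for node 1, and a duplicated node ID or a missing port raises an IllegalArgumentException.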
To initialize the cluster, first generate a UUID:
./bin/kafka-storage.sh random-uuid
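Then format the storage directories with the generated UUID to complete the cluster initialization (replace the UUID below with the one you generated):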
./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties
Then start Kafka:
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
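The cluster ID passed to the format command above is a 22-character URL-safe Base64 string, which is the length you get when the 16 bytes of a UUID are encoded without padding. The sketch below produces an ID of the same shape. It is an illustration only and is not claimed to be Kafka's own implementation; the class name ClusterIdSketch is hypothetical, and for a real cluster kafka-storage.sh random-uuid remains the tool to use.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical illustration: shows the shape of a cluster ID, not Kafka's actual code.
final class ClusterIdSketch {

    // Encodes the 16 bytes of a UUID as URL-safe Base64 without padding (22 characters).
    static String encode(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // Generates a random ID with the same format.
    static String randomId() {
        return encode(UUID.randomUUID());
    }
}
```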
2. Spring Boot integration
2.1 Add dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.1.4</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.0</version>
    </dependency>
</dependencies>
2.2 Configuration file
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: fable-group
2.3 Producer
Use KafkaTemplate to produce messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public ProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/send/message")
    public String sendMessage() {
        // send() is asynchronous: it returns before the broker has acknowledged the record.
        kafkaTemplate.send("test", "Hello, Kafka!");
        return "Message sent successfully";
    }
}
2.4 Consumer
Add the @KafkaListener annotation to listen for messages:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "test", groupId = "fable-group")
    public void listen(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Received message: " + consumerRecord.value());
    }
}
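2.5 Application class
Add the @EnableKafka annotation. Spring Boot's Kafka auto-configuration normally enables listener support already, so the annotation is optional here and is shown for explicitness: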
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class FableApplication {

    public static void main(String[] args) {
        SpringApplication.run(FableApplication.class, args);
    }
}
2.6 Test
Call the /send/message endpoint; the application console then prints the received message:
Received message: Hello, Kafka!
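The endpoint can be called from a browser, or from code. As a rough sketch, the client below uses the JDK's built-in HTTP client (Java 11 or later) to send the GET request. It assumes the application runs locally on Spring Boot's default port 8080, which is not stated above, and the class name SendMessageClient is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client for the /send/message endpoint.
final class SendMessageClient {

    // Builds the GET request for the endpoint under the given base URL.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/send/message"))
                .GET()
                .build();
    }

    // Sends the request and returns "<status code> <response body>".
    static String call(String baseUrl) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

Calling SendMessageClient.call("http://localhost:8080") while the application and Kafka are both running should return the controller's response text, and the consumer's log line appears in the application console rather than in the client.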