SpringBoot 集成 Kafka
- 1 安装 Kafka
- 2 创建 Topic
- 3 Java 创建 Topic
- 4 SpringBoot 项目
- 4.1 pom.xml
- 4.2 application.yml
- 4.3 KafkaApplication.java
- 4.4 CustomizePartitioner.java
- 4.5 KafkaInitialConfig.java
- 4.6 SendMessageController.java
- 5 测试
1 安装 Kafka
Docker 安装 Kafka
2 创建 Topic
通过命令行创建两个 topic:topic1、topic2,分区数和副本数均设置为 1(也可以在 Java 代码中创建,见第 3 节)。
PS C:\Users\Administrator> docker exec -it kafka /bin/sh
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.
3 Java 创建 Topic
package com.xu.mq.demo.test.service;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.admin.NewTopic;
/**
 * Kafka initialization configuration that declares the application's topics
 * as {@link NewTopic} beans.
 *
 * <p>Every topic declared here uses one partition and a replication factor of one.
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    /** Name of the audio-upload topic. */
    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    /** Name of the text-upload topic. */
    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    /** Partition count shared by every topic declared in this class. */
    private static final int PARTITION_COUNT = 1;

    /** Replication factor shared by every topic declared in this class. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Declares the topic named {@link #AUDIO_UPLOAD_TOPIC}.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic audioUploadTopic() {
        return describe(AUDIO_UPLOAD_TOPIC);
    }

    /**
     * Declares the topic named {@link #TEXT_UPLOAD_TOPIC}.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic textUploadTopic() {
        return describe(TEXT_UPLOAD_TOPIC);
    }

    /** Builds a topic definition with the shared partition and replica settings. */
    private static NewTopic describe(String topicName) {
        return new NewTopic(topicName, PARTITION_COUNT, REPLICATION_FACTOR);
    }
}
4 SpringBoot 项目
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xu</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.12</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 application.yml
server:
port: 8001
spring:
application:
name: hello-kafka
kafka:
# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)
bootstrap-servers: 192.168.1.92:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: all
consumer:
# 自动提交的时间间隔。在 Spring Boot 2.x 中该值的类型为 Duration,需要符合特定的格式,如 1S、1M、2H、5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: true
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 4
4.3 KafkaApplication.java
package com.xu.mq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
/**
 * Spring Boot entry point for the Kafka demo application.
 *
 * @author Administrator
 */
@EnableKafka
@SpringBootApplication
public class KafkaApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        // The primary source must be this class, the one that carries
        // @SpringBootApplication; no DemoApplication class exists in this project.
        SpringApplication.run(KafkaApplication.class, args);
    }
}
4.4 CustomizePartitioner.java
package com.xu.kafka.config;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
 * Custom {@link Partitioner} whose rule ignores its inputs and always selects
 * partition index 0.
 *
 * @author Administrator
 */
public class CustomizePartitioner implements Partitioner {

    /** The partition index returned for every record. */
    private static final int FIXED_PARTITION = 0;

    /**
     * Accepts the configuration map; nothing is read from it.
     *
     * @param configs configuration key/value pairs
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty: this partitioner has no settings.
    }

    /**
     * Custom partitioning rule.
     *
     * @param topic      the topic name
     * @param key        the key to partition on (or null if no key)
     * @param keyBytes   the serialized key to partition on (or null if no key)
     * @param value      the value to partition on, or null
     * @param valueBytes the serialized value to partition on, or null
     * @param cluster    the current cluster metadata
     * @return always {@code 0}, regardless of the arguments
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        return FIXED_PARTITION;
    }

    /** Does nothing. */
    @Override
    public void close() {
        // Intentionally empty.
    }
}
4.5 KafkaInitialConfig.java
package com.xu.kafka.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.admin.NewTopic;
/**
 * Kafka initialization configuration: declares the topics used by the
 * application as {@link NewTopic} beans.
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    /** Name of the audio-upload topic. */
    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    /** Name of the text-upload topic. */
    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    /**
     * Declares {@link #AUDIO_UPLOAD_TOPIC} with one partition and a
     * replication factor of one.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic audioUploadTopic() {
        final int partitions = 1;
        final short replicationFactor = 1;
        return new NewTopic(AUDIO_UPLOAD_TOPIC, partitions, replicationFactor);
    }

    /**
     * Declares {@link #TEXT_UPLOAD_TOPIC} with one partition and a
     * replication factor of one.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic textUploadTopic() {
        final int partitions = 1;
        final short replicationFactor = 1;
        return new NewTopic(TEXT_UPLOAD_TOPIC, partitions, replicationFactor);
    }
}
4.6 SendMessageController.java
package com.xu.kafka.message.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.xu.kafka.config.KafkaInitialConfig;
import cn.hutool.json.JSONUtil;
/**
 * REST controller mapped under {@code /kafka} that publishes the message taken
 * from the request path to {@link KafkaInitialConfig#AUDIO_UPLOAD_TOPIC}.
 *
 * @author Administrator
 */
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {

    /**
     * Template used to publish messages. It is parameterized as
     * {@code <String, String>} rather than left as a raw type so that
     * {@code send} calls and their callbacks are type-checked; this matches the
     * String key/value serializers configured for the producer.
     */
    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Sends a message with KafkaTemplate without inspecting the send result.
     *
     * @param message the text to publish, taken from the URL path
     */
    @GetMapping("/test1/{message}")
    public void test1(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);
    }

    /**
     * Sends a message with KafkaTemplate and registers callbacks that print
     * the outcome to standard output.
     *
     * @param message the text to publish, taken from the URL path
     */
    @GetMapping("/test2/{message}")
    public void test2(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(
                // Invoked with the send result when the send succeeds.
                success -> System.out.println("发送成功\t" + success),
                // Invoked with the failure cause when the send fails.
                fail -> System.out.println("发送失败\t" + fail));
    }
}
5 测试
发送成功 SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]