龙蜥 8.6 安装 Kafka 3.3.1 并通过 SpringBoot 3.0.1 版本调试
- 一.Kafka 安装
- 1.下载编译后的文件
- 2.拷贝到 Anolis 并解压
- 3.启动服务
- 3.常用命令
- 1.Topic 增查删
- 2.生产消费测试
- 二.SpringBoot 连接 Kafka
- 1.项目结构、依赖、配置文件和启动类
- 2.生产者和生产监听
- 3.消费者和消费监听
- 4.自定义消费注解
- 三.测试和验证
- 1.发送正常数据
- 2.发送错误数据
一.Kafka 安装
1.下载编译后的文件
下载地址
下载编译后的包
2.拷贝到 Anolis 并解压
Kafka 依赖的 Jdk 环境自行配置
采用 Raft 配置启动 ,修改配置,自定义路径:vim config/kraft/server.properties
3.启动服务
生成UUID:./bin/kafka-storage.sh random-uuid
格式化数据目录:./bin/kafka-storage.sh format -t 3Nke4nZhRueBQrNeJVMrgw -c ./config/kraft/server.properties
启动服务:./bin/kafka-server-start.sh ./config/kraft/server.properties(末尾加上 & 即以进程形式启动)
进程形式启动(测试用进程日志输出到空):./bin/kafka-server-start.sh ./config/kraft/server.properties >/dev/null &
脚本语句
#!/bin/bash
KAFKA_HOME=/home/kafka_2.13-3.3.1
## 1.生成UUID
UUID=`${KAFKA_HOME}/bin/kafka-storage.sh random-uuid`
## 2.格式化
${KAFKA_HOME}/bin/kafka-storage.sh format -t ${UUID} -c ${KAFKA_HOME}/config/kraft/server.properties
Kafka 启动/停止脚本,创建脚本文件,贴入以下代码:vim /usr/local/bin/kafkaserver.sh
添加可执行权限:chmod a+x /usr/local/bin/kafkaserver.sh
#!/bin/bash
KAFKA_HOME=/home/kafka_2.13-3.3.1
case $1 in
"start")
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/kraft/server.properties >/dev/null &
;;
"stop")
${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null
;;
*)
echo "Please input start|stop"
;;
esac
sleep 1
pid=`jps | grep Kafka | grep -v grep`
if [ -n "$pid" ]
then
echo -e "\033[32m Kafka is running \033[0m"
else
echo -e "\033[31m Kafka is stopped \033[0m"
fi
3.常用命令
1.Topic 增查删
## 1.创建Topic
./bin/kafka-topics.sh --create --topic kafka-test-topic --bootstrap-server 127.0.0.1:9092
## 2.查看
./bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
## 3.删除
./bin/kafka-topics.sh --delete --topic kafka-test-topic-ver --bootstrap-server 127.0.0.1:9092
2.生产消费测试
## 1.生产压测
./bin/kafka-producer-perf-test.sh --topic kafka-test-topic --throughput 1 --num-records 1 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092
## 2.查看消息
./bin/kafka-topics.sh --describe --topic kafka-test-topic --bootstrap-server 127.0.0.1:9092
## 3.消费消息
./bin/kafka-console-consumer.sh --topic kafka-test-topic --from-beginning --bootstrap-server 127.0.0.1:9092
二.SpringBoot 连接 Kafka
1.项目结构、依赖、配置文件和启动类
项目结构
maven 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.demo</groupId>
<artifactId>Kafka3</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>19</maven.compiler.source>
<maven.compiler.target>19</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.spring>3.0.1</version.spring>
<version.fastjson>2.0.22</version.fastjson>
<version.lombok>1.18.24</version.lombok>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${version.spring}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>${version.spring}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<version>${version.spring}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${version.spring}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${version.fastjson}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${version.lombok}</version>
</dependency>
</dependencies>
</project>
配置文件
server:
port: 8080
spring:
kafka:
consumer:
test-topic: kafka-test-topic
启动类
package com.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
/**
* @author Administrator
* @Description
* @create 2023-01-11 22:02
*/
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class,args);
}
}
2.生产者和生产监听
当前配置是写在代码里的,大家可以改到 YML 配置
package com.demo.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author Administrator
* @Description
* @create 2023-01-11 22:07
*/
@Component
@EnableKafka
public class ProducerInit {
@Autowired
private MyProducerListener myProducerListener;
@Primary
@Bean("kafkaTemplate")
public KafkaTemplate<String,String> kafkaTemplate(){
KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setProducerListener(myProducerListener);
return kafkaTemplate;
}
/**
* 获取工厂
* @return
*/
public ProducerFactory<String,String> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
/**
* 获取配置信息
* @return
*/
private Map<String,Object> producerConfig(){
Map<String,Object> map = new HashMap<>(13);
map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.4:9092");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
map.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,1000 * 10);
map.put(ProducerConfig.BATCH_SIZE_CONFIG,2048 * 20);
map.put(ProducerConfig.LINGER_MS_CONFIG,1000 * 30);
map.put(ProducerConfig.BUFFER_MEMORY_CONFIG,1024*1000*1000);
map.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,10240*1000*1000);
map.put(ProducerConfig.METADATA_MAX_AGE_CONFIG,300000);
map.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
map.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,2);
map.put(ProducerConfig.ACKS_CONFIG,"all");
map.put(ProducerConfig.RETRIES_CONFIG,"3");
map.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 3);
return map;
}
}
监听器
package com.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @Description
* @create 2023-01-11 22:24
*/
@Slf4j
@Component
public class MyProducerListener implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("T:{} P:{} K:{} O:{}",producerRecord.topic(),recordMetadata.partition(),producerRecord.key(),recordMetadata.offset());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Error T:{} P:{} K:{} O:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.key(),recordMetadata.offset());
log.error("Error {}",exception.getMessage());
}
}
发送消息测试类
package com.demo.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author Administrator
* @Description
* @create 2023-01-11 22:02
*/
@Slf4j
@RestController
@RequestMapping("/send")
public class ProducerController {
@Autowired
KafkaTemplate kafkaTemplate;
private String topic = "kafka-test-topic";
/**
* 发送消息
* @param content
*/
@GetMapping("/test/{content}")
public void sendData(@PathVariable("content") String content){
String key = String.valueOf(UUID.randomUUID());
log.info("Wait For Send Info:{}",content);
kafkaTemplate.send(topic,key,content);
}
}
3.消费者和消费监听
消费者配置类
package com.demo.config.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author Administrator
* @Description
* @create 2023-01-15 20:31
*/
@Component
public class ConsumerConfigInit {
@Bean("myContainerFactory")
@Primary
public ConcurrentKafkaListenerContainerFactory<String,String> containerFactory(){
return containerFactory(consumerFactory(),0);
}
/**
* 消费者工厂
* @return
*/
private ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> map = new HashMap<>(7);
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.4:9092");
map.put(ConsumerConfig.GROUP_ID_CONFIG,"my-test");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,100);
return new DefaultKafkaConsumerFactory<>(map);
}
/**
* 监听器工厂
* @param consumerFactory
* @param concurrency
* @return
*/
private ConcurrentKafkaListenerContainerFactory<String,String> containerFactory(ConsumerFactory<String,String> consumerFactory,int concurrency){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
if (concurrency > 0){
factory.setConcurrency(concurrency);
}
return factory;
}
}
消费者错误监听器
package com.demo.config.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @Description
* @create 2023-01-15 20:31
*/
@Slf4j
@Component
public class ConsumerErrorListener {
@Bean("myErrorHandler")
public ConsumerAwareListenerErrorHandler errorHandler(){
return ((message, e, consumer) -> {
MessageHeaders headers = message.getHeaders();
log.info("Error header:{} ",headers);
return null;
});
}
}
注解式配置消费者
package com.demo.config.consumer;
import com.demo.config.aspect.ConsumerAnno;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @Description 这里消费方法不做具体实现,通过切面做统一处理
* @create 2023-01-15 20:59
*/
@Component
public class MyConsumerInstance {
/**
* 丛指定位置消费
* 或者消费多个Topic 在配置文件内用逗号隔开即可
* @param record
*/
@KafkaListener(
clientIdPrefix = "test-batch",
//topics = "#{'${spring.kafka.consumer.test-topic}'.split(',')}",
topicPartitions = {
@TopicPartition(
topic = "#{'${spring.kafka.consumer.test-topic}'}",
partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "1")
}
)
},
containerFactory = "myContainerFactory",
errorHandler = "myErrorHandler"
)
@ConsumerAnno(handler = "myHandler")
public void testConsumer(ConsumerRecord<String,String> record){}
}
4.自定义消费注解
通过注解指定消息处理器,注解类
package com.demo.config.aspect;
import java.lang.annotation.*;
/**
* @author Administrator
* @Description
* @create 2023-01-15 21:20
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ConsumerAnno {
/**
* 此处可以指定一个默认的处理器
* @return
*/
String handler() default "";
}
切面类:统一的源数据校验、备份、异常处理等
package com.demo.config.aspect;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @author Administrator
* @Description
* @create 2023-01-15 21:23
*/
@Slf4j
@Aspect
@Component
public class ConsumerAspect {
/**
* 注解型切点
*/
@Pointcut("@annotation(com.demo.config.aspect.ConsumerAnno)")
private void annotationPointcut(){}
@Around("annotationPointcut()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable{
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Object[] args = joinPoint.getArgs();
if (null != args){
ConsumerRecord<String,String> record = (ConsumerRecord<String, String>) args[0];
//TODO 打印原始消息,记录分区偏移等 或者其他统一处理
log.info("Topic:{} Key:{} P:{} O:{}",record.topic(),record.key(),record.partition(),record.offset());
//抛出异常
if (record.value().contains("error")){
throw new Throwable();
}
}
Method method = signature.getMethod();
ConsumerAnno consumerAnno = method.getAnnotation(ConsumerAnno.class);
String handler = consumerAnno.handler();
//TODO 可以根据默认处理或指定处理器进一步处理消息:通过实例名和Spring容器上下文获取实例
log.info("Handler:{}",handler);
return joinPoint.proceed();
}
}
三.测试和验证
1.发送正常数据
http://127.0.0.1:8080/send/test/Kafak发数
结果打印
2.发送错误数据
http://127.0.0.1:8080/send/test/Kafak发error数