目录
1.引入kafka依赖
2.在yml文件配置配置kafka连接
3.注入KafkaTemplate模版
4.创建kafka消息监听和消费端
5.搭建kafka集群
5.1 下载 kafka Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/downloads.html
5.2 在config目录下做相关配置
1.引入kafka依赖
<!-- Spring Boot Kafka Starter -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.在yml文件配置配置kafka连接
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9095,localhost:9096
consumer:
group-id: myGroup
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3.注入KafkaTemplate模版
@Configuration
public class KafkaConfig {
@Autowired
private ProducerFactory producerFactory;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory);
}
/* @Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 添加其他配置...
return new DefaultKafkaProducerFactory<>(configProps);
}*/
}
4.创建kafka消息监听和消费端
package com.example.consumer.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "yourTopic", groupId = "myGroup")
public void listen(String message) {
System.out.println("Received Message in group 'myGroup': " + message);
}
}
5.搭建kafka集群
5.1 下载 kafka Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/downloads.html
5.2 在config目录下做相关配置
zookeeper.properties相关配置
server.properties相关配置 ,端口默认是9092,如果需要配置特定端口,可以加port=9092
想搞几个集群就复制几个,并且修改zookeeper.propertie和server.properties的端口。
就像我配置的