一、SpringBoot生产者
(1)修改SpringBoot核心配置文件application.propeties, 添加生产者相关信息
# 连接 Kafka 集群
spring.kafka.bootstrap-servers=192.168.134.47:9093
# SASL_PLAINTEXT 和 SCRAM-SHA-512 认证配置
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=“kafka链接的验证账户和密码”
# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
(2)创建controller从浏览器接收数据, 并写入指定的topic
@RestController
public class ProducerController {
@Autowired
KafkaTemplate<String, String> kafka;
@RequestMapping("/atguigu")
public String data(String msg){
// 通过kafka发送出去
kafka.send("WJ-TEST", msg);
return "ok";
}
}
(3)在浏览器中给/atguigu接口发送数据
http://localhost:8080/atguigu?msg=hello
二、SpringBoot消费者
(1)修改SpringBoot核心配置文件application.propeties
# 连接 Kafka 集群
spring.kafka.bootstrap-servers=192.168.134.47:9093
# SASL_PLAINTEXT 和 SCRAM-SHA-512 认证配置
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=“kafka链接的验证账户和密码”
# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者组 ID
spring.kafka.consumer.group-id=atguigu
# 消费偏移量配置:从最早偏移量开始消费
spring.kafka.consumer.auto-offset-reset=earliest
# 自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
(2)创建类消费Kafka中指定topic的数据
@Configuration
public class KafkaConsumer {
@KafkaListener(topics = "WJ-TEST")
public void consumerTopic(String msg){
System.out.println("收到消息:" + msg);
}
}
(3) 向WJ-TEST主题发送数据
bin/kafka-console-producer.sh --bootstrap-server kafka1:9093 --topic WJ-TEST --producer.config ./config/scram_common.properties
(4)消费到数据