安装kafka,直接到官网下载bin文件,本文使用windows进行使用kafka。
下载之后,第一步,启动zookeeper:
zookeeper-server-start.bat ..\..\config\zookeeper.properties
第二步,启动kafka:
kafka-server-start.bat ..\..\config\server.properties
第三步,在pom中导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
第四步,修改yml文件,添加配置:
spring: kafka: bootstrap-servers: localhost:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
第五步, 即可编写测试类(测试消息队列):
生产者:
package com.farm.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("topic1","你好旧时光");
return "ok";
}
}
消费者:
package com.farm.controller;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class HelloListener {
@KafkaListener(topics = "topic1")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}
}
}
效果如下:
只要访问/hello就可以触发生产者向topic1主题发送“你好旧时光”的字样,通过注解可以让消费者消费这条消息。
补充:发送对象,采用fastjson进行封装:
@GetMapping("/hello")
public String hello(){
User user = new User();
user.setUsername("xiaowang");
user.setAge(18);
kafkaTemplate.send("topic1", JSON.toJSONString(user));
return "ok";
}
消费者:
package com.heima.kafka.listener;
import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class HelloListener {
@KafkaListener(topics = "topic1")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
User user = JSON.parseObject(message, User.class);
System.out.println(user);
}
}
}