文章目录
- 1、整合
- 2、消息的生产
- 3、消费
- 4、补充:安装
Kafka主体不是用来做消息中间件的,但也有这个功能,接下来整合Kafka
1、整合
导入依赖坐标:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
添加相关配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order # 不可省略,否则监听时报错No group.id found in consumer config
在需要的地方注入操作对象:
@Autowired
private KafkaTemplate<String ,String> kafkaTemplate;
//key和value的泛型自适应
2、消息的生产
继续在Service层演示消息的发送,send方法,传入topic和message
@Service
@Slf4j
public class MessageServiceKafkaImpl implements MessageService {
@Autowired
private KafkaTemplate<String ,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
log.info("使用Kafka将待发送短信的订单纳入处理队列,id:"+id);
kafkaTemplate.send("kafka_topic",id);
}
}
3、消费
直接使用监听器监听队列,不演示手动自己拿。使用@KafkaListener
注解,其有一属性topic,就是send时的那个topic。需要注意的是,监听时,message依旧在形参,但类型是ConsumerRecord
@Component
@Slf4j
public class KafkaMessageListener{
@KafkaListener(topics = {"kafka_topic"})
public void onMessage(ConsumerRecord<?, ?> record) {
log.info("已完成短信发送业务,id:"+record.value());
}
}
打印下ConsumerRecord对象看看:
注意监听时记得配置spring.kafka.consumer.group-id:
4、补充:安装
生产环境一般都用docker安装在容器里或者Linux上,这里备份下Windows安装,因为本地开发调式还得用。
- 下载
下载地址:https://kafka.apache.org/downloads
windows 系统下3.0.0版本存在BUG,建议使用2.X版本
-
安装:解压缩即安装
-
启动zookeeper(注册中心),注意这里双重目录,用第二个kafka的bin
/windows
下的bat文件
# 在windows/zookeeper-server-start.bat所在目录cmd,再执行以下指令
# 默认端口:2181,zookeeper.properties中自行修改
zookeeper-server-start.bat ..\..\config\zookeeper.properties
# 报错输入行太长时,移动文件夹位置
# 或者将文件夹的版本号重命名去掉
- 启动Kafka
# 在windows/kafka-server-start.bat所在目录cmd,再执行以下指令
# 用第二个kafka的bin/windows下的bat文件
# 默认端口:9092,server.properties中自行修改
kafka-server-start.bat ..\..\config\server.properties
到此,安装启动成功。可测试下功能,进入对应的bat文件目录执行指令:
- 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test123 # 分区等参数在kafka学
- 查看topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
- 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
- 生产者功能测试
kafka-console-producer.bat --broker-list localhost:9092 --topic test123
- 消费者功能测试
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test123 --from-beginning