项目demo地址 : https://mp.weixin.qq.com/s?__biz=MzkzODQyNzE3
1. 项目结构
─src
├─main
│ ├─java
│ │ └─org
│ │ └─example
│ │ │ KafkaApplication.java
│ │ │
│ │ └─demo
│ │ KafkaConsumerListener.java (监听消息类)
│ │
│ └─resources
│ application.yml
│
└─test
└─java
└─org
└─example
└─demo
KafkaProducerTest.java (发送消息测试类)
2. kafka依赖
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
3. 消息的发送
3.1 同步发送
同步发送是指发送消息后等待Kafka的响应,确认消息已成功发送。这个方式的优点在于可靠性高,但缺点是会阻塞当前线程,影响系统的响应速度。
/**
* 同步发送
*/
@Test
public void synchronizeSend() {
Map<String, Object> map = new HashMap<>();
map.put("say", "你好, kafka........");
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map));
try {
SendResult<String, String> result = future.get();
log.info("获取同步消息结果:{}", result.getRecordMetadata().topic());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
3.2 异步发送
异步发送是指发送消息后立即返回,不等待Kafka的响应,而是通过回调函数处理发送结果。这种方式不会阻塞线程,更适合高并发的场景。
@Test
public void asynchronousSend() {
Map<String, Object> map = new HashMap<>();
map.put("say", "你好, kafka........");
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
log.error("发送失败:{}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("异步消息结果:{}", result.getRecordMetadata().topic());
}
});
}
3.3 kafka事务
Kafka事务确保一组消息要么全部成功,要么全部失败,用于实现消息的原子性。适用于需要保证一致性的场景,例如订单处理。
@Test
public void transaction() {
kafkaTemplate.executeInTransaction(t -> {
t.send(TOPIC, "kafka事务消息...");
if (true) {
throw new RuntimeException("发生异常kafka回滚事务");
}
t.send(TOPIC, "你好, kafka........");
return true;
});
}
注意 :
如果想使用kafka事务需要在配置文件中开启事务
# 开启事务
transaction-id-prefix: tx_
开启事务后, 同步/异步发送消息都需要加@Transactional注解
4. 消息的接收
Kafka消息的接收是通过监听器实现的。监听器会自动接收指定主题的消息,并处理接收到的消息。以下是一个简单的示例:
/**
* 监听kafka数据
*/
@KafkaListener(topics = {"test-topic"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {
log.info("监听kafka消息>>>>>>>>>>>>>>>>>主题topic={}, 分区offset={}, 信息message={}", consumerRecord.topic(), consumerRecord.offset(), consumerRecord.value());
// 收到监听数据后面可以进行入库等业务操作
}
原文地址(阅读体验更佳) : 2. springboot集成kafka入门使用教程 (yuque.com)