文章目录
- 0.前言
- 漏洞
- spring-kafka 介绍
- 1.参考文档
- 2.基础介绍
- 3.解决方案
- 3.1. 升级版本
- 3.2. 替代方案
- 4.Spring kafka 使用教程代码示例
0.前言
背景:公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873
漏洞
中等风险 | 2023年8月23日 | CVE-2023-34040
在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在反序列化异常记录头中构造一个恶意序列化对象。
spring-kafka 介绍
Spring Kafka 是 Spring 框架提供的一个库,它提供了使用 Apache Kafka 的便捷方式。Apache Kafka 是一个开源的流处理平台,主要用于构建实时数据流管道和应用。Spring Kafka 通过提供一种抽象和封装的方法,使开发者能够更容易地在 Spring 框架中使用 Apache Kafka。它大大简化了在 Spring 框架中使用 Apache Kafka 的复杂性,使得开发者可以更专注于业务逻辑的开发。
简化的 Kafka 生产者和消费者配置:Spring Kafka 提供了一套简单的方法来配置 Kafka 生产者和消费者。它提供了与 Spring Boot 的集成,使得配置更加简单。
异常处理:Spring Kafka 提供了一套机制来处理生产者和消费者在使用过程中遇到的异常,包括发送消息失败、消息格式错误等异常。
消息转换:Spring Kafka 提供了一套机制来转换 Kafka 消息,使得可以使用喜欢的数据格式(如 JSON、Avro 等)来发送和接收 Kafka 消息。
事务支持:Spring Kafka 提供了对 Kafka 事务的支持,使得可以在一个事务中发送多个 Kafka 消息。
带有回调的消息发送:Spring Kafka 提供了一种方法,可以在消息发送后获取到发送结果的回调,以便于进行进一步的处理。
1.参考文档
CVE 官方网站 https://www.cve.org/CVERecord?id=CVE-2023-34040
spring官方网站 https://spring.io/security/cve-2023-34040
2.基础介绍
CVE-2023-34040:当配置不当时,Spring-Kafka中的Java反序列化漏洞
中等风险 | 2023年8月23日 | CVE-2023-34040
描述
在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在反序列化异常记录头中构造一个恶意序列化对象。
具体来说,当满足以下所有条件时,应用程序才会变得脆弱:
用户未为记录的键和/或值配置ErrorHandlingDeserializer
用户明确地将容器属性checkDeserExWhenKeyNull和/或checkDeserExWhenValueNull设置为true。
用户允许不受信任的源发布到Kafka主题
默认情况下,这些属性为false,且只有在配置了ErrorHandlingDeserializer时,容器才会尝试反序列化这些头。ErrorHandlingDeserializer通过在处理记录之前移除所有此类恶意头,阻止了此漏洞的发生。
受影响的Spring产品和版本
Spring for Apache Kafka
2.8.1至2.9.10
3.0.0至3.0.9
3.解决方案
3.1. 升级版本
- 2.8.x和2.9.x的用户应升级到2.9.11
- 3.0.x的用户应升级到3.0.10
- 已修复此问题的版本包括:
Spring for Apache Kafka
3.0.10
2.9.11 Spring boot 3.0.10(或更高)依赖管理将自动使用Spring for Apache Kafka 3.0.10(或更高)。Spring Boot 2.7.x用户应将Boot的Spring for Apache Kafka 2.8.x依赖管理版本覆盖为2.9.11(或更高)。
3.2. 替代方案
在不使用ErrorHandlingDeserializers时,不要设置容器属性checkDeserExWhenKeyNull或checkDeserExWhenValueNull,或者使用ErrorHandlingDeserializers
4.Spring kafka 使用教程代码示例
pom.xml
文件中添加 Spring Kafka 的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在application.properties文件中设置Kafka的配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.template.default-topic=myTopic
创建一个消息生产者:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.sendDefault(message);
}
}
创建一个消息消费者:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
可以通过创建一个简单的REST API来测试消息的发送和接收:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private KafkaProducer producer;
@PostMapping(value = "/publish")
public void sendMessageToKafka(@RequestBody String message) {
this.producer.sendMessage(message);
}
}
这个API接收一个POST请求,并将请求体中的消息发送到Kafka。