一、前言
Apache Kafka作为常用的开源分布式流媒体平台,可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据,构建对数据流的变化进行实时反应的应用程序,已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。而其中Apache Kafka Connect 作为 Kafka 中用于和其他数据系统流式传输数据的服务,其独立运行版本可以在 Kafka 发布包中通过 bin/connect-standalone.sh
启动,默认会在 8083 端口开启 HTTP REST API 服务,攻击者可以利用基于SASLJAAS 配置和SASL 协议的任意Kafka客户端,对可对连接器(Connector)的配置进行操作,将连接器中的 Kafka 客户端 sasl.jaas.config 属性值设置为 com.sun.security.auth.module.JndiLoginModule(通过 producer.override.sasl.jaas.config, consumer.override.sasl.jaas.config
或 admin.override.sasl.jaas.config
属性进行配置)时,如果此时连接器连接到攻击者可控的 LDAP 服务器时容易受到反序列化攻击,也称JNDI 注入来实现远程任意代码执行。云平台中,Kafka Connect 服务通常用于提供 Kafka 数据迁移、数据同步的管道能力,其默认 HTTP API 开放于 8083 端口。
因现场kafka选用版本较低,安全扫描时触发安全风险告警,低于 Kafka 升级3.4.0版本,涉及【Apache Kafka JNDI注入漏洞(CVE-2023-25194)】漏洞,该漏洞可允许远程代码执行,当攻击者可控制kafka-clients连接时的属性,可通过设置 ’ sasl.jaas.config ’ 属性为 ’ com.sun.security.auth.module.JndiLoginModule ’ 进行JNDI注入或反序列化利用,当JDK版本过低或者存在Gadgets时可导致远程代码执行。现场版本kafka_2.13-2.8.0,sasl.jaas.config 配置采用:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule;
☬ 漏洞复现: 执行创建文件,相关安全软件会报:JNDI注入的告警;影响版本: 2.3.0 至 3.3.2 版本Kafka Connect,原则上不影响 Kafka server (broker),但是会级联影响,最好还是升级到3.4.0及以上版本,升级JDK版本,可采用OpenJDK替换,相关经验已验证:OpenJDK1.8.0_362 + Zookeeper3.6.3 + Kafka3.4.0。
关联资源:官网升级指导、kafka部署快走
二、软件升级
本次软件要升级到3.4.0版本,版本说明见Kafka - Version 3.4.0,升级步骤也可参考官网升级指导。注意这里咱们是从2.1.x升级到3.x,不同于3.x升级,需注意存储 consumer offsets的schema和inter.broker.protocol.version里的版本,一旦升级后不支持降级。Apache Kafka 3.4.0以来,新增了一个系统属性:org.apache.kafka.disallowed.login.modules,用来在SASL JAAS配置中禁用有问题的登录模块,另外默认com.sun.security.auth.module.JndiLoginModule 在该版本中被禁用;另外需注意的是, Kafka 3.0中 Java 8 已注明废弃, 在Apache Kafka 4.0将不再支持;如果启用TLS,Java 11及更高版本的性能会明显更好,因此强烈建议使用它们。对应的zk版本稳定版为 3.5,注意zk需要足够的堆空间(3-5G,看数据量大小);另zk集群不宜过大,尤其是在写操作频繁的使用模式中,意味着会造成大量的集群内通信(写操作和随后的集群成员更新的配额),尽量让ZooKeeper系统尽可能小,并尽可能保持其独立性,以处理负载。
1)滚动升级步骤
1、升级前注意:在待升级节点server.properties文件中添加:inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.3, 3.2, etc.),如果是从0.11.0.x或更高版本升级的,并且没有重写message.format.version,那么只需要配置覆写: inter-broker protocol version参数的kafka版本即可,否者还需要设置:log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION,现场我们只需要添加::nter.broker.protocol.version=3.4;
2、对broker滚动升级,一次升级一个节点或实例:关闭待升级的broker,解压审计的新版本,然后迁移更新配置,重启代理,验证数据同步;这时,最新版本的broker程序会运行,之后可以验证kafka集群的业务行为和性能是否符合预期。如果出现任何问题,目前还可以进行降级。
3、一旦验证了集群的业务行为和性能满足预期,就可以通过更改协议版本来应用:inter.broker.protocol.version=3.4
4、然后逐个重启kafka brokers,以让inter.broker.protocol.version=3.4生效;这是,就不再支持降级了;
5、最后完成kafka整个集群状态及数据分布验证。
2.1、软件下载
#MD5: CF 6B 8B 1C A1 12 9E 69 41 39 92 99 B6 CC 47 8C
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.12-3.4.0.tgz
2.2、单节点/实例kafka升级
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。
三、附录
3.1、Kafka消息发送流程
kafka在消息发送的过程中,涉及到两个线程:main 线程和 Sender 线程。其中,main 线程中会创建了一个队列 RecordAccumulator,main 线程将消息发送给 RecordAccumulator;Sender 线程则不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。示意如下: