Kafka Streams is a stream-processing library built on Apache Kafka for building efficient, scalable, real-time data processing applications. It is a lightweight library that lets you create and run stream processing applications in Java or Scala; such an application reads from input streams, applies transformations to the data, and writes the processed results to output streams.
1. Key Advantages
1. Simple to use: Kafka Streams offers a straightforward programming model that makes stream processing applications easy to write and debug.
2. High performance: Kafka Streams uses Kafka as its underlying storage and messaging layer, inheriting Kafka's high throughput, low latency, and scalability.
3. Scalability: Kafka Streams scales horizontally, so you can process large data streams with ease.
4. Fault tolerance: Kafka Streams ships with built-in mechanisms, such as rebalancing and failure recovery, that keep applications fault tolerant.
5. Integration: Kafka Streams integrates seamlessly with the rest of the Kafka ecosystem, such as Kafka Connect and the Kafka producer and consumer clients.
Kafka Streams suits a wide range of real-time data processing applications, such as real-time ETL, real-time analytics, and real-time monitoring. The minimal sketch below illustrates the read-transform-write model described above.
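To make the programming model concrete before the full example, here is a minimal self-contained topology. The topic names and the uppercase transform are placeholders for illustration only, not part of the transaction example that follows:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class UppercaseExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from an input topic, transform each value, write to an output topic
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}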
2. Hands-On: Real-Time Analysis of Transaction Data
This example shows how to build a real-time analytics application that reads transaction data from a Kafka topic, performs several transformations and aggregations, and writes the results back to another Kafka topic.
2.1 Create the Topics
kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
kafka-topics.sh --create --topic customer-analysis --bootstrap-server localhost:9092
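The commands above rely on the broker's default partition count and replication factor. In a real deployment you would normally set these explicitly; for example (the values here are illustrative, not from the original setup):

kafka-topics.sh --create --topic transactions --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.sh --create --topic customer-analysis --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092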
2.2 Add the Kafka Dependencies to pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.31</version>
</dependency>
2.3 Main Application Logic
For the rest of the code, follow the WeChat account 算法小生 and reply "Kafka Stream".
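Since the supporting classes are not shown here, the following is a rough reconstruction (an assumption, not the author's exact code) of what Transaction and TransactionSerde might look like: Transaction is a plain POJO matching the JSON written in section 2.4, and TransactionSerde wraps fastjson serialization (the stack trace in section 3 suggests a fastjson-based JsonDeserializer). TotalAndCountSerde and CustomerAnalysisSerde can follow the same pattern:

import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.serialization.Serdes;
import java.nio.charset.StandardCharsets;

// Plain POJO matching the JSON records produced in section 2.4
public class Transaction {
    private String transactionId;
    private String customerId;
    private double amount;
    private long timestamp;

    public String getTransactionId() { return transactionId; }
    public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

// Fastjson-backed Serde (in its own file); Kafka's Serializer and
// Deserializer are functional interfaces, so lambdas suffice for a sketch
public class TransactionSerde extends Serdes.WrapperSerde<Transaction> {
    public TransactionSerde() {
        super((topic, tx) -> JSON.toJSONString(tx).getBytes(StandardCharsets.UTF_8),
              (topic, bytes) -> bytes == null
                      ? null
                      : JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Transaction.class));
    }
}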
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class TransactionAnalyzer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-analyzer");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TransactionSerde.class);

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Transaction> transactions = builder.stream("transactions");

        // Total spending per customer
        KTable<String, Double> customerTotalSpending = transactions
                .groupBy((key, transaction) -> transaction.getCustomerId())
                .aggregate(() -> 0.0,
                        (customerId, transaction, total) -> total + transaction.getAmount(),
                        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-total-spending-store")
                                .withValueSerde(Serdes.Double()));

        // Average transaction amount per customer
        KTable<String, Double> customerAvgSpending = transactions
                .groupBy((key, transaction) -> transaction.getCustomerId())
                .aggregate(() -> new TotalAndCount(0.0, 0L),
                        (customerId, transaction, totalAndCount) -> totalAndCount.add(transaction.getAmount()),
                        Materialized.<String, TotalAndCount, KeyValueStore<Bytes, byte[]>>as("customer-avg-spending-store")
                                .withValueSerde(new TotalAndCountSerde()))
                .mapValues(totalAndCount -> totalAndCount.getTotal() / totalAndCount.getCount(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                // Suppress intermediate updates so the join emits a single result per key
                .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()));

        // Maximum transaction amount per customer
        KTable<String, Double> customerMaxSpending = transactions
                .groupBy((key, transaction) -> transaction.getCustomerId())
                .reduce((transaction1, transaction2) ->
                        transaction1.getAmount() > transaction2.getAmount() ? transaction1 : transaction2)
                .mapValues(transaction -> transaction.getAmount(), Named.as("customer-max-spending"),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                // Suppress intermediate updates so the join emits a single result per key
                .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()));

        // Join all results and write them to the output topic
        KStream<String, CustomerAnalysis> analysis = customerTotalSpending
                .join(customerAvgSpending, (total, avg) -> new CustomerAnalysis(total, avg))
                .join(customerMaxSpending, (analysis1, max) -> analysis1.withMax(max))
                .mapValues((key, analysis2) -> analysis2.normalize(key))
                .toStream()
                .map((key, analysis3) -> KeyValue.pair(key, analysis3));
        analysis.to("customer-analysis", Produced.with(Serdes.String(), new CustomerAnalysisSerde()));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown hook to close the application cleanly on Ctrl-C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
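As a side note, the topology can also be exercised without a running broker via the kafka-streams-test-utils test dependency (org.apache.kafka:kafka-streams-test-utils). The sketch below assumes the supporting classes referenced above and a hypothetical buildTopology() helper factored out of main():

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import java.util.Properties;

public class TransactionAnalyzerSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-analyzer-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TransactionSerde.class);

        // Assumes the topology construction has been factored out of main()
        // into a static buildTopology() method (a hypothetical refactor)
        Topology topology = TransactionAnalyzer.buildTopology();

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, Transaction> input = driver.createInputTopic(
                    "transactions", new StringSerializer(), new TransactionSerde().serializer());
            TestOutputTopic<String, CustomerAnalysis> output = driver.createOutputTopic(
                    "customer-analysis", new StringDeserializer(), new CustomerAnalysisSerde().deserializer());

            Transaction tx1 = new Transaction();
            tx1.setTransactionId("000001");
            tx1.setCustomerId("00001");
            tx1.setAmount(10000);
            input.pipeInput("000001", tx1);

            Transaction tx2 = new Transaction();
            tx2.setTransactionId("000004");
            tx2.setCustomerId("00001");
            tx2.setAmount(20000);
            input.pipeInput("000004", tx2);

            // Drain whatever the topology has emitted so far (suppress may hold
            // results back until stream time advances)
            output.readKeyValuesToList().forEach(System.out::println);
        }
    }
}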
2.4 Run and Observe in Real Time
After starting the application, open a new terminal window to watch the statistics in real time:
kafka-console-consumer.sh --topic customer-analysis --bootstrap-server localhost:9092
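By default kafka-console-consumer.sh prints only record values. To also see the customer ID keys, you can add the standard console-consumer properties:

kafka-console-consumer.sh --topic customer-analysis --bootstrap-server localhost:9092 --property print.key=true --property key.separator=: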
Now write a few transactions; after a moment, the consumer window will show the updated statistics:
$ kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
>{"transactionId": "000001", "customerId": "00001", "amount": 10000, "timestamp": 1683983695}
>{"transactionId": "000002", "customerId": "00002", "amount": 20000, "timestamp": 1683983695}
>{"transactionId": "000003", "customerId": "00002", "amount": 30000, "timestamp": 1683983896}
>{"transactionId": "000004", "customerId": "00001", "amount": 20000, "timestamp": 1683983896}
3. Troubleshooting
1. NullPointerException in ProcessorParameters.toString at startup
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.api.ProcessorSupplier.get()" because "this.processorSupplier" is null
at org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters.toString(ProcessorParameters.java:133)
This is a bug in Kafka 3.3.1; upgrade your Kafka version. I upgraded to 3.4.0 and changed the container environment to the following, after which the application started. (kafka-server is mapped to the corresponding node IP in the hosts file; see the example entry after the manifest.)
spec:
  containers:
    - name: kafka
      image: bitnami/kafka:3.4.0
      imagePullPolicy: IfNotPresent
      ports:
        - containerPort: 9092
          name: web
          protocol: TCP
      env:
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
          value: zookeeper-pod.middleware:2181
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-server:30092
        #- name: KAFKA_HEAP_OPTS
        #  value: -Xmx2048m -Xms2048m
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
2. Serialization failure
Caused by: com.alibaba.fastjson.JSONException: illegal fieldName input, offset 7, character , line 1, column 8, fastjson-version 2.0.31 10000.0
at com.alibaba.fastjson.JSON.parseObject(JSON.java:518)
at online.shenjian.kafka.JsonDeserializer.deserialize(JsonDeserializer.java:26)
The default value serde we had configured is:
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TransactionSerde.class);
The aggregate state, however, is a Double rather than a Transaction, so the default serde cannot deserialize it (note the "10000.0" at the end of the exception message). Specifying the matching serde explicitly on the KTable via Materialized fixes this:
// Total spending per customer: the state is a Double, so declare
// the Double serde explicitly instead of relying on the default
KTable<String, Double> customerTotalSpending = transactions
        .groupBy((key, transaction) -> transaction.getCustomerId())
        .aggregate(() -> 0.0,
                (customerId, transaction, total) -> total + transaction.getAmount(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-total-spending-store")
                        .withValueSerde(Serdes.Double()));
3. Duplicate records after merging the results
Add the following line to the two KTables that participate in the join, as already shown in section 2.3. In a KTable-KTable join, every update on either side emits a new join result, and since all three aggregations update on each incoming transaction, a single input record fans out into several intermediate results for the same key, for example successive updates to customerMaxSpending. The suppress call buffers the updates per key and forwards only the latest one once stream time advances past the time limit, so each key appears only once in the final output.
.suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()));