Kafka Stream是什么
Kafka Streams是一套客户端类库,它可以对存储在Kafka内的数据进行流式处理和分析。
1. 什么是流处理
流处理平台(Streaming Systems)是处理无限数据集(Unbounded Dataset)的数据处理引擎,而流处理是与批处理(Batch Processing)相对应的。所谓的无线数据,指的是数据永远没有尽头。而流处理平台就是专门处理这种数据集的系统或框架。下图生动形象地展示了流处理和批处理的区别:
一个最简单的Streaming的结构如下图所示:
从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,嗯,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source Topic及Destination Topic,那就构成如下图所示的较为复杂的拓扑结构:
2. Kafka Stream的特点
近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache Samza、Apache Storm,以及这些年火爆的 Spark 以及 Flink 等。
Kafka Streams的特点
相比于其他流处理平台,Kafka Streams 最大的特色就是它不是一个平台,至少它不是一个具备完整功能(Full-Fledged)的平台,比如其他框架中自带的调度器和资源管理器,就是 Kafka Streams 不提供的。Kafka 官网明确定义 Kafka Streams 是一个客户端库(Client Library)。我们可以使用这个库来构建高伸缩性、高弹性、高容错性的分布式应用以及微服务。使用Kafka Streams API构建的应用程序就是一个普通的应用程序,我们可以选择任何熟悉的技术或框架对其进行编译、打包、部署和上线。很不幸,目前Kafka Streams还没有在除了Java之外的其他主流开发语言的SDK上提供。Kafka Streams最大的特点就是,对于上下游数据源的限定。目前Kafka Streams只支持与Kafka集群进行交互,它并没有提供开箱即用的外部数据源连接器。
Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java和scala代码就可以实现流式处理。
优势:
- 弹性,高度可扩展,容错
- 部署到容器,VM,裸机,云
- 同样适用于小型,中型和大型用例
- 与Kafka安全性完全集成
- 编写标准Java和Scala应用程序
- 在Mac,Linux,Windows上开发
- Exactly-once 语义
1. 测试kafkaStream
先看下简单的kafkaStream测试
0. 配置文件
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: lz4
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1. 编写生产者
ProducerQuickStart.java
package com.kafka.sample;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
@Slf4j
public class ProducerQuickStart {
public static void main(String[] args) {
//1. kafka的配置信息
Properties prop = new Properties();
//kafka的链接信息
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
//配置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG, 5);
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//ack配置 消息确认机制 默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
// prop.put(ProducerConfig.ACKS_CONFIG,"all");
消息key的序列化器
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//2. 生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//封装发送的消息
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("itcast-topic-input", "key_001", "hello kafka");
//3. 发送消息
for (int i = 0; i < 5; i++) {
producer.send(producerRecord);
}
//4. 关闭消息通道 必须关闭,否则消息发不出去
producer.close();
}
}
2 编写kafkaStream流式处理
KafkaStreamQuickStart.java
package com.kafka.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 流式处理
*/
public class KafkaStreamQuickStart {
public static void main(String[] args) {
//kafka的配置信心
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
//stream 构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();
//流式计算
streamProcessor(streamsBuilder);
//创建kafkaStream对象
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
//开启流式计算
kafkaStreams.start();
}
/**
* 流式计算
* 消息的内容:hello kafka hello itcast
* @param streamsBuilder
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
/**
* 处理消息的value
*/
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//按照value进行聚合处理
.groupBy((key,value)->value)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//统计单词的个数
.count()
//转换为kStream
.toStream()
.map((key,value)->{
System.out.println("key:"+key+",vlaue:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
}
}
- 编写消费者
ConsumerQuickStart.java
package com.kafka.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerQuickStart {
public static void main(String[] args) {
//1. 添加kafka的配置信息
Properties properties = new Properties();
// 配置链接信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
//配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
//配置消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//2. 消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3. 订阅主题
consumer.subscribe(Collections.singletonList("itcast-topic-out"));
//当前线程一直监听消息
while(true){
//4. 消费者拉取消息: 每秒拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
}
}
}
}
-
在远端(
192.168.200.130:9092
)启动docker中的kafka容器
-
启动消费者
ConsumerQuickStart
的main
函数 -
启动
kafkastream
的mian
函数 -
启动生产者
ProducerQuickStart
的main
函数 -
控制台打印结果:
整个过程:
生产者向kafka
中发送了5条“hello kafka”
消息,topic均为itcast-topic-input
。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”
字符串分割,并统计每个单词出现的次数。然后转为kstream
,发送消息到kafka中的topic=itcast-topic-out”
。消费者监听“itcast-topic-out”
的topic,消费消息。
2. Springboot整合kafkaStream
1. 配置文件新增
application.yml
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: lz4
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# kafkaStream新增以下配置
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
2. 在微服务中新增配置类
KafkaStreamConfig.java
package com.kafka.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
3. 使用kafkaStream监听消息
KafkaStreamHelloListener.java
package com.kafka.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
}
测试:
启动springboot
应用程序,运行之前的ProducerQuickStart
来生产消息,约10秒后,看到kafkaStream
消息的处理结果
说明kafkaStream
接收到消息并将多条消息进行了统一处理。
参考(推荐阅读):
- https://cloud.tencent.com/developer/article/2100664
- https://www.cnblogs.com/tree1123/p/11457851.html