浅析Kafka-Stream消息流式处理流程及原理

news2025/1/18 10:03:22

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

在这里插入图片描述
在这里插入图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920259.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV:python图像旋转,cv2.getRotationMatrix2D 和 cv2.warpAffine 函数

前言 仅供个人学习用&#xff0c;如果对各位朋友有参考价值&#xff0c;给个赞或者收藏吧 ^_^ 一. cv2.getRotationMatrix2D(center, angle, scale) 1.1 参数说明 parameters center&#xff1a;旋转中心坐标&#xff0c;是一个元组参数(col, row) angle&#xff1a;旋转角度…

Kafka基础组件图推演

文章目录 1. Controller Broker保障机制 2. 组件架构1. Log Manager2. Replication Manager3. SocketServer4. NetworkServer5. ZKClient 1. Controller Broker Kafka集群中有一个Controller Broker&#xff0c;负责元数据管理和协调。 Kafka使用Zookeeper作为集群元数据的存储…

利用js实现图片压缩功能

图片压缩在众多应用场景中扮演着至关重要的角色&#xff0c;尤其是在客户端上传图片时。原始图片往往体积庞大&#xff0c;直接上传不仅消耗大量带宽资源&#xff0c;还可能导致上传速度缓慢&#xff0c;严重影响用户体验。因此&#xff0c;在图片上传至服务器前对其进行压缩处…

【安全设备】入侵检测

一、什么是入侵检测 入侵检测是一种网络安全技术&#xff0c;用于监测和识别对计算机系统或网络的恶意使用行为或未经授权的访问。入侵检测系统&#xff08;IDS&#xff09;是实现这一目标的技术手段&#xff0c;其主要目的是确保计算机系统的安全&#xff0c;通过及时发现并报…

Renesas R7FA8D1BH (Cortex®-M85) 控制DS18B20

目录 概述 1 软硬件 1.1 软硬件环境信息 1.2 开发板信息 1.3 调试器信息 2 FSP和KEIL配置 2.1 硬件接口电路 2.2 FSB配置DS18B20的IO 2.3 生成Keil工程文件 3 DS18B20驱动代码 3.1 DS18B20介绍 3.2 DS18B20驱动实现 3.2.1 IO状态定义 3.2.2 读IO状态函数 3.2.3…

谷粒商城实战笔记-27-分布式组件-SpringCloud-Gateway-创建测试API网关

本节的主要内容是创建网关模块&#xff0c;将网关注册到Nacos&#xff0c;并配置路由进行测试。 一&#xff0c;创建网关模块 右键工程New->Module&#xff0c;创建新模块&#xff0c;模块名称 gulimall-gateway。 填充各种信息。 选中Gateway依赖。 点击Create创建模块。…

uni-app iOS上架相关App store App store connect 云打包有次数限制

相册权限 uni-app云打包免费有次数 切换一个账号继续

C语言 | Leetcode C语言题解之第230题二叉搜索树中第K小的元素

题目&#xff1a; 题解&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/int search_num(struct TreeNode* root, int k, int *result, int num) {if(num k 1){retu…

JAVA之(static关键字、final关键字、抽象类、接口)

JAVA之&#xff08;static关键字、final关键字&#xff09; 一、 static关键字1、静态变量2、静态方法3、 静态代码块4、例子 二、final关键字1、final修饰类2、 final修饰方法3、修饰变量 三、抽象类1、 抽象类 四、接口 一、 static关键字 1、静态变量 private static Stri…

Java异常体系、UncaughtExceptionHandler、Spring MVC统一异常处理、Spring Boot统一异常处理

概述 所有异常都是继承自java.lang.Throwable类&#xff0c;Throwable有两个直接子类&#xff0c;Error和Exception。 Error用来表示程序底层或硬件有关的错误&#xff0c;这种错误和程序本身无关&#xff0c;如常见的NoClassDefFoundError。这种异常和程序本身无关&#xff0…

学习大数据DAY14 PLSQL基础语法3

目录 二重循环 三种循环随便嵌套 exit continue return 作业 数据提取 游标 隐式游标 显示游标 动态游标 游标使用流程 游标属性 游标配合循环使用示例 作业2 参数游标 current of 语句 作业3 PLSQL基础语法&#xff08;三&#xff09; 二重循环 三种循环随便嵌…

threadLocal详细认识(使用场景与局限性)与样例测试

Threadlocal的介绍与使用 1&#xff0c;是什么&#xff1f; ThreadLocal 是 Java 提供的一个工具类&#xff0c;用于在多线程环境中为每个线程提供独立的变量副本。它是 Java 标准库中的一部分&#xff0c;提供了线程局部存储的功能&#xff0c;这意味着每个线程都有自己独立…

【安全设备】APT攻击预警平台

一、什么是APT 高级持续性威胁&#xff08;APT&#xff09;是一种高度复杂和长期的网络攻击&#xff0c;旨在通过持续监视和访问特定目标来窃取敏感信息或进行其他恶意活动。这种攻击结合了多种先进的技术手段和社会工程学方法&#xff0c;以极高的隐蔽性实现长期潜伏和信息窃…

2 文件

2 文件 1、文件系统1.1 文件系统的逻辑结构1.2 文件的访问流程 2、文件类型3、文件的打开与关闭4、文件的内核结构5、文件的读写4.1 顺序与随机读写4.2 文件描述符的复制4.3 访问测试4.4 修改文件大小 5、文件锁5.1 读写冲突5.2 文件锁5.3 文件锁的内核结构 6、文件的元数据7、…

MTK Camera 冷启动、前后摄切换性能优化分析

和你一起终身学习&#xff0c;这里是程序员Android 经典好文推荐&#xff0c;通过阅读本文&#xff0c;您将收获以下知识点: 一、背景二、问题分解三、工具分析四、 traceView教程五、surface create优化六、systrace的教程七、优化方案八、前后切换速度优化九、优化方案十、热…

旷野之间4 - 100 个 Kubernetes 面试问题及答案

100 个 Kubernetes 面试问题及答案 Kubernetes 简介 什么是 Kubernetes&#xff1f; Kubernetes 是一个开源容器编排平台&#xff0c;可自动部署、扩展和管理容器化应用程序。 什么是容器&#xff1f; 容器是一个轻量级、独立的、可执行软件包&#xff0c;其中包含运行应用…

学习笔记——动态路由——IS-IS中间系统到中间系统(特性之路由撤销)

6、路由撤销 ISIS路由协议的路由信息是封装在LSP报文中的TLV中的&#xff0c;但是它对撤销路由的处理和OSPF的处理方式类似。 在ISIS中撤销一条路由实则是将接口下的ISIS关闭&#xff1a; 撤销内部路由&#xff1a; 在ISIS中路由信息是由IP接口TLV和IP内部可达性TLV共同来描…

游戏AI的创造思路-技术基础-决策树(2)

上一篇写了决策树的基础概念和一些简单例子&#xff0c;本篇将着重在实际案例上进行说明 目录 8. 决策树应用的实际例子 8.1. 方法和过程 8.1.1. 定义行为 8.1.2. 确定属性 8.1.3. 构建决策树 8.1.4. 实施行为 8.1.5. 实时更新 8.2. Python代码 8. 决策树应用的实际例子…

hudi数据湖万字全方位教程+应用示例

1、时间轴&#xff08;TimeLine&#xff09; Hudi的核心是维护表上在不同的即时时间&#xff08;instants&#xff09;执行的所有操作的时间轴&#xff08;timeline&#xff09;&#xff0c;这有助于提供表的即时视图 一个instant由以下三个部分组成&#xff1a; 1&#xff09;…

YOLOv10改进 | Conv篇 | RCS-OSA替换C2f实现暴力涨点(减少通道的空间对象注意力机制)

一、本文介绍 本文给大家带来的改进机制是RCS-YOLO提出的RCS-OSA模块&#xff0c;其全称是"Reduced Channel Spatial Object Attention"&#xff0c;意即"减少通道的空间对象注意力"。这个模块的主要功能是通过减少特征图的通道数量&#xff0c;同时关注空…