【2024】Kafka Streams详细介绍与具体使用(1)

news2025/1/12 0:49:03

目录

  • 介绍
    • 关键特性
    • 应用场景
    • 核心概念
    • 部署方式
    • kafka streams的处理模式
  • 具体使用
    • 1、准备工作
    • 2、添加依赖
    • 3、代码实现
    • 3、测试

介绍

Kafka Streams是构建在Apache Kafka之上的客户端库,用于构建高效、实时的流处理应用。它允许你以高吞吐量和低延迟的方式处理记录流,并且可以容易地扩展和复制数据处理流程。这种流处理方式适用于从简单的数据转换到复杂的事件驱动的应用程序。

关键特性

  • **易用性:**Kafka Streams提供了简洁的API,允许开发者轻松构建复杂的流处理应用。这些API包括高级的DSL(Domain Specific Language)和低级的处理器API,两者可以相互配合使用。
  • **无需单独的处理集群:**与其他流处理技术不同,Kafka Streams应用是作为常规的Java应用运行的,不需要维护一个专门的处理集群。你可以在你自己的应用中直接包含流处理逻辑,这使得部署和维护变得更容易。
  • **强大的状态处理能力:**Kafka Streams支持状态化处理,并允许容错、持久化的本地状态存储。这是通过管理和复制RocksDB实例来实现的,为应用程序的状态提供了持久化和容错支持。
  • **时间窗口处理:**Kafka Streams支持多种类型的时间窗口操作,如滑动窗口、跳跃窗口和会话窗口,使得在处理时间敏感的数据流时非常有效。
  • **流式表格双模型:**Kafka Streams引入了一个流式表格双模型,允许用户将流处理结果看作是一张动态更新的表。这个模型提供了一种理解流数据和转换流数据的直观方式。
  • **可扩展和容错:**由于Kafka Streams建立在Apache Kafka之上,它继承了Kafka的可扩展性和高可用性。应用可通过增加实例来水平扩展,故障转移由Kafka负责处理。

应用场景

Kafka Streams适用于多种实时数据处理场景,包括:

  • 实时分析和监控:对即时生成的数据进行聚合、过滤和分析。事件驱动的应用:基于特定事件自动触发流程和操作。
  • 数据转换和清洗:实时处理数据流,并将结果输出到Kafka主题或其他存储系统中。个性化推荐:根据用户行为实时更新推荐内容。
  • Kafka Streams的设计目标是提供一种简单、强大且易于部署的流处理方式。
  • 通过利用Kafka本身的优点,Kafka Streams可以帮助开发者更方便地构建和部署实时数据处理应用。

核心概念

  • DFP:以数据为中心的流式出来的方式

  • Source Processor:源头读的Processor

  • Stream Processors:进行流式处理的中间的Processors

  • Sink Processors:流中最后的一个Processors,用于pull到本地或者另外一个新的Topic

  • Topology:多个Processors就构成了一个Topology的环形图

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • sub-Topologies:获取数据分子的Topology

在这里插入图片描述

  • Streams Task:Streams的最小单位,真正处理数据的
  • Streams Thread:Streams 处理数据的线程,一般每个Streams Task会创建一个新的线程,提高并行

部署方式

  1. 在一个服务里面起一个Instance实例,这个实例里面创建两个线程,一个线程处理两个Task对象,这种方式先对并发最小

在这里插入图片描述

  1. 在一个服务里面启动两个Instance实例,在每一个线程里面可以处理一个Task,这样在处理上可以有效的提高并发,避免一个实例出现问题有限其他的

在这里插入图片描述

  1. 起多个服务集群化部署去跑多个实例,可以有效利用多核CPU的性能

在这里插入图片描述

kafka streams的处理模式

  • **Depth-First Processing(深度优先处理模式):**在处理拓扑中的节点时,首先处理完一个节点的所有分支,然后再处理下一个节点,这种处理模式可以确保数据再处理过程中的一致性和正确性。避免数据混乱
  • Breadth-First Processing(广度优先处理):与深度优先先反,广度优先处理模式会优先处理一个节点的所以相邻节点,然后再处理下一个节点。
  • Time Windowing(时间窗口处理):按照时间窗口进行分组,然后对每个窗口内的事件进行处理,这种模式适用于需要对一段时间内的事件进行聚合处理或计算

具体使用

1、准备工作

默认已经安装kafka了啊,如果还没通过我这篇文章去安装==>kafka安装

  • 在使用的时候,首先,我们需要创建两个个topic,
#进入kafka容器
docker exec -it kafka-server1 /bin/bash

#创建主题topic-1
/opt/kafka/bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
#创建主题topic-2
/opt/kafka/bin/kafka-topics.sh --create --topic out-topic --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

2、添加依赖

我用的kafka-streams是3.1.2的

  • gradle
	implementation("org.apache.kafka:kafka-streams")
	implementation("org.springframework.kafka:spring-kafka")
  • mavne
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

3、代码实现

/**
 * 通过streams实现数据流处理,把字符串装为大写
 */
@Slf4j
public class KafkaStreamsYellingApp {
//    appid
    private final static String APPLICATION_ID = "yelling_app_id";
    private final static String INPUT_TOPIC = "input-topic";
    private final static String OUTPUT_TOPIC = "out-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";


    public static void main(String[] args) throws InterruptedException {
     
//        配置kafka stream属性连接
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象
        Serde<String> stringSerde = Serdes.String();
//        构建流处理拓扑(用于输出)
        StreamsBuilder builder = new StreamsBuilder();
        
        
//        数据源处理器:从指定的topic中取出数据
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//
        KStream<String, String> upperStream = inputStream
                .peek((key, value) -> {
                    log.info("[收集]key:{},value:{}", key, value);
                })
                .filter((key, value) -> value.length() > 5)
                .mapValues(time -> time.toUpperCase())
                .peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
//        日志打印upperStream处理器的数据
        upperStream.print(Printed.toSysOut());
//        把upperStream处理器的数据输出到指定的topic中
        upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
//        jvm关闭时,把流也关闭
        CountDownLatch downLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            downLatch.countDown();
            log.info("关闭流处理");
        }));

        kafkaStreams.start();
        log.info("启动执行!");
    }
}

上面代码的重点具体步骤:

  1. 创建Source Processor源,去topic中读取消息
    KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
  2. 创建Stream Processors中间处理的流
           KStream<String, String> upperStream = inputStream
                   .peek((key, value) -> {
                       log.info("[收集]key:{},value:{}", key, value);
                   })
                   .filter((key, value) -> value.length() > 5)
                   .mapValues(time -> time.toUpperCase())
                   .peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
    
  3. 创建Sink Processor,流中最后的一个Processors,用于pull到本地或者另外一个新的Topic
    upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));

具体的含义后面会详细编写一篇,这里先介绍简单使用

3、测试

  • 进入生产者topic(看发的最后三条)
    在这里插入图片描述

  • 进入消费topic
    在这里插入图片描述
    日志输出
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1803356.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能在交通与物流领域的普及及应用

文章目录 &#x1f40b;引言 &#x1f40b;自动驾驶 &#x1f988;自动驾驶汽车 &#x1f421;应用现状 &#x1f421;技术实现 &#x1f421;实现过程及代码 &#x1f40b;智能交通管理 &#x1f988;应用现状 &#x1f988;技术实现 &#x1f988;实现过程及代码 &…

【讲解下ECMAScript和JavaScript之间有何区别?】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

React Hooks 封装可粘贴图片的输入框组件(wangeditor)

需求是需要一个文本框 但是可以支持右键或者ctrlv粘贴图片&#xff0c;原生js很麻烦&#xff0c;那不如用插件来实现吧~我这里用的wangeditor插件&#xff0c;初次写初次用&#xff0c;可能不太好&#xff0c;但目前是可以达到实现需求的一个效果啦&#xff01;后面再改进吧~ …

【国产NI替代】SMU 源测量仪:源测量单元平台主要用于半导体、传感器、模组等 IVR 测试测量

• 集 5 台仪器 (数字万用表、电压源、电流源、电子负载和脉冲发生器) 功能于⼀体 • 典型输出源及测量精度 02%&#xff0c;支持直流/脉冲输出模式 • 脉冲输出模式&#xff0c;最⼩脉冲宽度 100 us &#xff0c;上升时间 10 us • 具有 pA 级分辨率高精度源&#xff0c;且…

Linux编译器-gcc或g++的使用

一.安装gcc/g 在linux中是不会自带gcc/g的&#xff0c;我们需要编译程序就自己需要安装gcc/g。 很简单我们使用简单的命令安装gcc&#xff1a;sudo yum install -y gcc。 g安装&#xff1a;sudo yum install -y gcc-c。 我们知道Windows上区分文件&#xff0c;都是使用文件…

linux系统——ping命令

ping命令可以用来判断对远端ip的连通性&#xff0c;可以加域名也可以加公共ip地址 这里发送出56字节&#xff0c;返回64字节

【C++修行之道】类和对象(四)运算符重载

目录 一、 运算符重载 函数重载和运算符重载有什么关系&#xff1f; 二、.*运算符的作用 三、运算符重载的正常使用 四、重载成成员函数 五、赋值运算符重载 1.赋值运算符重载格式 传值返回和引用返回 有没有办法不生成拷贝&#xff1f; 2. 赋值运算符只能重载成类的…

命令提示符方式获取笔记本电脑的电池使用情况和状态

1、键盘输入WinR,输入cmd&#xff0c;然后在命令提示符界面输入 powercfg/batteryreport2、将显示的路径复制到浏览器中查看

造假高手——faker

在测试写好的代码时通常需要用到一些测试数据&#xff0c;大量的真实数据有时候很难获取&#xff0c;如果手动制造测试数据又过于繁重无聊&#xff0c;显得不够优雅&#xff0c;今天我们介绍的faker这个轮子可以完美的解决这个问题。faker是一个用于生成各种类型假数据的库&…

多模态模型是什么意思(国内外的AI多模态有哪些)

在人工智能和机器学习的领域&#xff0c;我们经常会遇到一些专业术语&#xff0c;这些术语可能会让初学者感到困惑。其中&#xff0c;"多模态模型"就是这样一个概念。 什么是AI多模态。它是什么意思呢&#xff1f; 那么&#xff0c;多模态模型是什么意思呢&#xff1…

前端开发之中svg图标的使用和实例

svg图标的使用和实例 前言效果图1、安装插件2、vue3中使用2.1、 在components文件夹中,创建公共类SvgIcon/index.vue2.2、创建icons文件,存放svg图标和将所有的svg图标进行引用并注册成全局组件2.3、在man.js 中注册2.4、在vue.config.js中配置svg2.5、在vue中的调用svg图标3…

算法004:盛水最多的容器

. - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/container-with-most-water/ 这道题比较简单&#xff0c;使用双指针。 …

IIoT(智能物联网)的现状、应用及安全

近年来&#xff0c;物联网&#xff08;IoT&#xff09;作为推动现代公司和智能城市发展的一个范式&#xff0c;已经取得了显著的发展。IoT使得分布式设备&#xff08;如手机、平板电脑和计算机&#xff09;能够感知并从外部环境传输数据&#xff0c;以服务于最终用户。IoT的概念…

万字长文|OpenAI模型规范(全文)

本文是继《OpenAI模型规范概览》之后对OpenAI Model Spec的详细描述&#xff0c;希望能对各位从事大模型及RLHF研究的朋友有帮助。万字长文&#xff0c;建议收藏后阅读。 一、概述 在AI的世界里&#xff0c;确保技术的行为符合我们的期望至关重要。OpenAI最近发布了一份名为Mo…

【动态规划-BM78 打家劫舍(一)】

题目 描述 你是一个经验丰富的小偷&#xff0c;准备偷沿街的一排房间&#xff0c;每个房间都存有一定的现金&#xff0c;为了防止被发现&#xff0c;你不能偷相邻的两家&#xff0c;即&#xff0c;如果偷了第一家&#xff0c;就不能再偷第二家&#xff1b;如果偷了第二家&…

四种跨域解决方案

文章目录 1.引出跨域1.基本介绍2.具体演示1.启动之前学习过的springboot-furn项目2.浏览器直接访问 [localhost:8081/furns](http://localhost:8081/furns) 可以显示信息3.启动前端项目&#xff0c;取消请求拦截器&#xff0c;这样设置&#xff0c;就会出现跨域4.跨域原因 2.跨…

YOLOv8改进 | 卷积模块 | 在主干网络中添加/替换蛇形卷积Dynamic Snake Convolution

&#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 蛇形动态卷积是一种新型的卷积操作&#xff0c;旨在提高对细长和弯曲的管状结构的特征提取能力。它通过自适应地调整卷积核的权重&#xff0…

软件游戏找不到d3dx9_43.dll怎么办,三分钟教你解决此问题

在现代科技发展的时代&#xff0c;电脑已经成为我们生活中不可或缺的一部分。然而&#xff0c;在使用电脑的过程中&#xff0c;我们可能会遇到一些问题&#xff0c;其中之一就是电脑缺失d3dx943.dll文件。这个问题可能会影响到我们的正常使用&#xff0c;因此了解其原因和解决方…

spring源码解析-(2)Bean的包扫描

包扫描的过程 测试代码&#xff1a; // 扫描指定包下的所有类 BeanDefinitionRegistry registry new SimpleBeanDefinitionRegistry(); // 扫描指定包下的所有类 ClassPathBeanDefinitionScanner scanner new ClassPathBeanDefinitionScanner(registry); scanner.scan(&quo…

SSL/TLS和HTTPS

HTTPS就是用了TLS包装的Socket进行通信的HTTP 混合加密 被称为混合加密。具体过程如下&#xff1a; 使用非对称加密协商对称密钥&#xff1a; 在通信的开始阶段&#xff0c;通常由客户端和服务器使用非对称加密算法&#xff08;如RSA&#xff09;来协商一个对称密钥。通常情…