【KafkaStream】简单使用

news2025/1/13 13:45:05

Kafka Stream是什么

Kafka Streams是一套客户端类库,它可以对存储在Kafka内的数据进行流式处理和分析。

1. 什么是流处理

流处理平台(Streaming Systems)是处理无限数据集(Unbounded Dataset)的数据处理引擎,而流处理是与批处理(Batch Processing)相对应的。所谓的无线数据,指的是数据永远没有尽头。而流处理平台就是专门处理这种数据集的系统或框架。下图生动形象地展示了流处理和批处理的区别:
在这里插入图片描述
一个最简单的Streaming的结构如下图所示:
在这里插入图片描述

从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,嗯,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source Topic及Destination Topic,那就构成如下图所示的较为复杂的拓扑结构:
在这里插入图片描述

2. Kafka Stream的特点

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache Samza、Apache Storm,以及这些年火爆的 Spark 以及 Flink 等。

Kafka Streams的特点
相比于其他流处理平台,Kafka Streams 最大的特色就是它不是一个平台,至少它不是一个具备完整功能(Full-Fledged)的平台,比如其他框架中自带的调度器和资源管理器,就是 Kafka Streams 不提供的。Kafka 官网明确定义 Kafka Streams 是一个客户端库(Client Library)。我们可以使用这个库来构建高伸缩性、高弹性、高容错性的分布式应用以及微服务。使用Kafka Streams API构建的应用程序就是一个普通的应用程序,我们可以选择任何熟悉的技术或框架对其进行编译、打包、部署和上线。很不幸,目前Kafka Streams还没有在除了Java之外的其他主流开发语言的SDK上提供。Kafka Streams最大的特点就是,对于上下游数据源的限定。目前Kafka Streams只支持与Kafka集群进行交互,它并没有提供开箱即用的外部数据源连接器。
在这里插入图片描述
Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java和scala代码就可以实现流式处理。

优势:

  1. 弹性,高度可扩展,容错
  2. 部署到容器,VM,裸机,云
  3. 同样适用于小型,中型和大型用例
  4. 与Kafka安全性完全集成
  5. 编写标准Java和Scala应用程序
  6. 在Mac,Linux,Windows上开发
  7. Exactly-once 语义

1. 测试kafkaStream

先看下简单的kafkaStream测试

0. 配置文件

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1. 编写生产者

ProducerQuickStart.java

package com.kafka.sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

@Slf4j
public class ProducerQuickStart {

    public static void main(String[] args) {

        //1. kafka的配置信息
        Properties prop = new Properties();
        //kafka的链接信息
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //配置重试次数
        prop.put(ProducerConfig.RETRIES_CONFIG, 5);
        //数据压缩
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        //ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
//        prop.put(ProducerConfig.ACKS_CONFIG,"all");

        消息key的序列化器
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //2. 生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        //封装发送的消息
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("itcast-topic-input", "key_001", "hello kafka");

        //3. 发送消息
        for (int i = 0; i < 5; i++) {
            producer.send(producerRecord);
        }

        //4. 关闭消息通道  必须关闭,否则消息发不出去
        producer.close();

    }
}

2 编写kafkaStream流式处理

KafkaStreamQuickStart.java

package com.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信心
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);


        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");

    }
}
  1. 编写消费者
    ConsumerQuickStart.java
package com.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerQuickStart {

    public static void main(String[] args) {

        //1. 添加kafka的配置信息
        Properties properties = new Properties();
        // 配置链接信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
        //配置消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2. 消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3. 订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

        //当前线程一直监听消息
        while(true){
            //4. 消费者拉取消息: 每秒拉取一次
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }

    }
}
  1. 在远端(192.168.200.130:9092)启动docker中的kafka容器

  2. 启动消费者ConsumerQuickStartmain函数

  3. 启动kafkastreammian函数

  4. 启动生产者ProducerQuickStartmain函数

  5. 控制台打印结果:
    在这里插入图片描述
    在这里插入图片描述

整个过程:
生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。

2. Springboot整合kafkaStream

1. 配置文件新增

application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# kafkaStream新增以下配置
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

2. 在微服务中新增配置类

KafkaStreamConfig.java

package com.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

3. 使用kafkaStream监听消息

KafkaStreamHelloListener.java

package com.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

测试:

启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果
在这里插入图片描述

说明kafkaStream接收到消息并将多条消息进行了统一处理。

参考(推荐阅读):

  1. https://cloud.tencent.com/developer/article/2100664
  2. https://www.cnblogs.com/tree1123/p/11457851.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/935886.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++的静态栈以及有点鸡肋的array数组

目录 1.静态栈 1.举例展示 2.注意事项 2.array 1.静态栈 1.举例展示 1.我们想到栈&#xff0c;就会想到是一个数组来维护它的&#xff0c;并且一般由于不知道存储的多少内容&#xff0c;所以一般都是用动态数组不断的在堆上开辟新的空间。 但是C支持了一个新的语法就是静…

【Java基础增强】类加载器和反射

1.类加载器 1.1类加载器【理解】 作用 负责将.class文件&#xff08;存储的物理文件&#xff09;加载在到内存中 1.2类加载的过程【理解】 类加载时机 创建类的实例&#xff08;对象&#xff09; 调用类的类方法 访问类或者接口的类变量&#xff0c;或者为该类变量赋值 …

机器学习实战笔记(蜥蜴书)—— 第二章:端到端项目

目录 前言机器学习前的准备工作1、机器学习需要用到的库&#xff1a;安装&#xff1a;文件导入库 2、所用工具 数据准备1、获取数据2、检查数据3、创建训练/测试集 数据可视化数据预处理1、缺失值处理2、文本属性处理3、数据集添加其他列4、数值缩放5、得到预处理的数据 模型建…

Rabbitmq消息积压问题如何解决以及如何进行限流

一、增加处理能力 优化系统架构、增加服务器资源、采用负载均衡等手段&#xff0c;以提高系统的处理能力和并发处理能力。通过增加服务器数量或者优化代码&#xff0c;确保系统能够及时处理所有的消息。 二、异步处理 将消息的处理过程设计为异步执行&#xff0c;即接收到消息…

STM32 Cubemx 同名外设中断及回调

文章目录 前言示例工程个人理解 前言 最近在学习STM32&#xff0c;采用HAL库开发方式。记录一下同名外设中断及回调。 这里提及的同名外设指USART1/2之类的相同外设&#xff0c;但不是同一个instance。 示例工程 以使用cubemx配置两个同名外设EXTI0/EXT4为例。 在NVIC配置…

QPainter主要功能说明与使用

图形填充QBrush主要功能&#xff1a; QBrush类定义QPaint绘制的形状的填充图案。 函数原型功能void setColor(QColor &color)设置画刷颜色&#xff0c;实体填充时即填充颜色void setStyle(Qt::BrushStyle style)设置画刷填充样式&#xff0c;参数为枚举类型Qt::BrushStyl…

在vue项目中用vue-watermark快捷开发屏幕水印效果

我们先引入一个第三方依赖 npm install vue-watermark然后 因为这只是个测试工具 我就直接代码写 App.vue里啦 参考代码如下 <template><div><vue-watermark :text"watermarkText"></vue-watermark><!-- 正常的页面内容 --></div…

Git基本操作(Idea版)

第一次发布项目&#xff08;本地->远程&#xff09; 方式一 通过push的方式推送本地库到远程库&#xff08;远程已创建好仓库&#xff09; 这种方式需要提前创建好仓库。 右键点击项目&#xff0c;可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意&#xff1a…

2023/8/27周报

目录 摘要 论文阅读 1、标题和现存问题 2、过度平滑和度量方法 3、处理过坡 4、实验结果 深度学习 1、解决可视化问题 2、CART算法 总结 摘要 本周在论文阅读上&#xff0c;阅读了一篇Pairnorm:解决GNNS中的过平滑问题的论文。PairNorm 的核心思想是在图卷积层之间引…

LeetCode——回溯篇(一)

刷题顺序及思路来源于代码随想录&#xff0c;网站地址&#xff1a;https://programmercarl.com 目录 77. 组合 216. 组合总和 III 17. 电话号码的字母组合 39. 组合总和 40. 组合总和 II 77. 组合 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的…

UE4/5在蓝图细节面板中添加函数按钮(蓝图与c++的方法)

目录 在细节面板中添加按钮使用函数 蓝图的方法 事件 函数 效果 uec的方法 效果 在细节面板中添加按钮使用函数 很多时候&#xff0c;我们可以看到一些插件的actor类中&#xff0c;点击一下之后就可以实现如矩阵一样的效果。 实际上是因为其使用了函数来修改了蓝图中的数…

Compose - 容器组合项

一、延迟列表 LazyColumn、LazyRow 可滚动&#xff0c;类似RecyclerView&#xff08; Column、Row 用 Modifier 设置滚动是相当于ScrollView&#xff09;。 key设置为集合元素的唯一值例如id&#xff0c;使得列表能感知元素位置是否发生变化或新增移除&#xff0c;对于内容是否…

基于vue和element的脚手架【vue-element-admin 和vue-element-plus-admin 】

vue-element-admin vue-element-admin 是一个后台前端解决方案&#xff0c;它基于 vue 和 element-ui实现 介绍 | vue-element-adminA magical vue adminhttps://panjiachen.github.io/vue-element-admin-site/zh/guide/ vue-element-plus-admin vue-element-plus-admin 是一…

Python 通过traceback追溯异常信息

Python 通过traceback追溯异常信息 导入traceback包 import traceback自定义函数 def func_3():return 1 / 0def func_2():func_3()def func_1():func_2()捕捉异常 try:func_1() except Exception as e:traceback_info traceback.format_exc()print("traceback_info"…

Java IO流动(实战操作)

目录 1 IO流原理2 IO流的分类3 输入、输出流代码示例4 小结5 文件在前后台之间传递 在Java中&#xff0c;IO流是一种用于处理输入和输出操作的机制。它提供了一种统一的方式来读取和写入数据&#xff0c;平日开发中在文件读写&#xff0c;网络通信&#xff0c;特定场景的数据库…

哔哩哔哩 B站 bilibili 视频倍速设置 视频倍速可自定义

目录 一、复制如下代码 二、在B站视频播放页面进入控制台 三、将复制的代码粘贴到下方输入框&#xff0c;并 回车Enter 即可 四、然后就可以了 一、复制如下代码 &#xff08;该代码用于设置倍速为3&#xff0c;最后的数值是多少就是多少倍速&#xff0c;可以带小数点&#…

Docker容器:Harbor 私有仓库迁移

文章目录 一.私有仓库迁移的介绍1.为何要对Harbor 私有仓库的迁移2.Harbor 私有仓库的迁移特点3. Harbor 私有仓库的迁移注意要点 二.私有仓库迁移配置1.源Harbor配置&#xff08;192.168.198.11&#xff09;&#xff08;1&#xff09;接着以下操作查看容器状况及是否可以登录 …

【滑动窗口】leetcode1004:最大连续1的个数

一.题目描述 最大连续1的个数 这道题要我们找最大连续1的个数&#xff0c;看到“连续”二字&#xff0c;我们要想到滑动窗口的方法。滑动窗口的研究对象是一个连续的区间&#xff0c;这个区间需要满足某个条件。那么本题要找的是怎样的区间呢&#xff1f;是一个通过翻转0后得到…

容器导入与导出

docker的一大优势就是可移植性&#xff0c;容器因此docker容器可以随意的进行导入导出操作。 容器导出 使用export命令可以导出容器&#xff0c;具体操作如下&#xff1a; 创建一个容器&#xff0c;进行基本的配置操作 本案例中我首先创建一个nginx容器&#xff0c;然后启动…

开发过程中自己遇到的异常(六)

连接数据库失败&#xff1a; InternalError: (pymysql.err.InternalError) (1130, "Host xxx.xx.1.106 is not allowed to connect to this MySQL server") (Background on this error at: http://sqlalche.me/e/2j85) 解决方式&#xff1a; mysql> use mysql; …