kafka-Stream详解篇(附案例)

news2025/1/13 2:35:15

在这里插入图片描述

文章目录

    • Kafka Stream 概述
    • Kafka Stream 概念
    • Kafka Stream 数据结构
    • 入门案例一
      • 需求描述与分析
      • 配置KafkaStream
      • 定义处理流程
      • 声明Topic
      • 接收处理结果
      • 发送消息测试
    • 入门案例二
      • 需求描述与分析
      • 定义处理流程
      • 接收处理结果
      • 声明Topic

更多相关内容可查看

Kafka Stream 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

Kafka Stream 概念

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • 处理拓扑 : 数据的处理流程 , 每一步处理流程就是一个处理拓扑
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

消息生产者 ----> Kafka Topic(原始数据) ------> Source Processor ------> 处理拓扑(很多步处理) ------> Sink Processor -----> Kafka Topic (运算结果) -----> 消费者(接收运行结果)

Kafka Stream 数据结构

Kafka数据结构类似于map,如下图,key-value键值对

KStream

KStream数据流,即是一段顺序的,可以无限长,不断更新的数据集。KStream数据流中的每一条数据相当于一次插入

商品的行为分值运算(排行) :
{“type”:“like”,“count”:1}
{“type”:“like”,“count”:-1}
{“type”:“like”,“count”:1}
对上面的行为数据进行运算得到运算结果 :
{“type”:“like”,“count”:2}

KTable数据流 , 即是一段顺序的,可以无限长,不断更新的数据集。KTable数据流中的每一条数据相当于一次更新

公交车的运行数据
{“No”:“518”,“location”:“武湖新天地”}
{“No”:“518”,“location”:“潘森产业园”}
{“No”:“518”,“location”:“产业园”}
对上面的行为数据进行运算得到运算结果 :
{“No”:“518”,“location”:“产业园”}

入门案例一

需求描述与分析


计算每个单词出现的次数

@Test
void testSend5() {
    List<String> strs = new ArrayList<String>();
    strs.add("hello word");
    strs.add("hello kafka");
    strs.add("hello spring kafka");
    strs.add("kafka stream");
    strs.add("spring kafka");

    strs.stream().forEach(s -> {
        kafkaTemplate.send("kafka.stream.topic1", "10001", s);
    });
}

配置KafkaStream

添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

开启KafkaStream功能

配置Kafka Stream

spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: ${spring.application.name}
      enable-auto-commit: false  # 关闭自动提交, 使用手动提交偏移量
    streams:
      application-id: ${spring.application.name}-application-id
      client-id: ${spring.application.name}-client-id
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          value:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde

定义处理流程

package com.heima.kafka.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

/**
 * @Author Administrator
 * @Date 2023/6/30
 **/
@Configuration
public class KafkaStreamConfig {

    /**
     * 原始数据 ------
     * 10001  hello word
     * 10001  hello kafka
     * 10001  hello spring kafka
     * 10001  kafka stream
     * 10001  spring kafka
     *
     * 对原始数据中的value字符串进行切割
     * 10001  [hello,word]
     * 10001  [hello,kafka]
     * 10001  [hello,spring,kafka]
     * 10001  [kafka,stream]
     * 10001  [spring,kafka]
     *
     * 对value数组进行扁平化处理(将多维数组转化为一维数组)
     * 10001  hello
     * 10001  word
     * 10001  hello
     * 10001  kafka
     * 10001  hello
     * 10001  spring
     * 10001  kafka
     * 10001  stream
     * 10001  spring
     * 10001  kafka
     *
     * 对数据格式进行转化, 使用value作为key
     * hello  hello
     * word   word
     * hello  hello
     * kafka  kafka
     * hello  hello
     * spring spring
     * kafka  kafka
     * kafka  kafka
     * stream stream
     * spring spring
     * kafka  kafka
     *
     * 对key进行分组 
     *  hello  hello
     *  hello  hello
     *  hello  hello
     *
     *  word   word
     *
     *  kafka  kafka
     *  kafka  kafka
     *  kafka  kafka
     *  kafka  kafka
     *
     *  spring  spring
     *  spring  spring
     *
     *  stream  stream
     *	
     *计算组内单词数量 , 得到运算结果 -----
     * hello 3
     * word 1
     * kafka 4
     * spring 2
     * stream 1
     *
     * @param builder
     * @return
     */
    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        //1. 定义数据来源
        KStream<String, String> kStream = builder.<String, String>stream("kafka.stream.topic1");
        //2. 定义数据处理流程
        kStream
                //2.1 对原始数据中的value字符串进行切割   mapValues : 对流中数据的value进行处理转化
                .mapValues(value -> value.split(" "))
                //2.2 对value数组进行扁平化处理(将多维数组转化为一维数组)   flatMapValues : 对流中数据的数组格式的value进行处理转化(多维转一维)
                .flatMapValues(value -> Arrays.asList(value))
                //2.3 对数据格式进行转化, 使用value作为key   map : 对流中数据的key和value进行处理转化
                .map(((key, value) -> new KeyValue<>(value,value)))
                //2.4 对key进行分组  groupByKey : 根据key进行分组
                .groupByKey(Grouped.with(Serdes.String(),Serdes.String()))
                //设置聚合时间窗口, 在指定时间窗口范围之内的数据会进行一次运算, 输出运算结果
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //2.5 求每一个组中的单词数量   count : 组内计算元素数量
                .count(Materialized.with(Serdes.String(),Serdes.Long()))
                //2.6 将运算结果发送到另一个topic中   toStream : 将其他类型的流转化为 kStream
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(),value.toString()))
                //将运算结果发送到一个topic, 供消费者接收
                .to("kafka.stream.topic2");

        //3. 返回KStream对象
        return kStream;
    }
}

声明Topic

KafkaStream不会自动帮助我们创建Topic ,所以我们需要自己声明消息来源的topic和消息发送的topic

@Bean
public NewTopic streamTopic1() {
    return TopicBuilder.name("kafka.stream.topic1").build();
}

@Bean
public NewTopic streamTopic2() {
    return TopicBuilder.name("kafka.stream.topic2").build();
}

接收处理结果

定义一个消费者 , 从to("kafka.stream.topic2")中接收计算完毕的消息

@Component
@Slf4j
public class KafkaStreamConsumerListener {

    @KafkaListener(topics = "kafka.stream.topic2", groupId = "steam")
    public void listenTopic1(ConsumerRecord<String, String> record) {
        String key = record.key();
        String value = record.value();
        log.info("单词:{} , 出现{}次", key, value);
    }
}

发送消息测试

@SpringBootTest
@Slf4j
public class KafkaStreamProducerTest {

    @Resource
    private KafkaTemplate kafkaTemplate;

    @Test
    void testSend5() {
        List<String> strs = new ArrayList<String>();
        strs.add("hello word");
        strs.add("hello kafka");
        strs.add("hello spring kafka");
        strs.add("kafka stream");
        strs.add("spring kafka");

        strs.stream().forEach(s -> {
            kafkaTemplate.send("kafka.stream.topic1", "10001", s);
        });
    }
}

入门案例二

需求描述与分析

现在有一组文章行为数据 , 使用ArticleMessage对象封装

package com.heima.kafka.pojos;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author Administrator
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ArticleMessage {

    /**
     * 文章ID
     */
    private Long articleId;

    /**
     * 修改文章的字段类型
     */
    private UpdateArticleType type;

    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;


    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}

模拟数据如下 :

@Test
void testSend6() {
    List<ArticleMessage> strs = new ArrayList<ArticleMessage>();
    ArticleMessage message1 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);
    ArticleMessage message4 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);
    ArticleMessage message7 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);
    ArticleMessage message3 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, -1);
    ArticleMessage message2 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.VIEWS, 1);
    
    ArticleMessage message6 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);
    ArticleMessage message5 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);
    ArticleMessage message8 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);
    ArticleMessage message9 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.COLLECTION, 1);

    strs.add(message1);
    strs.add(message2);
    strs.add(message3);
    strs.add(message4);
    strs.add(message5);
    strs.add(message6);
    strs.add(message7);
    strs.add(message8);
    strs.add(message9);

    strs.stream().forEach(s -> {
        kafkaTemplate.send("hot.article.score.topic" , JSON.toJSONString(s));
    });
}

需求如下 : 请计算出每个文章每种行为的次数 , 输出 :
文章ID : COLLECTION:10,COMMENT:20,LIKES:5,VIEWS:30

定义处理流程

/**
 * @param builder
 * @return
 */
@Bean
public KStream<String, String> kStream(StreamsBuilder builder) {
    //获取KStream流对象
    KStream<String, String> kStream = builder.stream("hot.article.score.topic");
    //定义流处理拓扑
    kStream
            //JSON转化为Java对象
            .mapValues(value -> JSON.parseObject(value, ArticleMessage.class))
            //key和值处理  key: 文章ID  , value : 行为类型:数量
            .map((key, value) -> new KeyValue<>(value.getArticleId(), value.getType().name() + ":" + value.getAdd()))
            //根据key进行分组
            .groupByKey(Grouped.with(Serdes.Long(), Serdes.String()))
            //设置时间窗口
            .windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
            //数据聚合
            .aggregate(() -> "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0", (key, value, aggValue) -> {
                if (StringUtils.isBlank(value)) {
                    return aggValue;
                }
                String[] aggValues = aggValue.split(",");

                Map<String, Integer> map = new HashMap<>();
                for (String agg : aggValues) {
                    String[] strs = agg.split(":");
                    map.put(strs[0], Integer.valueOf(strs[1]));
                }

                String[] values = value.split(":");
                map.put(values[0], map.get(values[0]) + Integer.valueOf(values[1]));

                String format = String.format("COLLECTION:%s,COMMENT:%s,LIKES:%s,VIEWS:%s", map.get("COLLECTION"), map.get("COMMENT"), map.get("LIKES"), map.get("VIEWS"));

                return format;
            }, Materialized.with(Serdes.Long(), Serdes.String()))
            //重新转化为kStream
            .toStream()
            //数据格式转换
            .map((key, value) -> new KeyValue<>(key.key().toString(), value.toString()))
            .to("hot.article.incr.handle.topic");

    return kStream;
}

接收处理结果

@KafkaListener(topics = "hot.article.incr.handle.topic", groupId = "group3")
public void consumer8(ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("consumer8接收到消息:" + key + ":" + value);
}

声明Topic

@Bean
public NewTopic topic7() {
    return TopicBuilder.name("kafka.topic7").build();
}

@Bean
public NewTopic article() {
    return TopicBuilder.name("hot.article.score.topic").build();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1875478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qt实现打开pdf(阅读器)功能用什么库比较合适

关于这个问题&#xff0c;网上搜一下&#xff0c;可以看到非常多的相关博客和例子&#xff0c;可以先看看这个总结性的博客&#xff08;https://zhuanlan.zhihu.com/p/480973072&#xff09; 该博客讲得比较清楚了&#xff0c;这里我再补充一下吧&#xff08;qt官方也给出了一些…

【漏洞复现】飞企互联——SQL注入

声明&#xff1a;本文档或演示材料仅供教育和教学目的使用&#xff0c;任何个人或组织使用本文档中的信息进行非法活动&#xff0c;均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 飞企互联-FE企业运营管理平台是一个基于云计算、智能化、大数据…

【LeetCode】接雨水

目录 一、题目二、解法完整代码 一、题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1] 输出&#xff1a;6 解释&#xff…

力扣 移除元素

class Solution {public int removeElement(int[] nums, int val) {int left 0;for(int right 0;right<nums.length;right){if(nums[right] ! val){nums[left] nums[right];left;}}return left;} }

2000-2022年 省、市、县三级逐年归一化植被指数NDVI

NDVI&#xff08;归一化植被指数&#xff09;是一种重要的遥感指数&#xff0c;广泛应用于植被生长状况的监测和评估。以下是对省、市、县三级逐年归一化植被指数NDVI数据的介绍&#xff1a; 数据简介 定义&#xff1a;NDVI是一种基于植被在红光和近红外波段反射特性的遥感指…

Java学习 - Redis缓存问题与优化

缓存收益与成本 收益 加速读写降低后端、持久层的负载和压力 成本 可能导致数据不一致代码运维成本redis节点运维成本 缓存更新策略 策略一致性维护成本介绍LRU/LIRS算法剔除最差底剔除最近最少使用的数据超时剔除较差底定时删除、惰性删除主动更新最好高持久层更新&#x…

数学建模---最小生成树问题的建模~~~~~Matlab代码

目录 1.相关概念 &#xff08;1&#xff09;什么是树 &#xff08;2&#xff09;生成树和最小生成树&#xff1a; 2.适用赛题 &#xff08;1&#xff09;赛题分类 &#xff08;2&#xff09;不同之处 3.两种算法 &#xff08;1&#xff09;prim算法 &#xff08;2&…

【后端】Nginx+lua+OpenResty高性能实践

文章目录 9. HTTPS安全认证9.1 证书9.2 证书获取方式9.3 自签证书-openssl工具9.4 Nginx配置HTTPS 10. websocket转发配置100.常遇的问题100.1 host 【后端&网络&大数据&数据库目录贴】 9. HTTPS安全认证 http协议问题&#xff1a; 明文传输&#xff0c;有被第三方…

【redis】Redis AOF

1、AOF的基本概念 AOF持久化方式是通过保存Redis所执行的写命令来记录数据库状态的。AOF以日志的形式来记录每个写操作&#xff08;增量保存&#xff09;&#xff0c;将Redis执行过的所有写指令记录下来&#xff08;读操作不记录&#xff09;。AOF文件是一个只追加的文件&…

文心一言4.0免费使用

领取&安装链接&#xff1a;Baidu Comate 领取季卡 有图有真相 原理&#xff1a;百度comate使用文心一言最新的4.0模型。百度comate目前免费使用&#xff0c;可以借助comate达到免费使用4.0模型目的。 如何获得 点击「Baidu Comate 领取季卡 -> 领取权益」&#xff0…

分享一个自学AI的文档-通往AGI之路

通往AGI之路-自学神器 这是是一个有关AI知识的开源文档。 但是&#xff0c;我认为这是小白学习AI的最强王者&#xff0c;每一个想学习AI、想使用AI的人都可以把它设为首页&#xff0c;从它开始。 飞书文档&#xff1a;通往AGI之路 通往AGI之路是由一群热爱A|的专家和爱好者…

[数据库]索引机制

目录 索引机制 索引的类型 索引使用 哪些适合添加索引 ​编辑 索引机制 当没有索引的时候, 如下示例,在找到id等于1的时候, 仍然会往下继续查找, 进行全表扫描, 因为它认为下面也有可能还会有1 加上索引之后进行二叉树查找, 找到1之后, 发现1的左边没有了, 右边也没有了就停…

我关于Excel使用点滴的笔记

本篇笔记是我关于Excel使用点滴的学习笔记&#xff0c;摘要和地址链接列表。临时暂挂&#xff0c;后面可能在不需要时删除。 (笔记模板由python脚本于2024年06月28日 12:23:32创建&#xff0c;本篇笔记适合初通Python&#xff0c;熟悉六大基本数据(str字符串、int整型、float浮…

GPT-5 一年半后发布,对此你有何期待?

IT之家6月22日消息&#xff0c;在美国达特茅斯工程学院周四公布的采访中&#xff0c;OpenAI首席技术官米拉穆拉蒂被问及GPT-5是否会在明年发布&#xff0c;给出了肯定答案并表示将在一年半后发布。此外&#xff0c;穆拉蒂在采访中还把GPT-4到GPT-5的飞跃描述为高中生到博士生的…

find()方法——字符串首次出现的索引位置

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 find()方法实现查询一个字符串在其本身字符串对象中首次出现的索引位置&#xff0c;如起始位置从11到结束位置17之间子字符串出现的位置&a…

使用Endnote中英文等的实现和GB7714格式

Endnote是一款被广泛使用的文献管理软件&#xff0c;其是SCI&#xff08;Thomson Scientific 公司&#xff09;的官方软件&#xff0c;支持国际期刊的参考文献格式有3776 种【也可以自定义期刊引用格式】。 软件非常方便科研狗进行文献整理&#xff0c;写笔记&#xff0c;做备…

深度学习经典检测方法概述

一、深度学习经典检测方法 two-stage&#xff08;两阶段&#xff09;&#xff1a;Faster-rcnn Mask-Rcnn系列 one-stage&#xff08;单阶段&#xff09;&#xff1a;YOLO系列 1. one-stage 最核心的优势&#xff1a;速度非常快&#xff0c;适合做实时检测任务&#xff01; 但是…

Spring简单学习

Spring简单学习 spring中的设计模式工厂模式简单工厂模式工厂方法模式 Springmvc简单学习MVC模式Spring MVC 常用组件Spring MVC 工作流程 [Spring IOC](https://juejin.cn/post/6983957368115560485#heading-3)IOC是什么使用IOCSpring提供的IOC容器Spring的IOC实现原理**Root …

第4章_USB 设备编程

文章目录 4.1 USB 学习指南4.1 USB 学习指南 4.2 USB 系统硬件框架和软件框架4.2.1 实验现象4.2.2 硬件框架4.2.3 软件框架 4.3 软件工程师眼里的 USB 电气信号4.3.1 USB 设备状态切换图4.3.2 硬件线路4.3.3 电子信号4.3.4 低速/全速信号电平4.3.5 高速信号电平4.3.6 设备连接与…

【报错记录】第一次安装VUE启动失败sh: vite: command not found

大家好&#xff0c;我是DX3906 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘大前端领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; 问题描述&#xff1a;第一次使用npm create vuelatest搭建vue3.0脚手架的时候&#xff0c;运行npm run dev…