KafkaStream:基本使用

news2024/11/23 15:37:29

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

        <!--kafkaStream-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/*
* 流式处理
* */
public class KafkaStreamQuickStart {
    public static void main(String[] args) {
        /*创建kafka配置中心并配置参数*/
        Properties prop = new Properties();
        //连接地址
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key序列化
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //value序列化
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //创建id名称
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream构造器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        //创建KafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);

        //开启流式计算
        kafkaStreams.start();
    }

    //流式计算方法
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kafka对象,同时指定从哪个topic获取消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        //处理消息的value
        stream.flatMapValues(new ValueMapper<String, Iterable<?>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })      //按照value进行聚合
                .groupBy((key,value)->value)
                //时间窗口,每隔10秒更新一次
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/871523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在C++中怎么判断一个东西是左值还是右值

2023年8月13日&#xff0c;周日早上 #include <iostream>// 函数模板&#xff0c;根据参数类型判断是左值还是右值 template<typename T> void checkValueType(T&& arg) {if constexpr(std::is_lvalue_reference<T>())std::cout << "Expr…

运营商加速抢占云计算,5G带来更多利润,上半年躺赚近千亿

中国移动已率先公布了上半年的业绩&#xff0c;业绩显示它们正在加速抢占此前由互联网企业占据优势的云计算市场&#xff0c;不过最让它们开心的还是5G继续带动利润的高速增长&#xff0c;预计三大运营商今年上半年可以躺赚近千亿元。 中国移动无疑仍然是运营商行业的领头羊&am…

tampermonky插件安装

换了电脑&#xff0c;今天重新下载了tampermonky&#xff0c;哎&#xff0c;被自己气死的一上午。wwwww&#xff0c;最后发现最大的问题就是没有问题。 Edge能不用下载直接安装&#xff0c;但是何少 下载tampermonky&#xff0c;在谷歌程序扩展-chrome://extensions/ 添加解压…

VirtualBox 导致的系统蓝屏问题

1&#xff0c;操作系统win7,已经启动了结果蓝屏 2&#xff0c;进入操作系统的命令行模式删除virtualbox的相关目录可以启动&#xff0c;但是无法在控制面板卸载&#xff0c;只要点击卸载就会发生蓝屏 3&#xff0c;删除注册表的相关内容 4&#xff0c;导致无线网络链接受限&…

京东面试曝光:零点秒杀如何防止Redis雪崩崩溃?

大家好&#xff0c;我是你们的小米&#xff01;今天要和大家聊一个超有技术含量的话题&#xff0c;那就是如何应对零点秒杀活动下的Redis雪崩问题。在京东的面试题中&#xff0c;这个问题可是相当有挑战性哦&#xff01;废话不多说&#xff0c;咱们直接进入主题吧。 什么是雪崩…

Academic Inquiry|国外文献查找

一个失去了男子气概的人总有很多忧虑&#xff0c;这样就可以分散注意力&#xff0c;而不必为那件特别的羞耻而苦恼不堪。 ——《狂野之夜》〔美〕乔伊斯卡罗尔欧茨著 樊维娜译 许多研究者在进行研究的时候&#xff0c;都会查找对应主体的国内外引用文献&#xff0c;而大多得出的…

2023杭州十大画室排行榜丨哪家画室是你的最佳选择?

美术&#xff0c;是一种表达&#xff0c;是一种情感的释放&#xff0c;也是一种对美的追求。而在杭州的画室&#xff0c;就是这种追求的最好舞台。接下来&#xff0c;我们将一起揭晓杭州十大画室&#xff0c;一起感受艺术的独特魅力&#xff0c;一起感受那种深入骨髓的艺术冲击…

2023/08/13_______JVM(CG)垃圾回收 算法(复制算法,标记清除,标记清除压缩)

JVM GC算法 复制算法 1&#xff0c;每一次GC都会将伊甸&#xff08;Eden&#xff09;活的对象移到幸存区中&#xff1a;一旦Eden区被GC后 就会是空 只要有内容就是from区 谁空谁是to区 内存会从 伊甸->幸存区to->幸存from&#xff08;这个时候to和from交换区域&#xf…

STM32入门学习之定时器输入捕获

1.定时器的输入捕获可以用来测量脉冲宽度或者测量频率。输入捕获的原理图如下&#xff1a; 假设定时器是向上计数。在图中&#xff0c;t1~t2之间的便是我们要测量的高电平的时间(脉冲宽度)。首先&#xff0c;设置定时器为上升沿捕获&#xff0c;如此一来&#xff0c;在t1时刻可…

C语言刷题训练【第十天】

大家好&#xff0c;我是纪宁。 今天是C语言刷题训练的第十天&#xff0c;加油&#xff01; 文章目录 &#x1f386;1、求函数返回值&#xff0c;传入 -1 &#xff0c;则在64位机器上函数返回&#xff08; &#xff09;&#x1f3a4;2、读代码选结果&#xff08; &#xff09;&…

8.13树的总结(有新知识再更新)

二叉树题目几个重点&#xff1a; 1. 理解递归&#xff0c;优先掌握递归实现 递归三部曲&#xff1a;1.确定递归函数的参数和返回类型&#xff1b;2.确定终止条件 &#xff1b;3.确定递归逻辑 因为递归一层一层对我来说有点绕&#xff0c;主要感悟就是只针对某一个节点思考&…

Mac思维导图软件Xmind for Mac中文激活版

好的思维导图软件能帮助用户更好的发挥创作能力&#xff0c;XMind是一款流行的思维导图软件&#xff0c;可以帮助用户创建各种类型的思维导图和概念图。 多样化的导图类型&#xff1a;XMind提供了多种类型的导图&#xff0c;如鱼骨图、树形图、机构图等&#xff0c;可以满足不同…

计算机视觉中的特征检测和描述

一、说明 这篇文章是关于计算机视觉中特征检测和描述概念的简要理解。在其中&#xff0c;我们探讨了它们的定义、常用技术、简单的 python 实现和一些限制。 二、什么是特征检测和描述&#xff1f; 特征检测和描述是计算机视觉中的基本概念&#xff0c;在图像识别、对象跟踪和图…

Seaborn图表使用指南!

目录 介绍线图散点图直方图概率密度函数 &#xff08;PDF&#xff09;箱线图小提琴剧情配对图热图关节图地毯图 一、介绍 数据科学已成为一个突出的领域&#xff0c;近年来呈爆炸性增长。对精通从数据中获取见解并应用这些见解来解决现实世界问题的数据科学家的需求从未增加。…

centos 7.x 单用户模式

最近碰到 centos 7.9 一些参数设置错误无法启动系统的情况&#xff0c;研究后可以使用单用户模式进入系统进行恢复操作。 进入启动界面&#xff0c;按 e ro 替换为 rw init/sysroot/bin/sh 替换前 替换后 Ctrl-x 进行重启进入单用户模式 执行 chroot /sysroot 可以查看日…

数据库操作不再困难,MyBatis动态Sql标签解析

系列文章目录 MyBatis缓存原理 Mybatis的CachingExecutor与二级缓存 Mybatis plugin 的使用及原理 MyBatis四大组件Executor、StatementHandler、ParameterHandler、ResultSetHandler 详解 MyBatisSpringboot 启动到SQL执行全流程 数据库操作不再困难&#xff0c;MyBatis动态S…

一个软件测试面试相关的面试题目,你们会做吗?

有这样一个面试题&#xff1a;在一个Web测试页面上&#xff0c;有一个输入框&#xff0c;一个计数器&#xff08;count&#xff09;按钮&#xff0c;用于计算一个文本字符串中字母a出现的个数。请设计一系列测试用例用以测试这个Web页面。 <ignore_js_op> 有经验的测试人…

WS2812B RGB灯带使用

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; 本人持续分享更多关于电子通信专业内容以及嵌入式和单片机的知识&#xff0c;如果大家喜欢&#xff0c;别忘点个赞加个关注哦&#xff0c;让我们一起共同进步~ &#x…