KafkaStream:Springboot中集成

news2024/12/28 11:01:00

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;

import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

2、在application.yml中配置上面配置类需要的参数

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split(" "));
                    }
                })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/871579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于grpc从零开始搭建一个准生产分布式应用(2) - 工程构建

开始本章之前默认读者已经配置好了以下环境&#xff1a;Intellij IDEA 2022.1.2、JDK 1.8.0_144、Maven 3&#xff0c;另外也建议大家在一些免费代码托管平台开个帐号&#xff0c;这样就可以免费使用git做版本处理了&#xff0c;笔者自己私人使用的是阿里云的云效平台。因为此专…

【Lua基础入门】解密世界上最快的脚本语言

文章目录 前言一、Lua简介二、Lua功能三、安装LuaUbuntu LinuxWindows安装Lua 四、第一个Lua程序总结 前言 Lua是一种轻量级、快速且可嵌入的脚本语言&#xff0c;广泛应用于游戏开发、嵌入式系统、脚本扩展等领域。它的设计目标是简单、高效、可定制和易于集成。本文将介绍Lu…

射频入门知识-1

信号源 示波器 综合测试仪 功率计 噪声测试仪 频谱分析仪 频谱分析仪: 放大器的噪声系数测试 放大器增益测试 噪声和增益是放大器的最关键指标&#xff0c;学学怎么用频谱仪做放大器的噪声测试 那个 hbf740 输入和输出阻抗匹配具体怎么搞 《ADS2011射频电路设计与…

iOS Epub阅读器改造记录

六个月前在这个YHEpubDemo阅读器的基础上做了一些优化&#xff0c;这里做一下记录。 1.首行缩进修复 由于分页的存在&#xff0c;新的一页的首行可能是新的一行&#xff0c;则应该缩进&#xff1b;也可能是前面一页段落的延续&#xff0c;这时候不应该缩进。YHEpubDemo基于XDS…

Cenos7 搭建Minio集群部署服务器(一)

------> 道 | 法 | 术 | 器 | 势 <------ 多台服务器间免密登录|免密拷贝 Cenos7 搭建Minio集群部署服务器(一) 企业级开源对象存储(看看官网吹的牛B) 开源为云提供动力。开源为企业提供动力。开源为 MinIO 提供支持。每天都有成千上万的客户和社区成员信任 Mi…

Spring Boot @Validated 验证注解的使用

1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId> </dependency> 2、使用 2.1、非对象参数 参数如果是非对象格式&#xff0c;需要在controller类上面添…

cf暑假训练 1700-1800 day1

cf暑假训练 1700-1800 day1 1852B Imbalanced Arrays1850H. The Third Letter1833G Ksyusha and Chinchilla1833F Ira and Flamenco&#xff08;补完线段树来看&#xff09;1809D Binary String Sorting1780D Bit Guessing Game&#xff08;这题真的好难&#xff0c;我只能说我…

yolov5、YOLOv7、YOLOv8改进:注意力机制CA

论文题目&#xff1a;《Coordinate Attention for Efficient Mobile NetWork Design》论文地址&#xff1a; https://arxiv.org/pdf/2103.02907.pdf 本文中&#xff0c;作者通过将位置信息嵌入到通道注意力中提出了一种新颖的移动网络注意力机制&#xff0c;将其称为“Coordin…

msvcp120.dll丢失的解决方法?分享三种常见解决方法

msvcp120.dll是一个动态链接库文件&#xff0c;它是Microsoft Visual C Redistributable包中的一个组成部分。它是用于支持C编程语言的运行时库文件之一。它包含了许多标准库函数、容器类、算法和其他与C语言相关的功能。这些功能包括内存管理、字符串处理、数学计算、文件操作…

每日一题 206反转链表

题目 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1]示例 3&#xff1a; …

浅谈XML配置实现逻辑

XML简介 什么是XML&#xff1f; xml是可扩展的标记语言 XML的作用 主要作用&#xff1a; 1.用来保存数据&#xff0c;而且这些数据具有自我描述性 2.他可以作为项目或者模块的配置文件 3.还可以作为网络传输数据的格式&#xff08;现在JSON为主&#xff09; 第一个实例 命…

计算机组成原理-笔记-汇总

&#x1f4da; 前言 本人在备考408&#xff0c;王道讲得的确不错&#xff0c;本人之前也看过哈工大【刘宏伟老师】的课&#xff0c;两者对比下来。 王道——更加基础&#xff0c;对小白更加友好哈工大——偏实践偏硬件&#xff08;会将更多的代码硬件设计&#xff09; PS&#…

SpringBoot2-Tomcat部署

1.排除内置 Tomcat 在pom.xml文件中的下添加以下代码&#xff0c;用于排除SpringBoot内置Tomcat <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion&…

什么是CSS中的渐变(gradient)?如何使用CSS创建线性渐变和径向渐变?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 渐变&#xff08;Gradient&#xff09;在CSS中的应用⭐ 线性渐变&#xff08;Linear Gradient&#xff09;语法&#xff1a;示例&#xff1a; ⭐ 径向渐变&#xff08;Radial Gradient&#xff09;语法&#xff1a;示例&#xff1a; ⭐ 写…

Kotlin和Java互操作时的可空性

注&#xff1a;文中demo的kt版本是1.7.10 一、kotlin语言中的可空性设计 在Java语言中的NPE&#xff08;NullPointerException&#xff09;可以说非常常见&#xff0c;而且诟病已久。 kotlin做为后起之秀&#xff0c;在空指针的问题上进行了升级&#xff0c;即&#xff1…

day9 10-牛客67道剑指offer-JZ66、19、20、75、23、76、8、28、77、78

文章目录 1. JZ66 构建乘积数组暴力解法双向遍历 2. JZ19 正则表达式匹配3. JZ20 表示数值的字符串有限状态机遍历 4. JZ75 字符流中第一个不重复的字符5. JZ23 链表中环的入口结点快慢指针哈希表 6. JZ76 删除链表中重复的结点快慢指针三指针如果只保留一个重复结点 7. JZ8 二…

HttpRunner自动化工具之设置代理和请求证书验证

httprunner设置代理&#xff1a; httprunner 库本身没有提供设置代理的接口&#xff0c;但是底层使用了urllib.requests 等库&#xff0c;可以设置HTTP_PROXY 和HTTPS_PROXY 环境变量&#xff0c;常用的网络库会自动识别这些环境变量。 日常调试使用代理&#xff08;如charles…

SQL-每日一题【】

题目 Employees 表&#xff1a; EmployeeUNI 表&#xff1a; 展示每位用户的 唯一标识码&#xff08;unique ID &#xff09;&#xff1b;如果某位员工没有唯一标识码&#xff0c;使用 null 填充即可。 你可以以 任意 顺序返回结果表。 返回结果的格式如下例所示。 示例 1&a…

使用ip2region获取客户端地区

目录 从gitee拉取ip2region.xdb资源文件 写测试类 注意要写对资源路径 本地测试结果 ​编辑 远端测试结果 从gitee拉取ip2region.xdb资源文件 git clone https://gitee.com/lionsoul/ip2region.git 将xdb放入resources资源文件夹 写测试类 private Searcher searcher;GetMap…

funbox3靶场渗透笔记

funbox3靶场渗透笔记 靶机地址 https://download.vulnhub.com/funbox/Funbox3.ova 信息收集 fscan找主机ip192.168.177.199 .\fscan64.exe -h 192.168.177.0/24___ _/ _ \ ___ ___ _ __ __ _ ___| | __/ /_\/____/ __|/ __| __/ _ |/ …