11.Kafka系列之Stream实践

news2025/1/13 13:28:38

Kafka Streams是一个基于Apache Kafka的处理库,可以用于实现高效、可扩展的实时数据处理应用程序。它是一个轻量级的库,允许你在Java和Scala中创建和运行流处理应用程序,这些应用程序可以读取输入流,执行各种数据转换,然后将处理后的结果发送到输出流。

1. 主要优势

1.简单易用:Kafka Streams提供了一个简单易用的编程模型,让你能够轻松地编写和调试流处理应用程序
2.高效性能:Kafka Streams使用Kafka作为底层存储和消息传递系统,利用了Kafka的高性能、低延迟和可伸缩性
3.可扩展性:Kafka Streams可以水平扩展,使你能够轻松地处理大规模数据流
4.容错性:Kafka Streams提供了一些内置的机制来保证应用程序的容错性,如重新平衡、故障恢复
5.集成性:Kafka Streams可以与其他Kafka生态系统中的工具和组件无缝集成,如Kafka Connect、Kafka Producer和Kafka Consumer
Kafka Streams可以用于各种实时数据处理应用程序,如实时ETL、实时分析、实时监控

2. 实时分析交易数据实践

该示例演示如何实现一个实时的分析应用程序,从Kafka主题中读取交易数据,执行一些复杂的数据转换和计算,并将结果写回到另一个Kafka主题中

2.1 创建对应主题
kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
kafka-topics.sh --create --topic customer-analysis --bootstrap-server localhost:9092
2.2 POM.xml引入Kafka依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.31</version>
</dependency>
2.3 代码主要逻辑

其他代码请关注公众号 算法小生,回复Kafka Stream即可

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class TransactionAnalyzer {


  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-analyzer");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new TransactionSerde().getClass());
    
    final StreamsBuilder builder = new StreamsBuilder();
    
    KStream<String, Transaction> transactions = builder.stream("transactions");
    
    // 计算每个客户的交易总额
    KTable<String, Double> customerTotalSpending = transactions
        .groupBy((key, transaction) -> transaction.getCustomerId())
        .aggregate(() -> 0.0, (customerId, transaction, total) -> total + transaction.getAmount(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-total-spending-store")
                        .withValueSerde(Serdes.Double()));
    
    // 计算每个客户的平均交易额
    KTable<String, Double> customerAvgSpending = transactions
        .groupBy((key, transaction) -> transaction.getCustomerId())
        .aggregate(() -> new TotalAndCount(0.0, 0L),
                   (customerId, transaction, totalAndCount) -> totalAndCount.add(transaction.getAmount()),
                Materialized.<String, TotalAndCount, KeyValueStore<Bytes, byte[]>>as("customer-avg-spending-store")
                        .withValueSerde(new TotalAndCountSerde()))
        .mapValues((totalAndCount) -> totalAndCount.getTotal() / totalAndCount.getCount(),
                Materialized.with(Serdes.String(), Serdes.Double()))
        // 去重,保证JOIN后结果唯一
        .suppress(Suppressed.untilTimeLimit(null, Suppressed.BufferConfig.unbounded()));;

    // 找出每个客户的最大交易额
    KTable<String, Double> customerMaxSpending = transactions
        .groupBy((key, transaction) -> transaction.getCustomerId())
        .reduce((transaction1, transaction2) ->
                transaction1.getAmount() > transaction2.getAmount() ? transaction1 : transaction2)
        .mapValues((transaction) -> transaction.getAmount(), Named.as("customer-max-spending"),
                Materialized.with(Serdes.String(), Serdes.Double()))
        // 去重,保证JOIN后结果唯一
        .suppress(Suppressed.untilTimeLimit(null, Suppressed.BufferConfig.unbounded()));

    // 将所有结果合并,并写入到输出主题中
    KStream<String, CustomerAnalysis> analysis = customerTotalSpending
        .join(customerAvgSpending, (total, avg) -> new CustomerAnalysis(total, avg))
        .join(customerMaxSpending, (analysis1, max) -> analysis1.withMax(max))
        .mapValues((key, analysis2) -> analysis2.normalize(key))
        .toStream()
        .map((key, analysis3) -> KeyValue.pair(key, analysis3));

    analysis.to("customer-analysis", Produced.with(Serdes.String(), new CustomerAnalysisSerde()));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close();
        latch.countDown();
      }
    });

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }
}
2.4 运行并实时查看

启动程序后

我们新开窗口实时查看统计信息

kafka-console-consumer.sh --topic customer-analysis --bootstrap-server localhost:9092

现在我们写入一些交易信息,稍等片刻,查看统计窗口变化情况

$ kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
>{"transactionId": "000001", "customerId": "00001", "amount": 10000, "timestamp": 1683983695}
>{"transactionId": "000002", "customerId": "00002", "amount": 20000, "timestamp": 1683983695}
>{"transactionId": "000003", "customerId": "00002", "amount": 30000, "timestamp": 1683983896}
>{"transactionId": "000004", "customerId": "00001", "amount": 20000, "timestamp": 1683983896}

3. 错误总结

1.运行后,出现NullPointer in ProcessorParameters.toString

java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.api.ProcessorSupplier.get()" because "this.processorSupplier" is null
	at org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters.toString(ProcessorParameters.java:133)

Kafka3.3.1的bug,请升级Kafka版本,我升级到3.4.0,并且环境变量改为如下即可启动,kafka-server在hosts中配置对应IP

    spec:
      containers:
        - name: kafka
          image: bitnami/kafka:3.4.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
              name: web
              protocol: TCP
          env:
            - name: KAFKA_CFG_ZOOKEEPER_CONNECT
              value: zookeeper-pod.middleware:2181
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-server:30092
            #- name: KAFKA_HEAP_OPTS
            #  value: -Xmx2048m -Xms2048m
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"

2.序列化失败

Caused by: com.alibaba.fastjson.JSONException: illegal fieldName input, offset 7, character , line 1, column 8, fastjson-version 2.0.31 10000.0
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:518)
	at online.shenjian.kafka.JsonDeserializer.deserialize(JsonDeserializer.java:26)

我们配置的默认序列化为

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new TransactionSerde().getClass().getName());

而在KTable中我们定义对应的序列化即可

// 计算每个客户的交易总额
KTable<String, Double> customerTotalSpending = transactions
   .groupBy((key, transaction) -> transaction.getCustomerId())
   .aggregate(() -> 0.0, (customerId, transaction, total) -> total + transaction.getAmount(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-total-spending-store")
                        .withValueSerde(Serdes.Double()));

3.合并结果,出现重复记录问题

我们在需要JOIN的两个KTable最后加入如下代码即可。对于本例中的 customerMaxSpending 流,可能会存在多条记录有相同的 key,即多个顾客的最大消费额相同。因此,在进行 join 操作之前,需要使用 suppress 方法对每个 key 的所有记录进行合并,以保证最终输出的结果中每个 key 只对应一个最大消费额

        .suppress(Suppressed.untilTimeLimit(null, Suppressed.BufferConfig.unbounded()));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/524901.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis的配置案例

Mybatis中Map的使用 如果需要所有的代码&#xff0c;可以看我上一篇 在接口中定义 int addUser1(Map<String,Object> map); 插入语句 <insert id"addUser1">insert into user(id,name,pwd) values (#{userid},#{username},#{userPwd})</insert> …

Vue3-黑马(十)

目录&#xff1a; &#xff08;1&#xff09;vue3-antdv-全局提示与校验 &#xff08;2&#xff09;vue3-进阶-router-入门 &#xff08;3&#xff09;vue3-进阶-router-动态导入-嵌套路由-重定向 &#xff08;1&#xff09;vue3-antdv-全局提示与校验 当用户新增修改&…

redis(11)

一)基于Set集合实现点赞功能: 在我们的博客表当中&#xff0c;每一篇博客信息都有一个like字段&#xff0c;表示点赞的数量 需求: 1)同一个用户只能点赞一次&#xff0c;再次进行点赞则会被取消&#xff1b; 2)如果当前用户已经点赞过了&#xff0c;那么点赞按钮高亮显示&…

传输层:UDP协议

传输层中有两个重要的协议&#xff1a;TCP协议和UDP协议。 本博文分享的是UDP协议&#xff0c;本文将从UDP的协议格式、UDP的特定以及其缓冲区入手。 传输层 传输层的作用是负责数据能够从发送端传输到接收端&#xff0c;主要是传输策略。 端口号 端口号标识的是一个主机上进…

【AIGC提示工程 - Midjourney教程:三】如何利用Midjourney AI创作一幅杰出的艺术作品?

关注元壤教育公众号系统学习AIGC提示工程课程。 更多AIGC好博客&#xff0c;请移步访问AIGC博客派 要在Discord上使用Midjourney机器人&#xff0c;您需要输入一个指令。指令能帮助您创建图片、修改默认设置、监控用户信息以及执行其他有用的操作。如果想要生成一张图片&#x…

RHCSA之Linux目录结构

目录 Linux目录结构 Linux目录结构的特点 分区‘加载’于目录结构 Windows与Linux目录结构区别 工作目录、用户家目录及路径 Linux的文件类型大致可分为 查看文件系统类型 方法一 ls -l 路径 方法二 file 路径 RHCSA之Linux目录结构 使用树形目录结构来组织和管理文件 整个…

51单片机(十三)DS18B20温度传感器

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

第14届蓝桥杯C++省赛(初级)真题

一、选择题&#xff08;50分&#xff09; 第 1 题 单选题&#xff08;10分&#xff09; C中&#xff0c;bool类型的变量占用字节数为 ( )。 *选择题严禁使用程序验证&#xff0c;选择题不答或答错都不扣分 A.1 B.2 C.3 D.4 第 2 题 单选题&#xff08;10分&#xff09;…

Windows 下载与安装CUDA和Pytorch【安装教程、深度学习】

参考链接&#xff1a;Windows 下安装 CUDA 和 Pytorch 跑深度学习 - 动手学深度学习v2_哔哩哔哩_bilibili 0.准备工作 请确保你是NVIDIA的显卡&#xff08;不能是AMD、集成显卡&#xff09; 1.下载CUDA 打开developer.nvidia.com/cuda-downloads&#xff0c;打开有点慢 选择…

2023/5/14 数值计算方法考试复盘

第一题 问我1-()如果减少乘除次数,那么如何做出变形。 正确解法&#xff1a; 可以利用乘法分配律&#xff0c;将1拆分成1 - 1/2! 1/2! - 1/3! 1/3! - ... - 1/n! 1/n!&#xff0c;然后将拆分出来的两项合并&#xff0c;得到&#xff1a; 1 - (1/2! - 1/2!) - (1/3! - 1/3…

支付系统设计一:支付系统产品化

系列文章目录 支付系统设计一&#xff1a;支付系统产品化 支付系统设计二&#xff1a;统一开发框架 支付系统设计三&#xff1a;渠道网关设计01-总览 支付系统设计三&#xff1a;渠道网关设计02-客户端报文解析 支付系统设计三&#xff1a;渠道网关设计03-参数验证 支付系统设…

在 Windows 上安装 Docker

一、前言 个人主页: ζ小菜鸡大家好我是ζ小菜鸡&#xff0c;让我们一起学习在 Windows 上安装Docker。如果文章对你有帮助、欢迎关注、点赞、收藏(一键三连) 二、 Docker是什么 Docker是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可抑制的…

K8s(Kubernetes)学习(一):k8s概念及组件

Kubernetes中文文档&#xff1a;https://kubernetes.io/zh-cn/docs/home/ Kubernetes源码地址&#xff1a;https://github.com/kubernetes/kubernetes 一:Kubernetes是什么 首先要了解应用程序部署经历了以下几个时代&#xff1a; 传统部署时代&#xff1a;在物理服务器上运…

C语言——表达式求值中类型转换和优先级等问题

目录 1.隐式类型转换 2.算数转换 ​3.操作符的属性 1.隐式类型转换 C的整型算术运算总是至少以缺省整型类型的精度来进行的。 为了获得这个精度&#xff0c;表达式中的字符和短整型操作数在使用之前被转换为普通整型&#xff0c;这种转换称为整型提升。 整型提升的意义&a…

Sentinel : 服务容错(降级熔断、流量整形)

什么是服务雪崩&#xff1f; 服务雪崩效应是一种因“服务提供者的不可用”&#xff08;原因&#xff09;导致“服务调用者不可用”&#xff08;结果&#xff09;&#xff0c;并将不可用逐渐放大的现象。 我来用一个模拟场景带你感受一下服务雪崩的厉害之处。假设我有一个微服…

小世界网络评估

小世界网络评估 文章目录 小世界网络评估[toc]1、网络小世界定义2、网络评估R代码 1、网络小世界定义 现实中许多网络巨型组件都发现了“小世界特性”。小世界特性是指 网络节点间最短路径通常较小网络聚集系数较高 网络最短路径L计算公式为 L 1 n ( n − 1 ) ∑ i ⩾ j d…

【JZ-7Y-16静态中间继电器 触点容量大、电阻小 抗干扰强 JOSEF约瑟】

系列型号&#xff1a; JZ-7Y-15静态中间继电器&#xff1b; JZ-7J-15静态中间继电器&#xff1b; JZ-7L-15静态中间继电器&#xff1b; JZ-7D-15静态中间继电器&#xff1b; JZ-7Y-16静态中间继电器&#xff1b; JZ-7J-16静态中间继电器&#xff1b; JZ-7L-16静态中间继…

自动化渗透测试自动化挖掘src(2)

文章目录 前言思路ICP备案子域名枚举收集可用服务漏洞攻击 前言 上一谈我们讨论了自动化渗透测试的实验&#xff0c;但是他过于依赖fofa&#xff0c;不得不承认&#xff0c;fofa在资产收集这方面做的确实很厉害&#xff0c;但是就是需要花钱&#xff0c;那有没有不需要花钱都手…

UI GameObject可以在Scene View中显示,但是在Game View不能显示

出现UI GameObject可以在Scene View中显示&#xff0c;但是在Game View不能显示这种问题&#xff0c;可能有很多种原因&#xff0c;例如Layer设定、Camera Clipping设定、font assets等问题。 对于TextMeshPro&#xff0c;还有Material Set的问题&#xff0c;见这篇文章。 而我…

PHP 8.2:它对 WordPress、插件和开发者意味着什么?

PHP 8.2.0于 2022 年 12 月 8 日首次亮相。作为一项重大更新&#xff0c;它带来了性能改进和更简单的语法。PHP 8.2 引入了更高的类型安全性作为一项功能&#xff0c;将null、false和true作为独立类型。可能挑战 WordPress 开发人员的最大变化之一是添加了只读类&#xff0c;它…