Kafka系列(四)

news2024/11/15 22:45:31

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

        关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回alice,4。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
       <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。


/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;

    private String hosts;

    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

package com.neu.article.stream;

import com.alibaba.fastjson.JSON;

import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434(文章id)   和  value: likes:1 当前文章点赞一次
            //mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value->aggValue,聚合之后的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     *
                     * @param key 消息的key :mess.getArticleId().toString()
                     * @param value  消息的value likes:1
                     * @param aggValue   初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                     * @return
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        System.out.println(value);
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            //agg遍历第一次的时候最开始为 COLLECTION:0
                            String[] split = agg.split(":");//split[0] = COLLECTION  split[1] = 0

                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作   likes:1
                         */
                        String[] valAry = value.split(":");
                        //valAry[0] = likes  valAry[1] = 1
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }
                        //返回值是有要求的,必须与初始化apply方法的返回值形式一致
                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                    //Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    //key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1391036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

32单片机RTC时间接续,掉电时间保存

1、实现思路 前提&#xff1a;首先要实现RTC掉电之后时间还能继续走&#xff0c;RTC电池是必要的 说明&#xff1a;设备第一次启动需要初始化配置RTC&#xff0c;但当二次启动再重新配置RTC会导致RTC计数器置零&#xff0c;所以传统的程序流程是不行的&#xff0c;我们需要知…

苹果最新系统iOS 17的调试和适配方法 - Xcode 14.3.1 真机调试指南

最近苹果发布了iOS 17作为其最新操作系统版本&#xff0c;作为开发者&#xff0c;你可能需要了解如何在Xcode 14.3.1中进行真机调试和适配。本文将为你详细介绍步骤和注意事项。 I. 检查Xcode版本 在开始之前&#xff0c;确保你已经安装了Xcode 14.3.1或更高版本。你可以在Xco…

计算机网络(超详解!) 第二节 数据链路层(上)

1.数据链路层使用的信道 数据链路层使用的信道主要有以下两种类型&#xff1a; 1.点对点信道&#xff1a;这种信道使用一对一的点对点通信方式。 2.广播信道&#xff1a;这种信道使用一对多的广播通信方式&#xff0c;因此过程比较复杂。广播信道上连接的主机很多&#xff0…

学习记录-自动驾驶与机器人中的SLAM技术

以下所有内容均为高翔大神所注的《自动驾驶与机器人中的SLAM技术》中的内容 融合导航 1. EKF和优化的关系 2. 组合导航eskf中的预测部分&#xff0c;主要是F矩阵的构建 template <typename S> bool ESKF<S>::Predict(const IMU& imu) {assert(imu.timestamp…

HCIA——10实验:跨路由转发。静态路由、负载均衡、缺省路由、手工汇总、环回接口。空接口与路由黑洞、浮动静态。

学习目标&#xff1a; 跨路由转发、负载均衡、环回接口、手工汇总、缺省路由、空接口与路由黑洞、浮动静态 学习内容&#xff1a; 跨路由转发静态路由、负载均衡、缺省路由、手工汇总。环回接口空接口与路由黑洞、浮动静态 目录 学习目标&#xff1a; 学习内容&#xff1a…

imgaug库指南(27):从入门到精通的【图像增强】之旅

引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和资源。正因如此&#xff0c;数据增强技术应运而生&#xff0c;成为了解决这一问题的…

23/76-LeNet

LeNet 早期成功的神经网络。 先使用卷积层来学习图片空间信息。 然后使用全连接层转换到类别空间。 #In[]LeNet,上世纪80年代的产物,最初为了手写识别设计from d2l import torch as d2l import torch from torch import nn from torch.nn.modules.loss import CrossEntropyLos…

Git中,版本库和远程库有什么区别

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a;每天一个知识点 ✨特色专栏&#xff1a…

【jupyter添加虚拟环境内核(pytorch、tensorflow)- 实操可行】

jupyter添加虚拟环境内核&#xff08;pytorch、tensorflow&#xff09;- 实操可行 1、查看当前状态(winR&#xff0c;cmd进入之后)2、激活虚拟环境并进入3、安装ipykernel5、完整步骤代码总结6、进入jupyter 添加pytorch、tensorflow内核操作相同&#xff0c;以下内容默认已经安…

IP定位技术在网络安全行业的探索

随着互联网的普及和深入生活&#xff0c;网络安全问题日益受到人们的关注。作为网络安全领域的重要技术&#xff0c;IP定位技术正逐渐成为行业研究的热点。本文将深入探讨IP定位技术在网络安全行业的应用和探索。 一、IP定位技术的概述 IP定位技术是通过IP地址来确定设备地理位…

muduo网络库剖析——通道Channel类

muduo网络库剖析——通道Channel类 前情从muduo到my_muduo 概要事件种类channel 框架与细节成员函数细节实现使用方法 源码结尾 前情 从muduo到my_muduo 作为一个宏大的、功能健全的muduo库&#xff0c;考虑的肯定是众多情况是否可以高效满足&#xff1b;而作为学习者&#x…

RK3399平台入门到精通系列讲解(硬件篇)常用的硬件工具介绍

🚀返回总目录 文章目录 一、万⽤表1.1、测量交流和直流电压1.2、测量交流和直流电流二、逻辑分析仪三、示波器作为⼀名嵌⼊式开发⼯程师,是有必要对各类常⽤的硬件⼯具有⼀定了解的,你可以不懂怎么使⽤它,但你必须知道它是什么,有什么⽤,在什么时候可以⽤得上。 一、万…

自动驾驶中的坐标系

自动驾驶中的坐标系 自动驾驶中的坐标系 0.引言1.相机传感器坐标系2.激光雷达坐标系3.车体坐标系4.世界坐标系4.1.地理坐标系4.2.投影坐标系4.2.1.投影方式4.2.2.墨卡托(Mercator)投影4.2.3.高斯-克吕格(Gauss-Kruger)投影4.2.4.通用横轴墨卡托UTM&#xff08;UniversalTransve…

Linux命令之用户账户管理whoami,useradd,passwd,chage,usermod,userdel的使用

1、查看当前用户账户 2、切换用户为root用户 3、新建用户user1&#xff0c;给用户user1设置密码为password123 4、新建用户user2&#xff0c;UID为510&#xff0c;指定其所属的私有组为group1&#xff08;group1组的标识符为500&#xff09;&#xff0c;用户的主目录为/home/us…

人类的逻辑常常是演绎、归纳和溯因推理混合

人类的逻辑推理往往是一种综合运用不同推理方式的能力。 演绎推理是从已知的前提出发&#xff0c;推断出必然的结论。通过逻辑规则的应用&#xff0c;人们可以从一些已知的事实或前提出发&#xff0c;得出一个必然成立的结论。演绎推理是一种严密的推理方式&#xff0c;它能够保…

网络安全笔记-SQL注入

文章目录 前言一、数据库1、Information_schema2、相关函数 二、SQL注入分类1、联合查询注入&#xff08;UNION query SQL injection&#xff09;语法 2、报错注入&#xff08;Error-based SQL injection&#xff09;报错注入分类报错函数报错注入原理 3、盲注布尔型盲注&#…

ROS第 2 课 ROS 系统安装和环境搭建

文章目录 方法一&#xff1a;一键安装&#xff08;推荐&#xff09;方法二&#xff1a;逐步安装&#xff08;常规安装方式&#xff09;1.版本选择2.检查 Ubuntu 的软件和更新源3.设置 ROS 的下载源3.1 设置国内下载源3.2 设置公匙3.3 更新软件包 4. 安装 ROS5. 设置环境变量6. …

HBase 基础

HBase 基础 HBase1. HBase简介1.1 HBase定义1.2 HBase数据模型1.2.1 HBase逻辑结构1.2.2 HBase物理存储结构1.2.3 数据模型 1.3 HBase基本架构 2. HBase环境安装2.1 HBase 安装部署2.1.1 HBase 本地按照2.1.2 HBase 伪分布模式安装2.1.3 HBase 集群安装 2.2 HBase Shell操作2.2…

springcloud Alibaba中gateway和sentinel联合使用

看到这个文章相信你有一定的sentinel和gateway基础了吧。 官网的gateway和sentinel联合使用有些过时了&#xff0c;于是有了这个哈哈&#xff0c;给你看看官网的&#xff1a; 才sentinel1.6&#xff0c;现在都几了啊&#xff0c;所以有些过时。 下面开始讲解&#xff1a; 首先…

JAVAEE初阶 文件IO(一)

这里写目录标题 一. 计算机中存储数据的设备1.1 CPU1.2 内存1.3 硬盘1.4 三种存储的区别 二.文件系统2.1 相对路径2.2 绝对路径2.3 .和..的含义2.4 例子2.5 everything工具 三.文件3.1 文本文件3.2 二进制文件 四. JAVA对于文件的API4.1 getParent getName getPath getAbsolute…