kafka实践-热点数据展示

news2024/9/23 23:22:50

1 实时流式计算

1.1 概念

流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。也就是将数据先聚集在集中全量处理。

2.2 应用场景

  • 日志分析
  • 大屏看板统计
  • 公交实时数据
  • 实时文章分值计算

2 Kafka Stream

2.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖

3 实践-app端热点文章计算

3.1 引入依赖

1)在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2)发送者的微服务的nacos配置中加入kafka配置

```yaml
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

主要有发送的key和value的序列化器以及服务地址的重试次数

3.2 点赞行为发送给kafka流处理

在这里插入图片描述
1)调用kafkaTemplate.send发送数据

 //发送消息,数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));    


//观看的消息,设置类型为viesw,数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

2)定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

这里的enum是迭代的意思,与后面聚合操作条件判断有关系

3.3 使用kafkaStream实时接收消息,聚合内容

1)定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}

2)修改常量类:增加常量

package com.heima.common.constans;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

两个常量的意思是需要修改redis缓存中当前项和默认项的热点缓存数据
3)定义stream,接收消息并聚合

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

主要通过new Aggregator<String, String, String>(){两个apply方法}定义转换的规则。这里是将相同操作的值进行相加操作,输出的JSON通过通过ArticleVisitStreamMess(包含文章id字段)添加文章id到json字符串中,进而交给消费者

3.4 消费端监听消息,完成对缓存的修改

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);

        }
    }
}

2)updateScore更新分值,跟新mysql和redis
更新mysql

 private ApArticle updateArticle(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
        apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
        apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
        apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
        updateById(apArticle);
        return apArticle;

更新redis中的当前页热点数据和默认页数据

```java
/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

/**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }


            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1201830.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP连接出现大量CLOSE_WAIT不回收的问题排查

背景 日常运维过程中&#xff0c;收到“应用A”突然挂起没有处理请求的告警&#xff0c;然后触发“存活检查”不通过&#xff0c;自动重启了。 问题 为什么“应用A”突然挂起&#xff1f; 分析 排查过程很长&#xff0c;走了很多弯路&#xff0c;这里只列出本案例有效行动…

金蝶云星空与金蝶云星空对接集成盘亏单查询打通盘亏单新增

金蝶云星空与金蝶云星空对接集成盘亏单查询打通盘亏单新增 接通系统&#xff1a;金蝶云星空 金蝶K/3Cloud&#xff08;金蝶云星空&#xff09;是移动互联网时代的新型ERP&#xff0c;是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”&am…

什么是权限?(Linux篇)

前言 其实我们在学会运用一些简单的Linux指令之后&#xff0c;我们可以简单的用ls查看当前目录的文件有哪些啊&#xff0c;可以使用tree用树形结构查看目录&#xff0c;可以使用touch来创建文件&#xff0c;用mkdir创建目录&#xff0c;可以使用rm来删除目录和文件&#xff0c;…

酷柚易汛ERP - 发货地址管理操作指南

1、应用场景 对发货地址进行管理&#xff0c;使用【物流服务】时的自动获取发货地址。 2、主要操作 打开【资料】-【发货地址管理】新增发货地址。 可以对进行地址设置及管理&#xff0c;点击【新增】可添加新的发货地址信息地址简称方便使用者在选择发货地址时&#xff0c;…

【毕业论文】基于微信小程序的大学生互助平台设计与实现

完整下载链接https://download.csdn.net/download/No_Name_Cao_Ni_Mei/88519756 基于微信小程序的大学生互助平台设计与实现 Design and Implementation of a College Student Assistance Platform based on WeChat Mini Program 目录 目录 2 摘要 3 关键词 4 第一章 绪论 4 1.…

[RoarCTF 2019]Easy Java1

提示 WEB-INF/web.xml泄露 后拿到题目的一般想到的正常思路 sql注入弱口令 先信息收集不进行操作 有download&#xff0c;难道是下载文件&#xff0c;访问看看 这里访问抓包试了下&#xff0c;传送后返回码是200&#xff0c;说明应该可以访问&#xff0c;但是看响应包像是报错…

淘宝API接口成为淘宝商家及企业ERP系统不可或缺的重要因素

API全称为Application Programming Interface&#xff0c;中文是应用程序编程接口。它其实是一些预先定义的函数&#xff0c;目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力&#xff0c;而又无需访问源码&#xff0c;或理解内部工作机制的细节。 那如果…

Git常用指令以及常见问题解决

摘要&#xff1a;记录本人Git常用指令以及常见问题解决 1.Git流程 2.具体操作 git init&#xff1a;初始化目录&#xff08;一般直接git clone远端的工程&#xff0c;这一步都可以省略掉&#xff09;&#xff1b; 输入命令“git config --global user.name xxx”来配置你的用…

Outlook搜索功能不全

Outlook搜索功能不全 解决方案 1、当打开Outlook想搜索内容&#xff0c;但无法搜索或者搜索不全时。 2、关掉Outlook在桌面上找到此电脑&#xff0c;右键管理 3、进入计算机管理后---服务和应用---服务----找到Windows Search--右键启动或者重新启动即可

ChatkBQA:一个基于大语言模型的知识库问题生成-检索框架11.13

ChatkBQA&#xff1a;一个基于大语言模型的知识库问题生成-检索框架 摘要1 引言3 准备工作4 方法4.1 ChatKBQA概述4.2 在LLMS上进行高效微调4.3 用微调LLMS生成逻辑形式4.4 实体和关系的非监督检索4.5 可解释查询执行 摘要 基于知识的问答&#xff08;KBQA&#xff09;旨在从大…

【网络】TCP协议理论

TCP协议理论 一、TCP协议简介1、浅谈可靠性2、UDP协议存在的意义 二、TCP的协议格式TCP的解包和分用 三、确认应答机制一种应答方式——捎带应答 四、超时重传机制超时等待时间 五、流量控制1、TCP的缓冲区2、TCP的窗口大小3、TCP的PSH标志位 六、TCP的六个标志位URG字段的详细…

泊车功能专题介绍 ———— AVP系统技术要求之地图数据感知要求

文章目录 地图数据规范地图图层和表达要求地图各类数据属性要求SLAM地图要求坐标系数据采集车传感器数据采集数据流程 感知功能要求车端感知功能关键安全感知次要安全感知功能感知体验相关感知 车-场协同感知类型一&#xff1a;引导类型二&#xff1a;重点地段增强类型三&#…

Unit3:贪心算法

文章目录 一、介绍二、分数背包问题问题描述分析时间复杂度伪代码案例彩蛋 三、活动选择问题问题描述分析伪代码时间复杂度拓展&#xff1a;加权活动选择分析计算伪代码时间复杂度案例 对比动态规划和贪心算法 四、哈夫曼编码分类定长编码 目标变长码 案例分析伪代码时间复杂度…

【ArcGIS Pro二次开发】(75):ArcGIS Pro SDK二次开发中的常见问题及解决方法

ArcGIS Pro SDK二次开发路程也近一年时间了&#xff0c;这里总结一下平时遇到的一些问题和解决方法。有些问题没能解决&#xff0c;也记录下来&#xff0c;提醒自己不要忘记。 一、在VS2022中进行调试弹出错误 正常在VS2022中&#xff0c;如果要调试程序的话&#xff0c;直接按…

【工具】QQ音乐本地下载(亲测可用)

目录 0.环境 1.详细步骤 1&#xff09;电脑端打开QQ音乐&#xff0c;登录 2&#xff09;随便搜一首歌&#xff0c;在界面上按【F12】打开开发者工具界面&#xff0c;点击【播放】 3&#xff09;进入播放界面后&#xff0c;点击【网络】&#xff0c;再点击【媒体】 4&…

职场人实用办公技能 | 数据可视化模板

套用模板&#xff0c;不仅报表做得快&#xff0c;数据可视化效果也足够美观&#xff0c;但有些职场人还不太了解从哪些工具上可获得兼具实用性、美观性的BI数据可视化模板&#xff0c;接下来就来介绍一款可提供大量系统化可视化模板的BI工具——奥威BI工具。 奥威BI工具&#…

idea报错java: 程序包com.alibaba.fastjson不存在,明明存在!

经常从线上拉下来代码后编译运行时会报这个错误。刷新maven也没用&#xff0c;重新导入项目也没用 发现解决方法如下&#xff1a; 找到当前报错文件的路径。找到iml文件 删除它&#xff01;然后刷新maven 就好了&#xff01;&#xff01;&#xff01; 记录一下我的解决方法&…

代码随想录算法训练营第四十八天丨 动态规划part11

123.买卖股票的最佳时机III 思路 这道题目相对 121.买卖股票的最佳时机 (opens new window)和 122.买卖股票的最佳时机II (opens new window)难了不少。 关键在于至多买卖两次&#xff0c;这意味着可以买卖一次&#xff0c;可以买卖两次&#xff0c;也可以不买卖。 接来下我…

YOLO V1中关于bounding boxs的部分要点

YOLO的核心原理预览 YOLO将输入的图片resize成448 x 448&#xff0c;并且为 S x S&#xff08;S 7&#xff09;个grid&#xff0c;如果物体的中心落入该grid中&#xff0c;那么该grid就需要负责检测该物体。一次性输出所检测到的目标信息&#xff0c;包括类别和位置。 对于每一…

聚力未来!云起无垠成为光合组织成员单位

近日&#xff0c;云起无垠正式加入海光产业生态合作组织&#xff08;简称“光合组织”&#xff09;。云起无垠将与光合组织联合打造安全、好用、开放的产品与解决方案&#xff0c;满足企业的安全检测需求&#xff0c;帮助企业解决业务系统上线前的安全检测和修复问题。 图1 光合…