黑马头条Day11- 实时计算热点文章、KafkaStream

news2025/1/23 9:09:52

一、今日内容

1. 定时计算与实时计算

2. 今日内容

KafkaStream

  • 什么是流式计算
  • KafkaStream概述
  • KafkaStream入门案例
  • SpringBoot集成KafkaStream

实时计算

  • 用户行为发送消息
  • KafkaStream聚合处理消息
  • 更新文章行为数量
  • 替换热点文章数据

二、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型这种,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算替代全量计算

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

日志分析:

对网站的用户访问日志进行实时的分析,计算访问量、用户画像、留存率等,实时进行数据分析,帮助企业进行决策

大屏看板统计:

可以实时的查看网站注册数量、订单数量、购买数量、金额等

公交实时数据:

可以随时更新公交车方位,计算多久到达站牌

实时文章分值计算:

头条类文章的分值计算,通过用户的行为实时计算文章的分值,分值越高就越被推荐。

3. 技术方案选项

Hadoop

Apache Storm

Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包、部署和操作工具集成。

三、Kafka Stream

1. 概述

Kafka Stream 是Apache Kafka从1.0版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的State store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提高记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可以处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原理Processor(类似于Storm 的spout和bolt),以及高层抽象的DSL(类似于Spark 的map/group/reduce)

2. Kafka Stream的关键概念

源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个Kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。

3. KStream

(1)数据结构类似于map,如下图,key-value键值对

(2)KStream

KStream数据流(data stream):即是一段顺序的,可以无限长,不断更新的数据集。数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身的topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据

为了说明这一点,让我们想象一下两个数据记录正在发送到流中:

("alice", 1) -> ("alice", 3)

如果您的流处理应用是要总结每个用户的价值,它将返回了4了alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

4. Kafka Stream入门案例

(1)需求分析,求单词个数(word count)

(2)引入依赖

在之前的kafka-demo工程的pom文件中引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(3)创建原生的kafka stream入门案例

package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {
    public static void main(String[] args) {
        // kafka的配置中心
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        // stream构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // 流式处理
        StreamProcessor(streamsBuilder);

        // 创建KafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        // 开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka hello itcast
     * @param streamsBuilder
     */
    private static void StreamProcessor(StreamsBuilder streamsBuilder) {
        // 1. 创建kstream对象,同时指定从哪个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        // 2. 处理消息的value
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                // 按照value进行聚合处理
                .groupBy((key, value) -> value)
                // 时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // 统计单词的个数
                .count()
                // 转换为KStream
                .toStream()
                .map((key, value) -> {
                    System.out.println("key: " + key + ", value: " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 发送消息
                .to("itcast-topic-out");
    }
}

(4)测试准备

使用生产者在topic为:itcast_topic_input中发送多条消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1.kafka链接配置信息
        Properties prop = new Properties();
        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key和value的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //ack配置  消息确认机制
        prop.put(ProducerConfig.ACKS_CONFIG,"all");

        //重试次数
        prop.put(ProducerConfig.RETRIES_CONFIG,10);

        //数据压缩
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");

        //2.创建kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);

        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("itcast-topic-input","hello kafka");
            producer.send(kvProducerRecord);
        }

        //同步发送消息
        /*RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
        System.out.println(recordMetadata.offset());*/

        //异步消息发送
       /* producer.send(kvProducerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if(e != null){
                    System.out.println("记录异常信息到日志表中");
                }
                System.out.println(recordMetadata.offset());
            }
        });*/

        //4.关闭消息通道  必须要关闭,否则消息发送不成功
        producer.close();

    }

}

使用消费者接收topic为:itcast_topic_input

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {

        //1.kafka的配置信息
        Properties prop = new Properties();
        //链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //设置消费者组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");

        //手动提交偏移量
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

        //4.拉取消息

        //同步提交和异步提交偏移量
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                   /* System.out.println(consumerRecord.offset());
                    System.out.println(consumerRecord.partition());*/
                }
                //异步提交偏移量
                consumer.commitAsync();
            }
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("记录错误的信息:"+e);
        }finally {
            //同步
            consumer.commitSync();
        }


        /*while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.offset());
                System.out.println(consumerRecord.partition());

               *//* try {
                    //同步提交偏移量
                    consumer.commitSync();
                }catch (CommitFailedException e){
                    System.out.println("记录提交失败的异常:"+e);
                }*//*
            }
            //异步的方式提交偏移量
            *//*consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null){
                        System.out.println("记录错误的提交偏移量:"+map+",异常信息为:"+e);
                    }
                }
            });*//*



        }*/

    }

}

先启动Consumer,再启动KafkaStream,最后启动Producer发送消息

结果:通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

5. SpringBoot集成Kafka Stream

(1)自定义配置参数

package com.heima.kafka.config;


import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        return new KafkaStreamsConfiguration(props);
    }
}

修改application.yml文件,在最下方添加自定义配置

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

(2)新增配置类,创建KStream对象,进行聚合

package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {
    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // 创建KStream对象,同时指定从哪个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

(3)测试:先启动ConsumerQuickStart -> KafkaDemoApplication -> ProducerQuickStart

启动微服务,正常发送消息,可以正常接收到消息

四、App端热点文章计算

1. 思路说明

2. 功能实现

2.1 用户行为(阅读、评论、点赞、收藏)发送消息,以阅读和点赞为例

(在第9天的实战中,我已经把这部分做了)

步骤①:在heima-leadnews-behavior微服务中集成Kafka生产者配置

在nacos修改leadnews-behavior.yml

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  redis:
    host: 192.168.200.130
    password: leadnews
    port: 6379

步骤②:修改ApLikesBehaviorServiceImpl,新增发送消息

定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

topic常量类:HotArticleConstants

package com.heima.common.constants;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

ApLikesBehaviorServiceImpl:

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 点赞或取消点赞
     * @param dto 0:点赞 1:取消点赞
     * @return
     */
    @Override
    public ResponseResult like(LikesBehaviorDto dto) {
        // 1. 检查参数
        if(dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. 是否登录
        ApUser apUser = AppThreadLocalUtil.getUser();
        if(apUser == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        // 3. 点赞,保存数据
        if(dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());
            if(obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            // 保存当前key
            log.info("保存当前key:{}, {}, {}", dto.getArticleId(), apUser.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString(), JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            // 取消点赞,删除当前key
            log.info("删除当前key:{}, {}", dto.getArticleId(), apUser.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());
            mess.setAdd(-1);
        }

        // 4. 发送消息,数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        // 5. 结果返回
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    /**
     * 检查参数
     * @param dto
     * @return
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        // 参数有误
        if(dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }

        return false;
    }
}

步骤③:修改阅读行为的类ApReadBehaviorServiceImpl发送消息

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {
    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 用户行为 - 阅读
     * @param dto
     * @return
     */
    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        // 1. 检查参数
        if(dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. 是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if(user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        // 3. 更新阅读次数
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if(StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }

        // 4. 保存当前key
        log.info("保存当前key: {} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        // 5. 发送消息,数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

        // 6. 结果返回
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}

2.2 使用KafkaStream实时接收消息,聚合内容

步骤①:在heima-leadnews-article微服务中集成kafkaStream

在pom.xml(heima-leadnews-article)添加Kafka Stream的依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

KafkaStreamConfig:

package com.heima.article.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

在nacos配置正常的leadnews-article添加如下

# 。。。 。。。省略
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

步骤②:定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}

步骤③:定义stream,接收消息聚合

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
                    UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                    //重置消息的key:1234343434   和  value: likes:1
                    return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
                })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     *
                     * @param key
                     * @param value  likes:1
                     * @param aggValue   COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                     * @return
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        System.out.println(value);
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作   likes:1
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;
    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

2.3 重新计算文章的分值,更新到数据库和缓存中

步骤①:在ApArticleService添加方法,用于更新数据库中的文章分值

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
public void updateScore(ArticleVisitStreamMess mess);

实现类方法:

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

/**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }


            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));

    }
}

/**
     * 更新文章行为数量
     * @param mess
     */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
    apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
    apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
    apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    updateById(apArticle);
    return apArticle;

}

/**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if(apArticle.getLikes() != null){
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if(apArticle.getViews() != null){
        score += apArticle.getViews();
    }
    if(apArticle.getComment() != null){
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if(apArticle.getCollection() != null){
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }

    return score;
}

步骤②:定义监听,接收聚合之后的数据,文章的分值重新进行计算

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);

        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1957313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Skim通过Apple Script为选中文本添加特定颜色的高亮

一、代码编写 Skim的Apple Script维基页面 使用Mac的Script Editor编写以下代码&#xff1a; tell application "Skim"activatetell document 1set theSel to (get selection)set theNote to make note with data theSel with properties {type:highlight note, co…

Swift学习入门,新手小白看过来

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

零代码拖拽,轻松搞定GIS场景编辑

在三维GIS领域&#xff0c;编辑场景和处理影像数据通常是一个复杂且费时的过程&#xff0c;但现在有了山海鲸可视化&#xff0c;这一切都变得简单有趣。这款免费可视化工具为您提供了零代码拖拽式编辑的体验&#xff0c;让您无需编程知识就能轻松创建和优化GIS场景。通过直观的…

STM32——GPIO(点亮LEDLED闪烁)

一、什么是GPIO&#xff1f; GPIO&#xff08;通用输入输出接口&#xff09;&#xff1a; 1.GPIO 功能概述 GPIO 是通用输入/输出&#xff08;General Purpose I/O&#xff09;的简称&#xff0c;既能当输入口使用&#xff0c;又能当输出口使用。端口&#xff0c;就是元器件…

记一次因为在html两个地方引入vue.js导致组件注入失败的问题

这个问题我遇到两次了&#xff0c;是在恼火&#xff0c;不对&#xff0c;三次了&#xff0c;我如果不做这个笔记&#xff0c;我确定我还会遇到第三次。 尾部这个去掉就行 因为头部有了 遇到这种bu g好恼火&#xff0c;解决了又怎么样呢&#xff1f;重蹈覆辙的滋味不好受

Python技能达到这个水平,高薪就业不是梦

一&#xff0c;高薪就业的必备基础 要达到高薪就业的水平&#xff0c;Python开发者通常需要具备以下几方面的技能和经验&#xff1a; 如需Python籽料直接戳&#xff1a; 2024年最新python教程全套&#xff0c;学完即可进大厂&#xff01;&#xff08;附全套视频 下载&#xf…

#IO进程 笔记一

标准IO 文件IO 文件属性获取 目录操作 库 进程: process 线程(thread)、同步、互斥、条件变量 进程间通信: 6种(一共7种) 无名管道(pipe)、有名管道(fifo)、信号(sginal)、信号灯集(semphore)、 共享内存(shared memory)、消息队列(message queue) 标准IO 1. 概念 标准IO&…

详细分析示波器导至U盘的数据(Excel表格)示波器具体名称分析

一般由示波器导入U盘的csv文件&#xff08;即Excel表格数据&#xff09;的图如下图所示&#xff1a; 下面小编就对上表格的各个数据表示进行逐一解释 1、Memory Length&#xff1a;4000 在示波器&#xff08;Oscilloscope&#xff09;中&#xff0c;“Memory Length”&#x…

【算法】代码随想录之字符串(更新中)

文章目录 前言 一、反转字符串&#xff08;LeetCode--344&#xff09; 二、反转字符串II&#xff08;LeetCode--541&#xff09; 三、反转字符串中的单词&#xff08;LeetCode--151&#xff09; 前言 跟随代码随想录&#xff0c;学习字符串相关的算法题目&#xff0c;记录…

20240730 每日AI必读资讯

&#x1f3ac;燃爆&#xff01;奥运8分钟AI影片火了&#xff0c;巴赫主席&#xff1a;感谢中国黑科技 - 短片名为《永不失色的她》&#xff08;To the Greatness of HER&#xff09;&#xff0c;由阿里巴巴和国际奥委会联合推出。 - 百年奥运史上伟大女性的影响故事在此被浓缩…

VBA技术资料MF183:将图片导入word并调整大小

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

人生最大的毛病,就是一个“ 傲 ”字

99天 傲慢之害&#xff0c;人要勤勉恭敬 51.27 先生说&#xff1a;人生最大的毛病&#xff0c;就是一个“ 傲 ”字。 当今人们的问题&#xff0c;主要就是“ 傲 ”。千罪百恶&#xff0c;都从傲而来。一个人要是傲&#xff0c;就会自高自大、自以为是&#xff0c;不肯屈于人下…

cf960(div2)

A. Submission Bait&#xff08;博弈&#xff09; 题意&#xff1a;爱丽丝和鲍勃在大小为n的数组a中进行游戏&#xff0c;他们轮流进行运算&#xff0c;爱丽丝先开始&#xff0c;不能运算的一方输&#xff0c;一开始mx0&#xff0c;每次操作&#xff0c;玩家可以选择一个牵引i…

pikachu靶场之目录遍历、敏感信息泄露

一、目录遍历 漏洞概述 在web功能设计中,很多时候我们会要将需要访问的文件定义成变量&#xff0c;从而让前端的功能便的更加灵活。 当用户发起一个前端的请求时&#xff0c;便会将请求的这个文件的值(比如文件名称)传递到后台&#xff0c;后台再执行其对应的文件。 在这个过…

汽车辐射大?技术来救它:整车辐射抗扰发射天线仿真建模及性能预测

摘要 针对车辆电磁辐射抗扰度测试条件要求高、预测难度大的问题&#xff0c;通过仿真软件建立电磁抗扰度测试发射天线&#xff08;简称抗扰发射天线&#xff09;模型及无车情况下的电磁抗扰试验场强环境&#xff0c;为整车电磁辐射抗扰性能的预测搭建了一个仿真平台。 验证试验…

第5章Excel数据分析之数据透视表遇见SQL

文章目录 第5章 数据透视表遇见SQL5-1如何在查询中使用SQL语句&#xff1f;5-2SQL查询语句&#xff08;数据透视表的辅助列&#xff09;5-3SQL常用运算符&#xff08;案例&#xff1a;添加分析维度&#xff09;5-4SQL筛选语句&#xff08;数据透视表数据源的过滤&#xff09;5-…

【单片机毕业设计选题24085】-基于STM32的心电采集系统设计

系统功能: 系统上电后&#xff0c;OLED显示“欢迎使用心电采集系统请稍后”两秒后进入正常页面显示。 第一行显示心率和血氧值。 第二行显示心率设定高值。 第三行显示心率设定低值。 第四行显示心率状态&#xff0c;"Rate OK", "Rate High", "R…

C++中的依赖注入

目录 1.概述 2.构造函数注入 3.setter方法注入 4.接口注入 5.依赖注入框架 6.依赖注入容器 7.依赖注入框架的工作原理 8.依赖注入的优势 9.总结 1.概述 依赖注入是一种设计模式&#xff0c;它允许我们在不直接创建对象的情况下为对象提供其依赖项&#xff1b;它通过将…

学习c语言第十四天(调试+练习)

一、调试 所有发生的事情都一定有迹可循&#xff0c;如果问心无愧&#xff0c;就不需要掩盖也就没有迹象了&#xff0c;如果问心有愧 就必然需要掩盖&#xff0c;那就一定会有迹象&#xff0c;迹象越多就越容易顺藤而上&#xff0c;这就是推理的途径。 顺着这条途径顺流而下就…

广东省某地区智慧水务平台全面升级启航(案例解析)

项目背景 随着物联网、人工智能、大数据、数字孪生等技术的应用&#xff0c;对精准调控、漏损控制、可视化管理、节能降耗、高效指挥等方面的要求越来越高。鉴于此&#xff0c;广东某地区的现有智慧水务平台已无法满足日益增长的智能化管理需求&#xff0c;亟待进行智慧化升级以…