【黑马头条】-day11热点文章实时计算-kafka-kafkaStream-Redis

news2025/1/14 4:16:47

文章目录

  • 今日内容
  • 1 实时流式计算
    • 1.1 应用场景
    • 1.2 技术方案选型
  • 2 Kafka Stream
    • 2.1 概述
    • 2.2 KafkaStream
    • 2.3 入门demo
      • 2.3.1 需求分析
      • 2.3.2 实现
        • 2.3.2.1 添加依赖
        • 2.3.2.2 创建快速启动,生成kafka流
        • 2.3.2.3 修改生产者
        • 2.3.2.4 修改消费者
        • 2.3.2.5 测试
    • 2.4 SpringBoot集合KafkaStream
      • 2.4.1 创建自定配置参数类
      • 2.4.2 修改配置文件
      • 2.4.3 创建配置类创建KStream对象
      • 2.4.4 测试
  • 3 热点文章实时计算
    • 3.1 思路说明
    • 3.2 实现步骤
    • 3.3 具体实现
      • 3.3.1 为行为微服务添加kafka配置
      • 3.3.2 行为微服务中发送消息的消息体实体类
      • 3.3.3 定义kafka流接收的topic
      • 3.3.4 修改用户行为后的逻辑(相当于生产者)
      • 3.3.5 Stream聚合
        • 3.3.5.1 创建自定配置参数类
        • 3.3.5.2 添加kafkaStream的配置
        • 3.3.5.3 定义kafka流转发的topic
        • 3.3.5.4 文章微服务中的发送消息的消息体实体类
        • 3.3.5.5 创建配置类创建KStream对象
      • 3.3.6 创建消费者,用于监听聚合后的消息
      • 3.3.7 在文章微服务中更新当前分值
      • 3.3.8 Service添加updateScore方法
    • 3.4 测试


今日内容

在这里插入图片描述

1 实时流式计算

在这里插入图片描述

1.1 应用场景

在这里插入图片描述

1.2 技术方案选型

在这里插入图片描述

2 Kafka Stream

2.1 概述

在这里插入图片描述

在这里插入图片描述

2.2 KafkaStream

在这里插入图片描述

2.3 入门demo

2.3.1 需求分析

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.3.2 实现

在这里插入图片描述

还是在kafka-demo的模块里实现

2.3.2.1 添加依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2.3.2.2 创建快速启动,生成kafka流
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信息
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split(" "));
                    }
                })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}
2.3.2.3 修改生产者

修改com.heima.kafka.sample.ProducerQuickStart的方法

public class ProducerQuickStart {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 
        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        /**
         * 第一个参数:topic 第二个参数:key 第三个参数:value
         */
        //封装发送的消息
        //ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");
        for(int i=0;i<5;i++){
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itcast-topic-input","hello kafka"+" "+i);
            //3.发送消息
            producer.send(record);
        }
        producer.close();

2.3.2.4 修改消费者

修改com.heima.kafka.sample.ConsumerQuickStart的方法

public class ConsumerQuickStart {
 
    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
 
        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

 
        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.offset());
                System.out.println(consumerRecord.partition());
            }
        }
 
    }
 
}
2.3.2.5 测试

先启动消费者,再启动kafkaStream,再启动生产者

发送消息为"hello kafka"+" "+i,一共五次

在这里插入图片描述

符合我们发的

2.4 SpringBoot集合KafkaStream

在这里插入图片描述

在这里插入图片描述

2.4.1 创建自定配置参数类

在kafka-demo中创建com.heima.kafka.config.KafkaStreamConfig类

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */
@Getter
@Setter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
        props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

2.4.2 修改配置文件

修改heima-leadnews-test/kafka-demo/src/main/resources/application.yaml

将其放到最底下

kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}

2.4.3 创建配置类创建KStream对象

创建com.heima.kafka.stream.KafkaStreamHelloListener

等于KStream放入spring容器中进行直接监听

@Configuration
@Slf4j
public class KafkaStreamHelloListener {
    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

2.4.4 测试

启动kafka启动类,启动消费者和生产者

发送消息是"hello kafka",一共五次

在这里插入图片描述

3 热点文章实时计算

3.1 思路说明

在这里插入图片描述

3.2 实现步骤

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3.3 具体实现

3.3.1 为行为微服务添加kafka配置

在heima-leadnews-behavior微服务中集成kafka生产者配置

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3.3.2 行为微服务中发送消息的消息体实体类

定义消息发送封装类:UpdateArticleMess

在heima-leadnews-model中创建com.heima.model.message.UpdateArticleMess实体类

package com.heima.model.message;
 
import lombok.Data;
 
@Data
public class UpdateArticleMess {
 
    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;
 
    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

3.3.3 定义kafka流接收的topic

在heima-leadnews-common中创建com.heima.common.constants.HotArticleConstants常量类

package com.heima.common.constants;
public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
}

3.3.4 修改用户行为后的逻辑(相当于生产者)

点赞之后就要发送消息了,所以去修改用户点赞的实现类com.heima.behavior.service.impl.ApLikesBehaviorServiceImpl

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {

        //1.检查参数
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        //组装发送给kafka的消息类
        UpdateArticleMess message =new UpdateArticleMess();
        message.setArticleId(dto.getArticleId());
        message.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        //3.点赞  保存数据
        if (dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            // 保存当前key
            log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            //添加行为的正负
            message.setAdd(1);
        } else {
            // 删除当前key
            log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            //添加行为的正负
            message.setAdd(-1);
        }

        //4.给kafka发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(message));
        
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

    }

    /**
     * 检查参数
     *
     * @return
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }
        return false;
    }
}

点赞有,阅读都有,一样需要改

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        //1.检查参数
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        //组装发送给kafka的消息类
        UpdateArticleMess message =new UpdateArticleMess();
        message.setArticleId(dto.getArticleId());
        message.setType(UpdateArticleMess.UpdateArticleType.VIEWS);

        //更新阅读次数
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if (StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }
        // 保存当前key
        log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
        //添加行为的正负
        message.setAdd(1);
        //发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(message));
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

    }
}

3.3.5 Stream聚合

因为用户行为最后都体现在文章上面,所以kafkaStream的数据聚合应该在文章微服务中。

3.3.5.1 创建自定配置参数类

在heima-leadnews-article中创建com.heima.article.config.KafkaStreamConfig配置类

@Getter
@Setter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
        props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
3.3.5.2 添加kafkaStream的配置

在nacos中为文章微服务添加kafkaStream的配置

kafka:
  hosts: 192.168.204.129:9092
  group: ${spring.application.name}
3.3.5.3 定义kafka流转发的topic

在com.heima.common.constants.HotArticleConstants中添加HOT_ARTICLE_INCR_HANDLE_TOPIC

public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}
3.3.5.4 文章微服务中的发送消息的消息体实体类

因为聚合后的数据是COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0,不包含文章id,所以需要一个类把他们封装起来

在heima-leadnews-model中创建com.heima.model.message.ArticleVisitStreamMess实体类

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}
3.3.5.5 创建配置类创建KStream对象

定义stream,接收消息并聚合,创建com.heima.article.stream.HotArticleStreamHandler类

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
                    UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                    //重置消息的key:1234343434   和  value: likes:1
                    return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
                })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value 初始值,也就是aggValue
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     * key:文章id value:消息的value  aggValue:初始值
                     * @param key key:1234343434
                     * @param value value: likes:1
                     * @param aggValue 初始值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                     * @return
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                /**
                 * 格式化消息的key和value
                 */
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;
    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}

3.3.6 创建消费者,用于监听聚合后的消息

创建com.heima.article.listener.ArticleIncrHandleListener用于监听和处理聚合后的消息

@Component
@Slf4j
public class ArticleIncrHandleListener {
 
    @Autowired
    private ApArticleService apArticleService;
 
    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

3.3.7 在文章微服务中更新当前分值

在这里插入图片描述

3.3.8 Service添加updateScore方法

在文章微服务的service中完善功能

接口

void updateScore(ArticleVisitStreamMess articleVisitStreamMess);

实现:

/**
 * 更新文章的分值  同时更新缓存中的热点文章数据
 * @param mess
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * 替换数据并且存入到redis
 * @param apArticle
 * @param score
 * @param s
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }

            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}

/**
 * 更新文章行为数量
 * @param mess
 */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
    apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
    apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
    apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    updateById(apArticle);
    return apArticle;
}

/**
 * 计算文章的具体分值
 * @param apArticle
 * @return
 */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if(apArticle.getLikes() != null){
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if(apArticle.getViews() != null){
        score += apArticle.getViews()* ArticleConstants.HOT_ARTICLE_VIEW_WEIGHT;
    }
    if(apArticle.getComment() != null){
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if(apArticle.getCollection() != null){
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}

3.4 测试

前端有问题,就不测试了,功能能明白就行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1623438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习作业3____决策树(CART算法)

目录 一、简介 二、具体步骤 样例&#xff1a; 三、代码 四、结果 五、问题与解决 一、简介 CART&#xff08;Classification and Regression Trees&#xff09;是一种常用的决策树算法&#xff0c;可用于分类和回归任务。这个算法由Breiman等人于1984年提出&#xff0c;它…

Clion连接MySQL数据库:实现C/C++语言与MySQL交互

确保你的电脑里已经有了MySQL。 1、找到MySQL的目录 2、进入lib目录 3、复制libmysql.dll和libmysql.lib文件 4、将这俩文件粘贴到你的clion项目的cmake-build-debug目录下 如果不是在这个目录下&#xff0c;运行时会出以下错误报错&#xff1a; 进程已结束&#xff0c;退…

火绒安全的应用介绍

火绒安全软件是一款集成了杀毒、防御和管控功能的安全软件&#xff0c;旨在为用户提供全面的计算机安全保障。以下是火绒安全软件的一些详细介绍&#xff1a; 系统兼容性强&#xff1a;该软件支持多种操作系统&#xff0c;包括Windows 11、Windows 10、Windows 8、Windows 7、…

AI预测福彩3D第9套算法实战化测试第3弹2024年4月25日第3次测试

今天继续进行新算法的测试&#xff0c;今天是第3次测试。好了&#xff0c;废话不多说了&#xff0c;直接上图上结果。 2024年4月25日福彩3D预测结果 6码定位方案如下&#xff1a; 百位&#xff1a;6、4、3、7、2、8 十位&#xff1a;8、4、9、3、1、0 个位&#xff1a;7、6、9、…

Linux进程间通信 管道系列: 利用管道实现进程池(匿名和命名两个版本)

Linux进程间通信 管道系列: 利用管道实现进程池[匿名和命名两个版本] 一.匿名管道实现进程池1.池化技术2.搭架子3.代码编写1.创建子进程1.利用命令行参数传入创建几个子进程2.创建管道和子进程(封装Channel类)1.先描述2.在组织3.开始创建 2.封装MainProcess类3.控制子进程1.封装…

无限滚动分页加载与下拉刷新技术探析:原理深度解读与实战应用详述

滚动分页加载&#xff08;也称为无限滚动加载、滚动分页等&#xff09;是一种常见的Web和移动端应用界面设计模式&#xff0c;用于在用户滚动到底部时自动加载下一页内容&#xff0c;而无需点击传统的分页按钮。这种设计旨在提供更加流畅、连续的浏览体验&#xff0c;减少用户交…

人耳的七个效应

1、掩蔽效应 • 人们在安静环境中能够分辨出轻微的声音&#xff0c;即人耳对这个声音的听域很低&#xff0c;但在嘈杂的环境中轻微的声音就会被淹没掉&#xff0c;这时将轻微的声音增强才能听到。 • 这种在聆听时&#xff0c;一个声音的听阈因另一声音的出现而提高的现象&…

ThinkPad E14 Gen 4,R14 Gen 4,E15 Gen 4(21E3,21E4,21E5,21E6,21E7)原厂Win11系统恢复镜像下载

lenovo联想ThinkPad笔记本电脑原装出厂Windows11系统安装包&#xff0c;恢复出厂开箱状态一模一样 适用型号&#xff1a;ThinkPad E14 Gen 4,ThinkPad R14 Gen 4,ThinkPad E15 Gen 4 (21E3,21E4,21E5,21E6,21E7) 链接&#xff1a;https://pan.baidu.com/s/1QRHlg2yT_RFQ81Tg…

服务部署后出错怎么快速调试?试试JDWP协议

前言 原文链接&#xff1a;教你使用 JDWP 远程调试服务 在我们日常开发工作中&#xff0c;经常会遇到写好的代码线上出了问题&#xff0c;但是本地又无法复现&#xff0c;看着控制台输出的日志恨自己当初没有多打几条日志&#xff0c;然后追着日志一条一条查&#xff0c;不说…

安装 Nginx 的三种方式

通过 Nginx 源码安装需要提前准备的内容&#xff1a; GCC 编译器 Nginx 是使用 C 语言编写的程序&#xff0c;因此想要运行 Nginx 就需要安装一个编译工具 GCC 就是一个开源的编译器集合&#xff0c;用于处理各种各样的语言&#xff0c;其中就包含了 C 语言 使用命令 yum i…

python基础语法--列表

一、列表的概念 列表&#xff08;List&#xff09;是一种有序、可变、允许重复元素的数据结构。列表用于存储一组相关的元素&#xff0c;并且可以根据需要动态地进行增加、删除、修改和访问。以下是列表的主要特点和操作&#xff1a; 有序性&#xff1a; 列表中的元素是按照它…

工作与生活,如何找到平衡点,实现双赢?(2个简单工具答案一目了然)

前言 很多 35岁左右上有老下有小的程序员会陷入一个瓶颈期&#xff0c;在工作上想努力多赚钱&#xff0c;但是每天回到家 23 点&#xff0c;老婆孩子早已熟睡。好不容易周末有点休息时间&#xff0c;但是一个电话接一个&#xff0c;由于是生产问题还不得不接。 那么职场人应该如…

激活IDM下载器并配置百度网盘

前言&#xff1a; 最近想下载一些软件&#xff0c;奈何不充钱的百度网盘的速度实在太慢了&#xff0c;不到一个G的文件夹奈何下了一晚上&#xff0c;只能重新找一下idm的下载了。 但是idm的正版是需要收费的&#xff0c;所以有白嫖党的破解版就横空出世了。 正文&#xff1a…

【目标跟踪】ByteTrack详解与代码细节

文章目录 一、前言二、代码详解2.1、新起航迹2.2、预测2.3、匹配2.4、结果发布2.5、总结 三、流程图四、部署 一、前言 论文地址&#xff1a;https://arxiv.org/pdf/2110.06864.pdf git地址&#xff1a;https://github.com/ifzhang/ByteTrack ByteTrack 在是在 2021 年 10 月…

OpenAIGPT-4.5提前曝光?

OpenAI GPT-4.5的神秘面纱&#xff1a;科技界的震撼新篇章 在人工智能的世界里&#xff0c;每一次技术的飞跃都不仅仅是一次更新&#xff0c;而是对未来无限可能的探索。近日&#xff0c;科技巨头OpenAI似乎再次站在了这场革命的前沿&#xff0c;其潜在的新产品——GPT-4.5 Tur…

Https协议原理剖析【计算机网络】【三种加密方法 | CA证书 】

目录 一&#xff0c;fidler工具 前提知识 二&#xff0c;Https原理解析 1. 中间人攻击 2. 常见的加密方式 1&#xff09;. 对称加密 2&#xff09;. 非对称加密 对称加密 4&#xff09;. CA证书 1. 数据摘要 3. 数字签名 CA证书 理解数据签名 存在的安全疑问&am…

ubuntu ROS1 C++下使用免安装eigen库的方法

1、eigen库的定义及头文件介绍 Eigen是一个高层次的C 库&#xff0c;有效支持线性代数&#xff0c;矩阵和矢量运算&#xff0c;数值分析及其相关的算法。 2、获取eigen库安装包 下载地址&#xff1a;eigen库官网 &#xff0c;如下图所示&#xff1a; 下载最新版tar.bz2即可&…

嵌入式Linux driver开发实操(二十三):ASOC

ASoC的结构及嵌入到Linux音频框架 ALSA片上系统(ASoC)层的总体项目目标是为嵌入式片上系统处理器(如pxa2xx、au1x00、iMX等)和便携式音频编解码器提供更好的ALSA支持。在ASoC子系统之前,内核中对SoC音频有一些支持,但它有一些局限性: ->编解码器驱动程序通常与底层So…

甘特图是什么?如何利用其优化项目管理流程?

甘特图是项目管理软件中十分常见的功能&#xff0c;可以说每一个项目经理都要学会使用甘特图才能更好的交付项目。什么是甘特图&#xff1f;甘特图用来做什么&#xff1f;简单来说一种将项目任务与时间关系直观表示的图表&#xff0c;直观地展示了任务进度和持续时间。 一、甘特…

博睿数据亮相GOPS全球运维大会,Bonree ONE 2024春季正式版发布!

2024年4月25日&#xff0c;博睿数据 Bonree ONE 2024 春季正式版焕新发布。同时&#xff0c;博睿数据AIOps首席专家兼产品总监贺安辉携核心产品新一代一体化智能可观测平台 Bonree ONE 亮相第二十二届 GOPS 全球运维大会深圳站。 Bonree ONE 2024 春季版产品重点升级数据采集、…