黑马头条 分布式任务调度 定时计算热点文章、xxl-job、kafkaStream

news2025/1/20 15:45:57

xxl-Job分布式任务调度

1 今日内容

1.1 需求分析

目前实现的思路:从数据库直接按照发布时间倒序查询

  • 问题1: 如何访问量较大,直接查询数据库,压力较大
  • 问题2: 新发布的文章会展示在前面,并不是热点文章

1.2 实现思路

把热点数据存入redis进行展示

判断文章是否是热点,有几项标准: 点赞数量,评论数量,阅读数量,收藏数量

计算文章热度,有两种方案:

  • 定时计算文章热度
  • 实时计算文章热度

1.3 定时计算

  • 根据文章的行为(点赞、评论、阅读、收藏)计算文章的分值,利用定时任务每天完成一次计算
  • 把分值较大的文章数据存入到redis
  • App端用户查询文章列表的时候,优先从redis中查询热度较高的文章数据

1.4 定时任务框架-xxljob

spring传统的定时任务@Scheduled,但是这样存在这一些问题 :

  • 做集群任务的重复执行问题(每个微服务都会做定时任务)
  • cron表达式定义在代码之中,修改不方便
  • 定时任务失败了,无法重试也没有统计
  • 如果任务量过大,不能有效的分片执行

解决这些问题的方案为: xxl-job 分布式任务调度框架

1.5 学习目录

  • xxl-job概述
  • xxl-job入门案例
  • xxl-job高级部分
  • 热点文章定时计算
  • 查询文章接口改造

2.分布式任务调度

2.1 什么是分布式任务调度

当前软件的架构已经开始向分布式架构转变,将单体结构拆分为若干服务,服务之间通过网络交互来完成业务处理。

在分布式架构下,一个服务往往会部署多个实例来运行我们的业务,如果在这种分布式系统环境下运行任务调度,我们称之为布式任务调

将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力

1、并行任务调度

并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。

如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。

2、高可用

若某一个实例宕机,不影响其他实例来执行任务。

3、弹性扩容

当集群中增加实例就可以提高并执行任务的处理效率。

4、任务管理与监测

对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。

分布式任务调度面临的问题:

当任务调度以集群方式部署,同一个任务调度可能会执行多次,例如:电商系统定期发放优惠券,就可能重复发放优惠券,对公司造成损失,信用卡还款提醒就会重复执行多次,给用户造成烦恼,所以我们需要控制相同的任务在多个运行实例上只执行一次。常见解决方案:

  • 分布式锁,多个实例在任务执行前首先需要获取锁,如果获取失败那么就证明有其他服务已经在运行,如果获取成功那么证明没有服务在运行定时任务,那么就可以执行。
  • ZooKeeper选举,利用ZooKeeper对Leader实例执行定时任务,执行定时任务的时候判断自己是否是Leader,如果不是则不执行,如果是则执行业务逻辑,这样也能达到目的。

2.2 xxl-Job简介

针对分布式任务调度的需求,市场上出现了很多的产品:

1) TBSchedule:淘宝推出的一款非常优秀的高性能分布式调度框架,目前被应用于阿里、京东、支付宝、国美等很多互联网企业的流程调度系统中。但是已经多年未更新,文档缺失严重,缺少维护。

2) XXL-Job:大众点评的分布式任务调度平台,是一个轻量级分布式任务调度平台, 其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用

3)Elastic-job:当当网借鉴TBSchedule并基于quartz 二次开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,具有任务高可用以及分片功能。

4)Saturn: 唯品会开源的一个分布式任务调度平台,基于Elastic-job,可以全域统一配置,统一监

控,具有任务高可用以及分片功能。

XXL-JOB是一个分布式任务调度平台(美团公司成员开源的项目),其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

源码地址:xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

文档地址:分布式任务调度平台XXL-JOB

特性

  • 简单灵活

提供Web页面对任务进行管理,管理系统支持用户管理、权限控制;

支持容器部署;

支持通过通用HTTP提供跨平台任务调度;

  • 丰富的任务管理功能

支持页面对任务CRUD操作;

支持在页面编写脚本任务、命令行任务、Java代码任务并执行;

支持任务级联编排,父任务执行结束后触发子任务执行;

支持设置指定任务执行节点路由策略,包括轮询、随机、广播、故障转移、忙碌转移等;

支持Cron方式、任务依赖、调度中心API接口方式触发任务执行

  • 高性能

任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰;

  • 高可用

任务调度中心、任务执行节点均 集群部署,支持动态扩展、故障转移

支持任务配置路由故障转移策略,执行器节点不可用是自动转移到其他节点执行

支持任务超时控制、失败重试配置

支持任务处理阻塞策略:调度当任务执行节点忙碌时来不及执行任务的处理策略,包括:串行、抛弃、覆盖策略

  • 易于监控运维

支持设置任务失败邮件告警,预留接口支持短信、钉钉告警;

支持实时查看任务执行运行数据统计图表、任务进度监控数据、任务完整执行日志;

2.3 XXL-Job-环境搭建

2.3.1 调度中心环境要求

  • Maven3+
  • Jdk1.8+
  • Mysql5.7+

2.3.2 源码仓库地址

源码仓库地址

Release Download

GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)

Download

xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

Download

也可以使用资料文件夹中的源码

2.3.3 初始化“调度数据库”

请下载项目源码并解压,获取 “调度数据库初始化SQL脚本” 并执行即可。

位置:/xxl-job/doc/db/tables_xxl_job.sql 共8张表

- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;

调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;

如果mysql做主从,调度中心集群节点务必强制走主库;

2.3.4 编译源码

解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可,源码结构如下:

2.3.5 配置部署“调度中心”

调度中心项目:xxl-job-admin

作用:统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台

步骤一:调度中心配置

调度中心配置文件地址:/xxl-job/xxl-job-admin/src/main/resources/application.properties

下面这个只改25行数据库的连接信息就行,修改为自己的数据库

### web
server.port=8888
server.servlet.context-path=/xxl-job-admin

### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false

### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model

### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?Unicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1

### xxl-job, email
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

### xxl-job, access token
xxl.job.accessToken=

### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

### xxl-job, log retention days
xxl.job.logretentiondays=30

启动调度中心,默认登录账号 “admin/123456”, 登录后运行界面如下图所示。

2.4 配置部署调度中心-docker安装

1.创建mysql容器,初始化xxl-job的SQL脚本 设了密码是root;

他给的centOs里面已经有了叫mysql57

docker run -p 3306:3306 --name mysql57 \
-v /opt/mysql/conf:/etc/mysql \
-v /opt/mysql/logs:/var/log/mysql \
-v /opt/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7

2.拉取镜像

docker pull xuxueli/xxl-job-admin:2.3.0

3.创建容器

docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.200.130:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=root" \
-p 8888:8080 -v /tmp:/data/applogs \
--name xxl-job-admin --restart=always  -d xuxueli/xxl-job-admin:2.3.0

2.5 xxl-job入门案例编写

2.5.1 登录调度中心,点击下图所示“新建任务”按钮,新建示例任务

2.5.2 创建xxljob-demo项目,导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--xxl-job-->
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>

2.5.3 application.yml配置

server:
  port: 8881

xxl:
  job:
    admin:
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: xxl-job-executor-sample
      port: 9999

2.5.4 新建配置类

package com.heima.xxljob.config;

import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.port}")
    private int port;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setPort(port);
        return xxlJobSpringExecutor;
    }
}

2.5.4 任务代码,重要注解:@XxlJob(“JobHandler”)

package com.heima.xxljob.job;

import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class HelloJob {

    @XxlJob("demoJobHandler")
    public void helloJob(){
        System.out.println("简单任务执行了。。。。");

    }
}

2.5.5 测试-单节点

  • 启动微服务
  • 在xxl-job的调度中心中启动任务

2.6 任务详解-执行器

  • 执行器:
    • 任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能;
  • 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器

以下是执行器的属性说明:

属性名称

说明

AppName

是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;

名称

执行器名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;如 分片广播执行器

排序

执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;

注册方式

调度中心获取执行器地址的方式

机器地址

注册方式为"手动录入"时有效,支持人工维护执行器的地址信息

自动注册和手动注册的区别和配置

2.7 任务详解-基础配置

基础配置

  • 执行器:每个任务必须绑定一个执行器, 方便给任务进行分组
  • 任务描述:任务的描述信息,便于任务管理
  • 负责人:任务的负责人
  • 报警邮件:
    • 任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔

调度配置

  • 调度类型:
    • 无:该类型不会主动触发调度;
    • CRON:该类型将会通过CRON,触发任务调度;
    • 固定速度:该类型将会以固定速度,触发任务调度;按照固定的间隔时间,周期性触发;

任务配置

  • 运行模式:

BEAN模式(Spring管理):任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;

  • JobHandler:运行模式为 "BEAN模式" 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值
  • 执行参数:任务执行所需的参数

阻塞处理策略

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

  • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO(First Input First Output)队列并以串行方式运行;
  • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
  • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

路由策略

当执行器集群部署时,提供丰富的路由策略,包括;

  • FIRST(第一个):固定选择第一个机器;
  • LAST(最后一个):固定选择最后一个机器;
  • ROUND(轮询)
  • RANDOM(随机):随机选择在线的机器;
  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
  • FAILOVER(故障转移):
    • 按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  • BUSYOVER(忙碌转移):
    • 按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  • SHARDING_BROADCAST(分片广播):
    • 广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

2.8 路由策略(轮询)-案例

1.修改任务为轮询

2.启动多个微服务

修改yml配置文件

server:
  port: ${port:8881}


xxl:
  job:
    admin:
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: xxl-job-executor-sample
      port: ${executor.port:9999}

3.启动多个微服务

每个微服务轮询的去执行任务

2.9 路由策略(分片广播)

2.9.1 分片逻辑

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

2.9.2 路由策略(分片广播)-案例

需求:让两个节点同时执行10000个任务,每个节点分别执行5000个任务

①:创建分片执行器

②:创建任务,路由策略为分片广播

③:分片广播代码

分片参数

index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号

total:总分片数,执行器集群的总机器数

修改yml配置

server:
  port: ${port:8881}

xxl:
  job:
    admin:
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: xxl-job-sharding-executor
      port: ${executor.port:9999}

代码

package com.heima.xxljob.job;
import java.util.ArrayList;
import java.util.List;

@Component
public class HelloJob {

    @Value("${server.port}")
    private String port;

    @XxlJob("demoJobHandler")
    public void helloJob(){
        System.out.println("简单任务执行了。。。。"+port);

    }

    @XxlJob("shardingJobHandler")
    public void shardingJobHandler(){
        //分片的参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();

        //业务逻辑
        List<Integer> list = getList();
        for (Integer integer : list) {
            if(integer % shardTotal == shardIndex){
                System.out.println("当前第"+shardIndex+"分片执行了,任务项为:"+integer);
            }
        }
    }

    public List<Integer> getList(){
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        return list;
    }
}

④:测试

启动多个微服务测试,一次执行可以执行多个任务

3.热点文章-定时计算

3.1 需求分析

需求:为每个频道缓存热度较高的30条文章优先展示

判断文章热度较高的标准是什么?

文章:阅读,点赞,评论,收藏

3.2 实现思路

前五天才有热度

3.3 实现步骤

分值计算不涉及到前端工程,也无需提供api接口,是一个纯后台的功能的开发

3.3.1 频道列表远程接口准备

计算完成新热数据后,需要给每个频道缓存一份数据,所以需要查询所有频道信息

Vo包含文章分值Score

① 在heima-leadnews-feign-api定义远程接口

package com.heima.apis.wemedia;

import com.heima.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

@FeignClient("leadnews-wemedia")
public interface IWemediaClient {

    @GetMapping("/api/v1/channel/list")
    public ResponseResult getChannels();
}

② heima-leadnews-wemedia端提供接口

package com.heima.wemedia.feign;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WemediaClient implements IWemediaClient {

    @Autowired
    private WmChannelService wmChannelService;

    @GetMapping("/api/v1/channel/list")
    @Override
    public ResponseResult getChannels() {
        return wmChannelService.findAll();
    }
}

在ApArticleMapper.xml新增方法

<select id="findArticleListByLast5days" resultMap="resultMap">
    SELECT
    aa.*
    FROM
    `ap_article` aa
    LEFT JOIN ap_article_config aac ON aa.id = aac.article_id
    <where>
        and aac.is_delete != 1
        and aac.is_down != 1
        <if test="dayParam != null">
            and aa.publish_time <![CDATA[>=]]> #{dayParam}
        </if>
    </where>
</select>

修改ApArticleMapper类

package com.heima.article.mapper;
import java.util.Date;
import java.util.List;

@Mapper
public interface ApArticleMapper extends BaseMapper<ApArticle> {

    /**
     * 加载文章列表
     * @param dto
     * @param type  1  加载更多   2记载最新
     * @return
     */
    public List<ApArticle> loadArticleList(ArticleHomeDto dto,Short type);

    public List<ApArticle> findArticleListByLast5days(@Param("dayParam") Date dayParam);
}

3.3.2 热文章业务层

定义业务层接口

package com.heima.article.service;

public interface HotArticleService {
    /**
     * 计算热点文章
     */
    public void computeHotArticle();
}

修改ArticleConstans

package com.heima.common.constants;

public class ArticleConstants {
    public static final Short LOADTYPE_LOAD_MORE = 1;
    public static final Short LOADTYPE_LOAD_NEW = 2;
    public static final String DEFAULT_TAG = "__all__";

    public static final String ARTICLE_ES_SYNC_TOPIC = "article.es.sync.topic";

    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;

    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
}

创建一个vo接收计算分值后的对象

package com.heima.model.article.vos;

import com.heima.model.article.pojos.ApArticle;
import lombok.Data;

@Data
public class HotArticleVo extends ApArticle {
    /**
     * 文章分值
     */
    private Integer score;
}

业务层实现类

package com.heima.article.service.impl;
import java.util.List;
import java.util.stream.Collectors;

@Service
@Slf4j
@Transactional
public class HotArticleServiceImpl implements HotArticleService {

    @Autowired
    private ApArticleMapper apArticleMapper;

    /**
     * 计算热点文章
     */
    @Override
    public void computeHotArticle() {
        //1.查询前5天的文章数据
        Date dateParam = DateTime.now().minusDays(50).toDate();
        List<ApArticle> apArticleList = apArticleMapper.findArticleListByLast5days(dateParam);

        //2.计算文章的分值
        List<HotArticleVo> hotArticleVoList = computeHotArticle(apArticleList);

        //3.为每个频道缓存30条分值较高的文章
        cacheTagToRedis(hotArticleVoList);

    }

    @Autowired
    private IWemediaClient wemediaClient;

    @Autowired
    private CacheService cacheService;

    /**
     * 为每个频道缓存30条分值较高的文章
     * @param hotArticleVoList
     */
    private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {
        //每个频道缓存30条分值较高的文章
        ResponseResult responseResult = wemediaClient.getChannels();
        if(responseResult.getCode().equals(200)){
            String channelJson = JSON.toJSONString(responseResult.getData());
            List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);
            //检索出每个频道的文章
            if(wmChannels != null && wmChannels.size() > 0){
                for (WmChannel wmChannel : wmChannels) {
                    List<HotArticleVo> hotArticleVos = hotArticleVoList.stream().filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList());
                    //给文章进行排序,取30条分值较高的文章存入redis  key:频道id   value:30条分值较高的文章
                    sortAndCache(hotArticleVos, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId());
                }
            }
        }


        //设置推荐数据
        //给文章进行排序,取30条分值较高的文章存入redis  key:频道id   value:30条分值较高的文章
        sortAndCache(hotArticleVoList, ArticleConstants.HOT_ARTICLE_FIRST_PAGE+ArticleConstants.DEFAULT_TAG);


    }

    /**
     * 排序并且缓存数据
     * @param hotArticleVos
     * @param key
     */
    private void sortAndCache(List<HotArticleVo> hotArticleVos, String key) {
        hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        if (hotArticleVos.size() > 30) {
            hotArticleVos = hotArticleVos.subList(0, 30);
        }
        cacheService.set(key, JSON.toJSONString(hotArticleVos));
    }

    /**
     * 计算文章分值
     * @param apArticleList
     * @return
     */
    private List<HotArticleVo> computeHotArticle(List<ApArticle> apArticleList) {

        List<HotArticleVo> hotArticleVoList = new ArrayList<>();

        if(apArticleList != null && apArticleList.size() > 0){
            for (ApArticle apArticle : apArticleList) {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle,hot);
                Integer score = computeScore(apArticle);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        return hotArticleVoList;
    }

    /**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) {
        Integer scere = 0;
        if(apArticle.getLikes() != null){
            scere += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if(apArticle.getViews() != null){
            scere += apArticle.getViews();
        }
        if(apArticle.getComment() != null){
            scere += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if(apArticle.getCollection() != null){
            scere += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
        }

        return scere;
    }
}

在ArticleApplication的引导类中添加以下注解

@EnableFeignClients(basePackages = "com.heima.apis")

现在数据库中准备点数据

package com.heima.article.service.impl;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = ArticleApplication.class)
@RunWith(SpringRunner.class)
public class HotArticleServiceImplTest {

    @Autowired
    private HotArticleService hotArticleService;

    @Test
    public void computeHotArticle() {
        hotArticleService.computeHotArticle();
    }
}

3.3.3 xxl-job定时计算-步骤

①:在heima-leadnews-article中的pom文件中新增依赖

<!--xxl-job-->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>

② 在xxl-job-admin中新建执行器和任务

新建执行器:leadnews-hot-article-executor

新建任务:路由策略为轮询,Cron表达式:0 0 2 * * ? 每天凌晨2点执行

③ leadnews-article中集成xxl-job

XxlJobConfig

package com.heima.article.config;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setPort(port);
        return xxlJobSpringExecutor;
    }


}

在nacos配置新增配置

xxl:
  job:
    admin:
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: leadnews-hot-article-executor
      port: 9999

④:在article微服务中新建任务类

package com.heima.article.job;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ComputeHotArticleJob {

    @Autowired
    private HotArticleService hotArticleService;

    @XxlJob("computeHotArticleJob")
    public void handle(){
        log.info("热文章分值计算调度任务开始执行...");
        hotArticleService.computeHotArticle();
        log.info("热文章分值计算调度任务结束...");

    }
}

4.查询文章接口改造

4.1 思路分析

4.2 功能实现

4.2.1 在ApArticleService中新增方法

/**
     * 加载文章列表
     * @param dto
     * @param type  1 加载更多   2 加载最新
     * @param firstPage  true  是首页  flase 非首页
     * @return
     */
public ResponseResult load2(ArticleHomeDto dto,Short type,boolean firstPage);

实现方法

/**
     * 加载文章列表
     * @param dto
     * @param type      1 加载更多   2 加载最新
     * @param firstPage true  是首页  flase 非首页
     * @return
     */
@Override
public ResponseResult load2(ArticleHomeDto dto, Short type, boolean firstPage) {
    if(firstPage){
        String jsonStr = cacheService.get(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + dto.getTag());
        if(StringUtils.isNotBlank(jsonStr)){
            List<HotArticleVo> hotArticleVoList = JSON.parseArray(jsonStr, HotArticleVo.class);
            ResponseResult responseResult = ResponseResult.okResult(hotArticleVoList);
            return responseResult;
        }
    }
    return load(type,dto);
}

4.2.2 修改控制器

/**
     * 加载首页
     * @param dto
     * @return
     */
@PostMapping("/load")
public ResponseResult load(@RequestBody ArticleHomeDto dto){
    //        return apArticleService.load(dto, ArticleConstants.LOADTYPE_LOAD_MORE);
    return apArticleService.load2(dto, ArticleConstants.LOADTYPE_LOAD_MORE,true);
}

热点文章-实时计算

1 今日内容

1.1 定时计算与实时计算

1.2 今日内容

kafkaStream

  • 什么是流式计算
  • kafkaStream概述
  • kafkaStream入门案例
  • Springboot集成kafkaStream

实时计算

  • 用户行为发送消息
  • kafkaStream聚合处理消息
  • 更新文章行为数量
  • 替换热点文章数据

2 实时流式计算

2.1 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2.2 应用场景

  • 日志分析

网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

可以随时更新公交车方位,计算多久到达站牌

  • 实时文章分值计算

头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

2.3 技术方案选型

  • Hadoop

  • Apche Storm

Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

3 Kafka Stream

3.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

3.2 Kafka Streams的关键概念

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

3.3 KStream

(1)数据结构类似于map,如下图,key-value键值对

(2)KStream

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

3.4 Kafka Stream入门案例编写

(1)需求分析,求单词个数(word count)

(2)引入依赖

在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(3)创建原生的kafka staream入门案例

package com.heima.kafka.sample;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信息
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口 
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}

(4)测试准备

  • 使用生产者在topic为:itcast_topic_input中发送多条消息
  • 使用消费者接收topic为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

3.5 SpringBoot集成Kafka Stream

(1)自定配置参数

package com.heima.kafka.config;
import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
        props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

修改application.yml文件,在最下方添加自定义配置

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

(2)新增配置类,创建KStream对象,进行聚合

package com.heima.kafka.stream;
import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

测试:

启动微服务,正常发送消息,可以正常接收到消息

3 app端热点文章计算

3.1 思路说明

3.2 功能实现

3.2.1 用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例

①在heima-leadnews-behavior微服务中集成kafka生产者配置

修改nacos,新增内容

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

②修改ApLikesBehaviorServiceImpl新增发送消息

定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

topic常量类:

package com.heima.common.constants;
public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
}

完整代码如下:

package com.heima.behavior.service.impl;

import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {

        //1.检查参数
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        //3.点赞  保存数据
        if (dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            // 保存当前key
            log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            // 删除当前key
            log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            mess.setAdd(-1);
        }

        //发送消息,数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
    /**
     * 检查参数
     *
     * @return
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }
        return false;
    }
}

③修改阅读行为的类ApReadBehaviorServiceImpl发送消息

完整代码:

package com.heima.behavior.service.impl;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        //1.检查参数
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        //更新阅读次数
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if (StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }
        // 保存当前key
        log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        //发送消息,数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
        
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}

3.2.2 使用kafkaStream实时接收消息,聚合内容

①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)

②定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;
import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}

修改常量类:增加常量

package com.heima.common.constans;
public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

③ 定义stream,接收消息并聚合

package com.heima.article.stream;
import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;
    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}

3.2.3 重新计算文章的分值,更新到数据库和缓存中

①在ApArticleService添加方法,用于更新数据库中的文章分值

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
public void updateScore(ArticleVisitStreamMess mess);

实现类方法

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }

            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}

/**
     * 更新文章行为数量
     * @param mess
     */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
    apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
    apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
    apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    updateById(apArticle);
    return apArticle;
}

/**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if(apArticle.getLikes() != null){
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if(apArticle.getViews() != null){
        score += apArticle.getViews();
    }
    if(apArticle.getComment() != null){
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if(apArticle.getCollection() != null){
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}

②定义监听,接收聚合之后的数据,文章的分值重新进行计算

package com.heima.article.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/768430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter——最详细(NavigationRail)使用教程

NavigationRail 简介 一个 Material Design 小部件&#xff0c;旨在显示在应用程序的左侧或右侧&#xff0c;以便在少量视图&#xff08;通常在三到五个视图之间&#xff09;之间导航。 使用场景&#xff1a; 通过Row属性&#xff0c;左侧或右侧菜单栏按钮 属性作用onDestinati…

Hadoop——大数据生态体系详解

一.大数据概论 1.1 大数据概念 大数据&#xff08;big data&#xff09;&#xff1a;指无法在一定时间范围内用常规软件工具进行捕捉、管理 和处理的数据集合&#xff0c;是需要新处理模式才能具有更强的决策力、洞察发现力和流程 优化能力的海量、高增长率和多样化的信息资产…

前端 Jenkins 自动化部署

由于公司使用自己搭建的 svn 服务器来进行代码管理&#xff0c;因此这里 Jenkins 是针对 svn 服务器来进行的配置&#xff0c;其实跟Git 配置基本一致。 在没有自动化部署前 之前项目每次修改之后都需要本地 ​​npm run build ​​一次手动发布到服务器上方便测试和产品查看…

微服务组件Feign实战 - 解决日志配置失效

1. RPC概述 思考&#xff1a; 微服务之间如何方便优雅的实现服务间的远程调用&#xff1f; RPC 全称是 Remote Procedure Call &#xff0c;即远程过程调用&#xff0c;其对应的是我们的本地调用。RPC 的目的是&#xff1a;让我们调用远程方法像调用本地方法一样。 //本地调用…

【unity之IMGUI实践】游戏玩法逻辑实现【四】

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

3.11 Bootstrap 徽章(Badges)

文章目录 Bootstrap 徽章&#xff08;Badges&#xff09;激活导航状态 Bootstrap 徽章&#xff08;Badges&#xff09; 下面将讲解 Bootstrap 徽章&#xff08;Badges&#xff09;。徽章与标签相似&#xff0c;主要的区别在于徽章的边角更加圆滑。 徽章&#xff08;Badges&…

短视频seo抖音矩阵号系统源码开发搭建分享

我们自主研发的短视频矩阵系统源码&#xff0c;技术研发的独立核心算法的视频内容管理和展示功能。无需额外的流量接口费用和复杂的配置&#xff0c;轻松地创建和管理短视频内容&#xff0c;短视频矩阵源码是指将抖音平台上的视频资源进行筛选、排序等操作&#xff0c;进而提升…

【ROS】ROS1人机界面开发:在QtCreator中创建ROS1功能包

【ROS】郭老二博文之:ROS目录 1、版本要求 ROS的QtCreator插件要和QtCreator版本对应一致,否则报错。 本人QtCreator版本为:10.0.1,需要下载安装ros_qtc_plugin的版本也要为10.0版本 2、安装 ros_qtc_plugin 2.1 下载插件 github:https://github.com/ros-industrial…

基于fpga实现tft屏幕显示数字、字母

简介 开发平台&#xff1a;ZYNQ 开发工具&#xff1a;Vivado 2018.3 tft屏幕分辨率&#xff1a;800*480 在PL端使用纯verilog实现bitmap模块&#xff0c;基于该模块实现在tft屏幕显示数字0-9&#xff0c;以及FPGA字母 Bitmap模块 该模块为5*5的bitmap&#xff0c;纯组合逻辑&…

7-18作业

#include<iostream>using namespace std; class My_stack { private:int* ptr; //执行堆区空间int top;int size; //记录栈顶元素public:My_stack() :ptr(new int[10]), top(-1), size(10) {}//有参构造My_stack(int size) :ptr(new int[10]), top(-1), size(siz…

Android 网络游戏开发入门简单示例

在Android系统上开发是Android开发学习者所向往的&#xff0c;有成就感也有乐趣&#xff0c;还能取得经济上的报酬。那怎样开发Android网络游戏攻略呢&#xff1f;下面介绍一个简单的入门实例。 一、创建新工程   首先&#xff0c;我们在Eclipse中新建一个名为Movement的工程…

简单工厂模式(java)

目录 结构 案例 类图 代码实现 简单咖啡工厂类 咖啡店类 咖啡类 具体咖啡类 简单工厂方法的优缺点 优点 缺点 结构 简单工厂包含如下角色&#xff1a; 抽象产品 &#xff1a;定义了产品的规范&#xff0c;描述了产品的主要特性和功能。具体产品 &#xff1a;实现或者…

前端 | (五)CSS三大特性及常用属性 | 尚硅谷前端html+css零基础教程2023最新

学习来源&#xff1a;尚硅谷前端htmlcss零基础教程&#xff0c;2023最新前端开发html5css3视频 文章目录 &#x1f4da;CSS三大属性&#x1f407;层叠性&#x1f407;继承性&#x1f407;优先级 &#x1f4da;CSS常用属性&#x1f407;像素的概念&#x1f407;颜色的表示⭐️表…

OCR学术前沿及产业应用高峰论坛202204

OCR学术前沿及产业应用高峰论坛 相关议程&#xff1a;https://mp.weixin.qq.com/s/LYoKHFad9D-gjhGlVF3Czg 广告OCR技术研究与应用-腾讯 视频制作ASR&#xff0c;ocr得到字幕 计算机动画CG OCR实践与技术创新 - 蚂蚁 loss优化 数据合成 对比学习的方式&#xff0c;什么样是…

让小程序动起来-轮播图的两种方式--【浅入深出系列002】

浅入深出系列总目录在000集 如何0元学微信小程序–【浅入深出系列000】 文章目录 本系列校训学习资源的选择啥是轮播图轮播图的关键代码最常见的轮播图代码便于理解的轮播代码两种轮播代码的比较 实际操练第一步&#xff0c;就是找到文件。第二步&#xff0c;先改动一下最显眼…

springboot整合feign实现RPC调用,并通过Hystrix实现降级

目录 一、服务提供者 二、服务消费者 三、测试效果 四、开启Hystrix实现降级功能 feign/openfeign和dubbo是常用的微服务RPC框架&#xff0c;由于feigin内部已经集成ribbon&#xff0c;自带了负载均衡的功能&#xff0c;当有多个同名的服务注册到注册中心时&#xff0c;会根…

数组前缀和

前缀和 前缀和就是指前缀的和&#xff0c;例如在数组中&#xff0c;从开始到 i 就是到 i 的前缀和。前缀和一般用来求中间连续某一段的和&#xff0c;例如sum[i] - sum[j - 1]就可以求出j 到 i 这一段的和。 在这一道题目里面&#xff0c;中间某一段连续子数组和为k&#xff0…

知识库分享|《快手电商中小商家成长指南》

《快手电商中小商家成长指南》致力于帮助快手电商平台的新商家、中小商家可以快速掌握电商经营流程&#xff0c;理解电商经营方法&#xff0c;提升电商经营能力&#xff0c;助力中小商家快速实现冷启。 ‼️ 包含中小商家冷启必备的六大板块两大方向&#xff1a; 基础操作、账…

SpringCloud(三)LoadBalancer负载均衡

一、负载均衡 实际上&#xff0c;在添加LoadBalanced注解之后&#xff0c;会启用拦截器对我们发起的服务调用请求进行拦截&#xff08;注意这里是针对我们发起的请求进行拦截&#xff09;&#xff0c;叫做LoadBalancerInterceptor&#xff0c;它实现ClientHttpRequestIntercep…

JAVA的swing技术到底实用不实用?

文章目录 先说结论JAVA的知识范围那为什么还要学&#xff1f; 总结 先说结论 不实用 1 尚硅谷Java入门视频教程&#xff0c;宋红康java基础视频 必须要排在第一位。1600万的播放量呀。 717集 我的天啦&#xff01; 目录&#xff1a; Java视频及配套资料下载指南 尚硅谷Java基…