一、今日内容
1. 定时计算与实时计算
2. 今日内容
KafkaStream
- 什么是流式计算
- KafkaStream概述
- KafkaStream入门案例
- SpringBoot集成KafkaStream
实时计算
- 用户行为发送消息
- KafkaStream聚合处理消息
- 更新文章行为数量
- 替换热点文章数据
二、实时流式计算
1. 概念
一般流式计算会与批量计算相比较。在流式计算模型这种,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算替代全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
2. 应用场景
日志分析:
对网站的用户访问日志进行实时的分析,计算访问量、用户画像、留存率等,实时进行数据分析,帮助企业进行决策
大屏看板统计:
可以实时的查看网站注册数量、订单数量、购买数量、金额等
公交实时数据:
可以随时更新公交车方位,计算多久到达站牌
实时文章分值计算:
头条类文章的分值计算,通过用户的行为实时计算文章的分值,分值越高就越被推荐。
3. 技术方案选项
Hadoop
Apache Storm
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
Kafka Stream
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包、部署和操作工具集成。
三、Kafka Stream
1. 概述
Kafka Stream 是Apache Kafka从1.0版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了Kafka外,无任何外部依赖
- 充分利用Kafka分区机制实现水平扩展和顺序性保证
- 通过可容错的State store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义
- 提高记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可以处理晚到的数据(late arrival of records)
- 同时提供底层的处理原理Processor(类似于Storm 的spout和bolt),以及高层抽象的DSL(类似于Spark 的map/group/reduce)
2. Kafka Stream的关键概念
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个Kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
3. KStream
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream):即是一段顺序的,可以无限长,不断更新的数据集。数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身的topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下两个数据记录正在发送到流中:
("alice", 1) -> ("alice", 3)
如果您的流处理应用是要总结每个用户的价值,它将返回了4了alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
4. Kafka Stream入门案例
(1)需求分析,求单词个数(word count)
(2)引入依赖
在之前的kafka-demo工程的pom文件中引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
(3)创建原生的kafka stream入门案例
package com.heima.kafka.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 流式处理
*/
public class KafkaStreamQuickStart {
public static void main(String[] args) {
// kafka的配置中心
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
// stream构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 流式处理
StreamProcessor(streamsBuilder);
// 创建KafkaStream对象
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
// 开启流式计算
kafkaStreams.start();
}
/**
* 流式计算
* 消息的内容:hello kafka hello itcast
* @param streamsBuilder
*/
private static void StreamProcessor(StreamsBuilder streamsBuilder) {
// 1. 创建kstream对象,同时指定从哪个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
// 2. 处理消息的value
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
// 按照value进行聚合处理
.groupBy((key, value) -> value)
// 时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
// 统计单词的个数
.count()
// 转换为KStream
.toStream()
.map((key, value) -> {
System.out.println("key: " + key + ", value: " + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
// 发送消息
.to("itcast-topic-out");
}
}
(4)测试准备
使用生产者在topic为:itcast_topic_input中发送多条消息
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 生产者
*/
public class ProducerQuickStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.kafka链接配置信息
Properties prop = new Properties();
//kafka链接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//key和value的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//2.创建kafka生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
//3.发送消息
/**
* 第一个参数 :topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
for (int i = 0; i < 5; i++) {
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("itcast-topic-input","hello kafka");
producer.send(kvProducerRecord);
}
//同步发送消息
/*RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());*/
//异步消息发送
/* producer.send(kvProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});*/
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
使用消费者接收topic为:itcast_topic_input
package com.heima.kafka.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties prop = new Properties();
//链接地址
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
//key和value的反序列化器
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
//手动提交偏移量
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//2.创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
//3.订阅主题
consumer.subscribe(Collections.singletonList("itcast-topic-out"));
//4.拉取消息
//同步提交和异步提交偏移量
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
/* System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());*/
}
//异步提交偏移量
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
System.out.println("记录错误的信息:"+e);
}finally {
//同步
consumer.commitSync();
}
/*while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
*//* try {
//同步提交偏移量
consumer.commitSync();
}catch (CommitFailedException e){
System.out.println("记录提交失败的异常:"+e);
}*//*
}
//异步的方式提交偏移量
*//*consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null){
System.out.println("记录错误的提交偏移量:"+map+",异常信息为:"+e);
}
}
});*//*
}*/
}
}
先启动Consumer,再启动KafkaStream,最后启动Producer发送消息
结果:通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
5. SpringBoot集成Kafka Stream
(1)自定义配置参数
package com.heima.kafka.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
修改application.yml文件,在最下方添加自定义配置
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
(2)新增配置类,创建KStream对象,进行聚合
package com.heima.kafka.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
// 创建KStream对象,同时指定从哪个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
}
(3)测试:先启动ConsumerQuickStart -> KafkaDemoApplication -> ProducerQuickStart
启动微服务,正常发送消息,可以正常接收到消息
四、App端热点文章计算
1. 思路说明
2. 功能实现
2.1 用户行为(阅读、评论、点赞、收藏)发送消息,以阅读和点赞为例
(在第9天的实战中,我已经把这部分做了)
步骤①:在heima-leadnews-behavior微服务中集成Kafka生产者配置
在nacos修改leadnews-behavior.yml
spring:
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
autoconfigure:
exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
redis:
host: 192.168.200.130
password: leadnews
port: 6379
步骤②:修改ApLikesBehaviorServiceImpl,新增发送消息
定义消息发送封装类:UpdateArticleMess
package com.heima.model.mess;
import lombok.Data;
@Data
public class UpdateArticleMess {
/**
* 修改文章的字段类型
*/
private UpdateArticleType type;
/**
* 文章ID
*/
private Long articleId;
/**
* 修改数据的增量,可为正负
*/
private Integer add;
public enum UpdateArticleType{
COLLECTION,COMMENT,LIKES,VIEWS;
}
}
topic常量类:HotArticleConstants
package com.heima.common.constants;
public class HotArticleConstants {
public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}
ApLikesBehaviorServiceImpl:
package com.heima.behavior.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
@Autowired
private CacheService cacheService;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 点赞或取消点赞
* @param dto 0:点赞 1:取消点赞
* @return
*/
@Override
public ResponseResult like(LikesBehaviorDto dto) {
// 1. 检查参数
if(dto == null || dto.getArticleId() == null || checkParam(dto)) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2. 是否登录
ApUser apUser = AppThreadLocalUtil.getUser();
if(apUser == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
}
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
// 3. 点赞,保存数据
if(dto.getOperation() == 0) {
Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());
if(obj != null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
}
// 保存当前key
log.info("保存当前key:{}, {}, {}", dto.getArticleId(), apUser.getId(), dto);
cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString(), JSON.toJSONString(dto));
mess.setAdd(1);
} else {
// 取消点赞,删除当前key
log.info("删除当前key:{}, {}", dto.getArticleId(), apUser.getId());
cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());
mess.setAdd(-1);
}
// 4. 发送消息,数据聚合
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
// 5. 结果返回
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 检查参数
* @param dto
* @return
*/
private boolean checkParam(LikesBehaviorDto dto) {
// 参数有误
if(dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
return true;
}
return false;
}
}
步骤③:修改阅读行为的类ApReadBehaviorServiceImpl发送消息
package com.heima.behavior.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {
@Autowired
private CacheService cacheService;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 用户行为 - 阅读
* @param dto
* @return
*/
@Override
public ResponseResult readBehavior(ReadBehaviorDto dto) {
// 1. 检查参数
if(dto == null || dto.getArticleId() == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2. 是否登录
ApUser user = AppThreadLocalUtil.getUser();
if(user == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
}
// 3. 更新阅读次数
String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
if(StringUtils.isNotBlank(readBehaviorJson)) {
ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
}
// 4. 保存当前key
log.info("保存当前key: {} {} {}", dto.getArticleId(), user.getId(), dto);
cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
// 5. 发送消息,数据聚合
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
// 6. 结果返回
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
}
2.2 使用KafkaStream实时接收消息,聚合内容
步骤①:在heima-leadnews-article微服务中集成kafkaStream
在pom.xml(heima-leadnews-article)添加Kafka Stream的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
KafkaStreamConfig:
package com.heima.article.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
在nacos配置正常的leadnews-article添加如下
# 。。。 。。。省略
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
步骤②:定义实体类,用于聚合之后的分值封装
package com.heima.model.article.mess;
import lombok.Data;
@Data
public class ArticleVisitStreamMess {
/**
* 文章id
*/
private Long articleId;
/**
* 阅读
*/
private int view;
/**
* 收藏
*/
private int collect;
/**
* 评论
*/
private int comment;
/**
* 点赞
*/
private int like;
}
步骤③:定义stream,接收消息聚合
package com.heima.article.stream;
import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@Slf4j
public class HotArticleStreamHandler {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//接收消息
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
//聚合流式处理
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
//重置消息的key:1234343434 和 value: likes:1
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
//按照文章id进行聚合
.groupBy((key,value)->key)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/**
* 自行的完成聚合的计算
*/
.aggregate(new Initializer<String>() {
/**
* 初始方法,返回值是消息的value
* @return
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/**
* 真正的聚合操作,返回值是消息的value
*/
}, new Aggregator<String, String, String>() {
/**
*
* @param key
* @param value likes:1
* @param aggValue COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
* @return
*/
@Override
public String apply(String key, String value, String aggValue) {
System.out.println(value);
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
int col = 0,com=0,lik=0,vie=0;
for (String agg : aggAry) {
String[] split = agg.split(":");
/**
* 获得初始值,也是时间窗口内计算之后的值
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/**
* 累加操作 likes:1
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}
String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
System.out.println("文章的id:"+key);
System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
return formatStr;
}
}, Materialized.as("hot-atricle-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
//发送消息
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
return stream;
}
/**
* 格式化消息的value数据
* @param articleId
* @param value
* @return
*/
public String formatObj(String articleId,String value){
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}
2.3 重新计算文章的分值,更新到数据库和缓存中
步骤①:在ApArticleService添加方法,用于更新数据库中的文章分值
/**
* 更新文章的分值 同时更新缓存中的热点文章数据
* @param mess
*/
public void updateScore(ArticleVisitStreamMess mess);
实现类方法:
/**
* 更新文章的分值 同时更新缓存中的热点文章数据
* @param mess
*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {
//1.更新文章的阅读、点赞、收藏、评论的数量
ApArticle apArticle = updateArticle(mess);
//2.计算文章的分值
Integer score = computeScore(apArticle);
score = score * 3;
//3.替换当前文章对应频道的热点数据
replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
//4.替换推荐对应的热点数据
replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}
/**
* 替换数据并且存入到redis
* @param apArticle
* @param score
* @param s
*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
String articleListStr = cacheService.get(s);
if (StringUtils.isNotBlank(articleListStr)) {
List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
boolean flag = true;
//如果缓存中存在该文章,只更新分值
for (HotArticleVo hotArticleVo : hotArticleVoList) {
if (hotArticleVo.getId().equals(apArticle.getId())) {
hotArticleVo.setScore(score);
flag = false;
break;
}
}
//如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
if (flag) {
if (hotArticleVoList.size() >= 30) {
hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
if (lastHot.getScore() < score) {
hotArticleVoList.remove(lastHot);
HotArticleVo hot = new HotArticleVo();
BeanUtils.copyProperties(apArticle, hot);
hot.setScore(score);
hotArticleVoList.add(hot);
}
} else {
HotArticleVo hot = new HotArticleVo();
BeanUtils.copyProperties(apArticle, hot);
hot.setScore(score);
hotArticleVoList.add(hot);
}
}
//缓存到redis
hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
cacheService.set(s, JSON.toJSONString(hotArticleVoList));
}
}
/**
* 更新文章行为数量
* @param mess
*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
ApArticle apArticle = getById(mess.getArticleId());
apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
updateById(apArticle);
return apArticle;
}
/**
* 计算文章的具体分值
* @param apArticle
* @return
*/
private Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if(apArticle.getLikes() != null){
score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
}
if(apArticle.getViews() != null){
score += apArticle.getViews();
}
if(apArticle.getComment() != null){
score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
}
if(apArticle.getCollection() != null){
score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
}
return score;
}
步骤②:定义监听,接收聚合之后的数据,文章的分值重新进行计算
package com.heima.article.listener;
import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ArticleIncrHandleListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
public void onMessage(String mess){
if(StringUtils.isNotBlank(mess)){
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
apArticleService.updateScore(articleVisitStreamMess);
}
}
}