目录
引言
需求分析:
思路
数据源:
数据传输:
数据处理:
数据统计:
数据可视化:
数据提取:
技术栈
技术实现
前端界面搭建
布局:
组件:
通信:
Redis客户端搭建
依赖:
Controller示例:
Service示例:
storm拓扑示例:
核心统计bolt:
统计结果持久化bolt:
Redis连接池:
股票大数据统计展屏(基于docker搭建的storm、zookeeper、kafka集群实现)
引言
前面的文章已经基于Docker-compose搭建kafka、storm、zookeeper集群,并对集群进行了测试。首先使用kafka读取文件数据,而后将数据发送给storm,最后使用storm实现对股票交易量以及交易金额的实时统计。
经过了之前的关于实时流计算组件的学习,对storm、kafka、zookeeper一列大数据组件有了一定的了解。《实时计算》这门课程的大作业要求我们实时接收、处理数据模拟器中的数据,并且按照要求展示到前端。
需求分析:
思路
数据源:
数据来源于老师提供的股票数据模拟器,该模拟器将会产生如下图的股票数据。关于数据源的获取,考虑使用kafka循环读取文件末尾的数据。
数据传输:
当kafka将文件中的数据读取到消息队列时,便将该数据向storm发送。
数据处理:
1、storm使用spout接收来自kafka的数据,而后将数据传递给bolt,通过bolt直接进行统计
2、storm使用spout接收来自kafka的数据,而后将数据传递给bolt,在bolt中主要将原始数据重构为适合Hbase存储结构并且方便统计的数据格式,最后将重构的数据存入Hbase中。
数据统计:
1、使用队列存储需要统计的数据量。使用队列储存数据的time作为时间窗口,将新来的数据的time插入队列后对其进行排序,查看队首队尾长度,累计到1min便将存储数据的队列头部元素弹出,每次弹出的元素通过一个变量对其进行相加吸收,如此每分钟的量与累计的量都能够统计到。
2、重构后的数据根据不同的统计任务具有不同的Rowkey,根据任务需求进行不同的查询。
数据可视化:
前端基于vue3框架使用dataV实现数据大屏前端页面搭建,并且按照要求每秒向redis/Hbase请求需要展示的数据。
数据提取:
后端基于Spring boot整合redis实现高并发的redis访问,根据前端的key取出value返回。
技术栈
kafka、storm、zookeeper、redis、vue3
技术实现
前端界面搭建
布局:
将屏幕分为左中右三部分,其中的每部分又分为上中下三部分,从左到右,从上到下依次用于显示实时交易金额/交易量统计、交易金额排名(累计/每分钟)、不同地区用户下单量、订单处理速度、交易量分布情况、实时买入/卖出交易量统计、交易量排名(累计/每分钟)
组件:
组件上主要使用echarts构建排名轮播图、油量表、地图、交易分布饼图
通信:
使用axios实现与后端的通信
//用于请求后端数据的接口
//获取订单的处理速度
export const getSpeed = (params: StockDataV.ReqPriceRank) => {
console.log("getSpeed", params);
return http.get<StockDataV.ResPriceRank>(`/redis/getSpeed`, params);
};
/**
* @description 常用请求方法封装
*/
get<T>(url: string, params?: object, _object = {}): Promise<ResultData<T>> {
return this.service.get(url, { params, ..._object });
}
class RequestHttp {
service: AxiosInstance;
public constructor(config: AxiosRequestConfig) {
// instantiation
this.service = axios.create(config);
/**
* @description 请求拦截器
* 客户端发送请求 -> [请求拦截器] -> 服务器
* token校验(JWT) : 接受服务器返回的 token,存储到 vuex/pinia/本地储存当中
*/
this.service.interceptors.request.use(
(config: CustomAxiosRequestConfig) => {
const userStore = useUserStore();
// 当前请求不需要显示 loading,在 api 服务中通过指定的第三个参数: { loading: false } 来控制
config.loading ?? (config.loading = true);
// config.loading && showFullScreenLoading();
if (config.headers && typeof config.headers.set === "function") {
config.headers.set("x-access-token", userStore.token);
}
return config;
},
(error: AxiosError) => {
return Promise.reject(error);
}
);
/**
* @description 响应拦截器
* 服务器换返回信息 -> [拦截统一处理] -> 客户端JS获取到信息
*/
this.service.interceptors.response.use(
(response: AxiosResponse) => {
const { data } = response;
const userStore = useUserStore();
tryHideFullScreenLoading();
// 登陆失效
if (data.code == ResultEnum.OVERDUE) {
userStore.setToken("");
router.replace(LOGIN_URL);
ElMessage.error(data.msg);
return Promise.reject(data);
}
// 全局错误信息拦截(防止下载文件的时候返回数据流,没有 code 直接报错)
if (data.code && data.code !== ResultEnum.SUCCESS) {
ElMessage.error(data.msg);
return Promise.reject(data);
}
// 成功请求(在页面上除非特殊情况,否则不用处理失败逻辑)
return data;
},
async (error: AxiosError) => {
const { response } = error;
tryHideFullScreenLoading();
// 请求超时 && 网络错误单独判断,没有 response
if (error.message.indexOf("timeout") !== -1) ElMessage.error("请求超时!请您稍后重试");
if (error.message.indexOf("Network Error") !== -1) ElMessage.error("网络错误!请您稍后重试");
// 根据服务器响应的错误状态码,做不同的处理
if (response) checkStatus(response.status);
// 服务器结果都没有返回(可能服务器错误可能客户端断网),断网处理:可以跳转到断网页面
if (!window.navigator.onLine) router.replace("/500");
return Promise.reject(error);
}
);
}
export default new RequestHttp(config);
Redis客户端搭建
依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<!-- 集成redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- redis线程池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>
Controller示例:
该段代码通过RequestMapping配置了访问的路有,而后调用service层具体实现前端的请求
/**
* @description: 后段controller
* @author: qincheng
*/
@RestController
@RequestMapping("redis")
public class RedisController {
private final StockService stockService;
public RedisController(StockService stockService){
this.stockService = stockService;
}
@RequestMapping("getVPValue")
public ResponseEntity<StockDataV> getValue(@RequestParam("totalPrice") String totalPrice, @RequestParam("totalVolume") String totalVolume,
@RequestParam("perMinPrice") String perMinPrice, @RequestParam("perMinVolume") String perMinVolume){
System.out.println("totalPrice "+totalPrice);
StockDataV value = stockService.getPriceVolume(totalPrice, totalVolume, perMinPrice, perMinVolume);
if (value != null){
return ResponseWrapper.responseEntityAccept(value);
}else {
return ResponseWrapper.responseEntityFail("未找到key");
}
}
}
Service示例:
该段代码根据前端的参数向redis请求数据,由于存储的数据是小数位数过长,使用了一个工具类对其进行保留三位小数
@Service
public class StockService {
private final StringRedisTemplate stringRedisTemplate ;
private final ObjectMapper objectMapper = new ObjectMapper();
public StockService(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public StockDataV getPriceVolume(String totalPriceKey, String totalVolumeKey, String perMinPriceKey, String perMinVolumeKey){
ConvertValue convertValue = new ConvertValue();
// 向redis请求数据
String perMinPrice = convertValue.displayDecimal(stringRedisTemplate.opsForValue().get(perMinPriceKey));
String totalPrice = convertValue.displayDecimal(stringRedisTemplate.opsForValue().get(totalPriceKey));
String perMinVolume = stringRedisTemplate.opsForValue().get(perMinVolumeKey);
String totalVolume = stringRedisTemplate.opsForValue().get(totalVolumeKey);
// System.out.println("perMinPrice " + perMinPrice);
// 使用StockDataV存储数据
StockDataV stockDataV = new StockDataV();
stockDataV.setPerMinPrice(perMinPrice);
stockDataV.setPerMinVolume(perMinVolume);
stockDataV.setTotalVolume(totalVolume);
stockDataV.setTotalPrice(totalPrice);
return stockDataV;
}
storm拓扑示例:
该段代码创建了一个拓扑并设置了两个bolt用于统计交易金额与交易量以及将统计结果插入到Redis数据库中
public class TopologyForStock {
public static void main(String[] args) throws Exception {
// 配置拓扑
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 配置spout
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("192.168.43.219:9092,192.168.43.219:9093,192.168.43.219:9094", "stock_2_1").setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup").build();
// KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("broker1:9092,broker2:9092,broker3:9092", "stock_4").setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup").build();
// 设置spout
topologyBuilder.setSpout("kafka-spout", new KafkaSpout<>(kafkaSpoutConfig), 4);
// 设置bolt1 处理交易金额与交易量
topologyBuilder.setBolt("process-bolt", new TradeVPBolt(), 2).allGrouping("kafka-spout");
// 设置bolt1_1 向redis插入数据
topologyBuilder.setBolt("next-bolt1", new TradeVPBolt1(), 4).allGrouping("process-bolt");
// 配置
Config config = new Config();
// config.setMessageTimeoutSecs(600);
config.setMaxSpoutPending(200);
config.setNumAckers(6);
config.setNumWorkers(6);
// 提交topology
// StormSubmitter.submitTopology("stockStatistic", config, topologyBuilder.createTopology());
// 创建本地集群进行测试
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalReadingFromKafkaApp", config, topologyBuilder.createTopology());
}
}
核心统计bolt:
为了统计累计的交易金额与每分钟的交易金额,设计了一个时间窗口、丢弃累积容器以及一个在时间窗口限制下的交易金额记录队列,具体实现思路:每当接收到数据时,提取数据中time放入时间窗口,并对其排序,查看队首与队尾的时间间隔是否达到1分钟,若是则丢弃队首的time数据,同时丢弃交易金额记录队列队首的元素,并将其与迭起累积容器相加,通过这样一个算法便能够实现每分钟与累积的交易金额的统计,针对多个不同实体的交易金额情况统计,则通过一个Map数据结构实现,实体名称作key,交易金额记录队列作值。
/**
*统计交易金额与交易量
**/
public class TradeVPBolt extends BaseRichBolt {
private OutputCollector collector;
private Map<String, Deque<Long>> tradeVolumeMap;
private Map<String, Long> tradeVolumePopMap;
private Map<String, Deque<Double>> totalPriceMap;
private Map<String, Double> totalPricePopMap;
private List<Long> timeWindow;
private TimeFormatter timeFormatter;
private Long startTime;
private Long count;
@Override
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
// 创建交易量记录map
tradeVolumeMap = new HashMap<>();
// 创建交易量 丢弃记录Map
tradeVolumePopMap = new HashMap<>();
// 创建总金额记录map
totalPriceMap = new HashMap<>();
// 创建总金额 丢弃记录Map
totalPricePopMap = new HashMap<>();
// 初始化timeFormatter
timeFormatter = new TimeFormatter();
// 初始化有序定长队列 作时间窗口
timeWindow = new LinkedList<>();
// 初始化开始时间
startTime = 0L;
// 初始化处理的单数
count = 0L;
}
@Override
public void execute(Tuple tuple) {
// 获取到spout的数据,将数据设计为特定的RowKey格式
String value = tuple.getStringByField("value");
// 将消息拆分
String[] parts = value.split(" ");
// System.out.println(value);
// 取出需要的字段
if (parts.length > 1) {
// 取出time
String time = parts[0] + " " + parts[1];
// System.out.println(time);
Long formattedTime = timeFormatter.convertTime(time);
// 取出trade_code code parts[2]
String key = "total";
// 取出省份
String province = parts[7];
// 取出price与volume
double price = Double.parseDouble(parts[4]);
long volume = Long.parseLong(parts[5]);
double totalPrice = price * volume;
// 创建一分钟时间窗口 有可能先产生的数据 后到!!
// 创建一个有序定长的队列 作时间窗口
timeWindow.add(formattedTime);
// 将时间窗口排序
Collections.sort(timeWindow);
if (tradeVolumeMap.containsKey(key)){
// 累积交易量 将新接受到了的数据压入队列
tradeVolumeMap.get(key).add(volume);
// 累积交易金额
totalPriceMap.get(key).add(totalPrice);
// 窗口时间达到1分钟 便记录1分钟的交易量 丢弃队列最前面的交易量 记录丢交易量的总数
if (timeWindow.get(timeWindow.size() - 1) - timeWindow.get(0) >= 60000){
// 弹出第一个时间点
timeWindow.remove(0);
// 弹出交易量记录map的头部元素
Long poppedVolume = tradeVolumeMap.get(key).poll();
// 弹出交易金额记录map的头部元素
Double poppedPrice = totalPriceMap.get(key).poll();
if (poppedVolume != null) {
// 取出丢弃记录map 与 新丢弃值相加
Long curTradeVolume = tradeVolumePopMap.get(key);
tradeVolumePopMap.put(key, poppedVolume + curTradeVolume);
}
if (poppedPrice != null) {
// 取出丢弃记录map 与 新丢弃值相加
Double curTradePrice = totalPricePopMap.get(key);
totalPricePopMap.put(key, poppedPrice + curTradePrice);
}
}
}else {
// 创建动态数组队列
Deque<Long> longDeque = new ArrayDeque<>();
longDeque.add(volume);
tradeVolumeMap.put(key, longDeque);
// 初始化tradeVolumePopMap
tradeVolumePopMap.put(key, 0L);
// 创建动态数组队列
Deque<Double> doubleDeque = new ArrayDeque<>();
doubleDeque.add(totalPrice);
totalPriceMap.put(key, doubleDeque);
totalPricePopMap.put(key, 0.0);
}
// System.out.println("1"+tradeVolumeMap);
// System.out.println(tradeVolumePopMap);
// System.out.println("-----------------------------------------------");
// 创建一个线程安全的交易量map 用于emit
CreateCopy createCopy = new CreateCopy();
Map<String, Deque<Long>> copyTradeVolumeMap = createCopy.copyForVolume(tradeVolumeMap);
// 创建一个线程安全的 丢弃交易量map 用于emit
Map<String, Long> copyTradeVolumePopMap = createCopy.copyForVolumePop(tradeVolumePopMap);
// 交易金额
Map<String, Deque<Double>> copyTotalPriceMap = createCopy.copyForPrice(totalPriceMap);
// 创建一个线程安全的 丢弃交易金额map 用于emit
Map<String, Double> copyTotalPricePopMap= createCopy.copyForPricePop(totalPricePopMap);
// 统计处理订单的速度
count += 1L;
Long currentTime = System.currentTimeMillis();
// 创建redis客户端
ClientRedis clientRedis = new ClientRedis();
// 每1s统计一次
if (currentTime - startTime >= 1000L){
// 插入redis
clientRedis.RedisSet("count", String.valueOf(count / 10));
// System.out.println("count1" + count);
// 更新startTime
startTime = currentTime;
// 重置count
count = 0L;
}
this.collector.emit(new Values(copyTradeVolumeMap, copyTradeVolumePopMap, copyTotalPriceMap, copyTotalPricePopMap));
this.collector.ack(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("copyTradeVolumeMap", "copyTradeVolumePopMap", "copyTotalPriceMap", "copyTotalPricePopMap"));
}
}
统计结果持久化bolt:
该段代码接受上一个bolt传递的存储丢弃累积容器以及交易金额记录队列的Map,遍历Map通过Redis客户端将统计结果存入Redis
/**
* 每小时交易量与累积交易量准确
*/
public class TradeVPBolt1 extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
// 获取到tradeVolumeMap tradeVolumePopMap
Map<String, Deque<Long>> tradeVolumeMap = (Map<String, Deque<Long>>) tuple.getValueByField("copyTradeVolumeMap");
Map<String, Long> tradeVolumePopMap = (Map<String, Long>) tuple.getValueByField("copyTradeVolumePopMap");
// 获取到totalPriceMap totalPricePopMap
Map<String, Deque<Double>> totalPriceMap = (Map<String, Deque<Double>>) tuple.getValueByField("copyTotalPriceMap");
Map<String, Double> totalPricePopMap = (Map<String, Double>) tuple.getValueByField("copyTotalPricePopMap");
Long perMinVolume = null;
Long totalVolume = null;
Double perMinPrice = null;
Double totalPrice = null;
// 连接redis
ClientRedis clientRedis = new ClientRedis();
// 开始统计每分钟的交易量
for (Map.Entry<String, Deque<Long>> entry : tradeVolumeMap.entrySet()) {
Deque<Long> volumeMap = entry.getValue();
// Long first = volumeMap.getFirst();
// Long last = volumeMap.getLast();
// System.out.println("first:" + first);
// System.out.println("last:" + last);
// System.out.println("每分钟交易量map队列长度:"+entry.getValue().size());
perMinVolume = entry.getValue().stream().mapToLong(Long::longValue).sum();
}
if (tradeVolumePopMap.get("total").equals(0L)) {
// 统计累积量
totalVolume = perMinVolume;
// 将累积交易量插入redis
clientRedis.RedisSet("totalVolume", String.valueOf(totalVolume));
// System.out.println("累积交易量为:" + totalVolume);
}else {
totalVolume = perMinVolume + tradeVolumePopMap.get("total");
clientRedis.RedisSet("totalVolume", String.valueOf(totalVolume));
// System.out.println("插入累积交易量成功");
// System.out.println("累积交易量为:" + totalVolume);
clientRedis.RedisSet("perMinVolume", String.valueOf(perMinVolume));
// System.out.println("插入每小时累积交易量成功");
// System.out.println("每分钟交易量为:"+ perMinVolume);
}
// 开始统计每分钟的交易金额
for (Map.Entry<String, Deque<Double>> entry : totalPriceMap.entrySet()) {
perMinPrice = entry.getValue().stream().mapToDouble(Double::doubleValue).sum();
}
if (totalPricePopMap.get("total").equals(0.0)) {
// 统计累积交易金额
totalPrice = perMinPrice;
// 将累积交易金额插入redis
clientRedis.RedisSet("totalPrice", String.valueOf(totalPrice));
// System.out.println("累积交易金额为:" + totalPrice);
}else {
totalPrice = perMinPrice + totalPricePopMap.get("total");
clientRedis.RedisSet("totalPrice", String.valueOf(totalPrice));
// System.out.println("插入累积交易金额成功");
// System.out.println("累积交易金额为:" + totalPrice);
clientRedis.RedisSet("perMinPrice", String.valueOf(perMinPrice));
// System.out.println("插入每小时累积交易金额成功");
// System.out.println("每分钟累积交易金额为:"+ perMinPrice);
}
System.out.println("--------------------------------------------------------");
this.collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
Redis连接池:
为了应对插入数据时频繁的连接Redis,考虑建立一个连接池,并对连接池的进行最大连接数、最大空间连接数以及最大等待时间等参数进行配置。
/**
* 通过创建线程池 维护redis读写稳定
*/
public class ClientRedis {
private static final String Host = "127.0.0.1";
private static final Integer Port = 6379;
private static volatile JedisPool jedisPool;
public ClientRedis(){
jedisPool = ConnectRedis();
}
/**
* 双重检查锁定模式
* @return jedisPool
*/
public static JedisPool ConnectRedis(){
if (jedisPool == null){
synchronized (ClientRedis.class){
if (jedisPool == null){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(1000); // 最大连接数
jedisPoolConfig.setMaxIdle(32); // 最大空闲连接数
jedisPoolConfig.setMaxWaitMillis(100 * 1000); // 最大等待时间
jedisPoolConfig.setTestOnBorrow(true); // 检查连接可用性 确保获取的redis实例可用
jedisPool = new JedisPool(jedisPoolConfig, Host, Port);
}
}
}
return jedisPool;
}
/**
* 向连接池中取出实例 用完放回
* @param key 键
* @param value 值
*/
public void RedisSet(String key, String value){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, value); // 插入数据
}catch (Exception e){
e.printStackTrace();
}finally {
if (jedis != null){
jedis.close(); // 关闭连接
}
}
}
}