场景描述
设备比较多,几十万甚至上百万,设备在时不时会上报消息。
用户可以设置设备60分钟、50分钟、40分钟、30分钟未上报数据,发送通知给用户,消息要及时可靠。
基本思路
思路:
由于设备在一直上报,如果直接存储到数据库对数据库的压力比较大,考虑使用缓存,每次上报都更新到缓存中; 由于是多久未上报发通知,考虑使用定时任务查找超过60/50/40/30min的设备;定时任务遍历时要尽可能少的查询设备缓存,因为绝大多数设备是不需要进行通知的,最好是只遍历需要发送通知的设备缓存,可以考虑使用类似于时间窗口机制,将设备缓存按时间进行分割,建立两个缓存,缓存1设备数据指向缓存2(主要用于实现设备数据在缓存2不用时间窗口转换),缓存2数据,用于定时任务数据扫描;考虑到消息通知的及时性,考虑使用延迟定时任务,来及时发送消息通知。由于设备比较大,考虑对缓存1按hash算法分割开来,来提升性能。
思路转化方案:
- 涉及的Redis缓存
- 缓存1(hash),用于找到缓存2
大Key:
device:one:0
,小Key:pk:8620241008283980
,Value:device:two:202410091900
, 即{"pk:8620241008283980":"device:two:202410091900"}
- 缓存2(hash), 通过缓存2达到过滤数据的目的
大Key:
device:two:202410091900
,小Key:pk:8620241008283980
,Value:1728473450149
, 即{"pk:8620241008283980":"1728473450149"}
- PK:DK按照hash算法,分成100份,设备上报时,存储到缓存1中
- 按照1分钟为跨度,设备上报时,将当前设备数据存储到缓存2中
- 设备上报时,判断该设备是否有延迟定时任务,如果存在删除该延迟定时任务,判断该设备是否存在缓存1与缓存2,如果存在先删除,再添加。(其过程实现了数据在缓存2不同集合的转化)
- 定时任务:根据当前时间,扫描对应60/50/40min前的缓存2数据,并添加到延迟定时任务(考虑到消息要及时发送)中
- 延迟定时任执行:删除缓存1该设备数据,删除缓存2该设备数据,下发通知
基本流程
方案示意图:
设备上报处理流程:
定时任务处理流程:
业务流程实现
设备上报处理逻辑
场景1: 缓存1中存在,缓存2中也存在,延迟定时任务中也存在
删除该设备延迟定时任务数据,删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1
场景2: 缓存1中存在,缓存2中也存在,延迟定时任务中不存在
删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1
场景3: 缓存1不存在,缓存2中不存在,延迟定时任务中不存在
新增缓存2,新增缓存1
相关代码:
package com.angel.ocean.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
@Slf4j
@Service
public class DataHandlerService {
private static final String COMMA = ":";
@Resource
private RedissonClient redissonClient;
@Resource
private ScheduleTaskService scheduleTaskService;
public void setCache(String productKey, String deviceKey, long ts, int expiredNoticeTime) {
String childKey = productKey + COMMA + deviceKey;
String oneKey = RedisCacheKey.getCacheOneHashKey(productKey, deviceKey);
RMap<String, String> oneHash = redissonClient.getMap(oneKey);
String oldTwoKey = oneHash.get(childKey);
if(StrUtil.isNotEmpty(oldTwoKey)) {
if(FutureTaskUtil.futureTasks.containsKey(childKey)) {
log.info("移除通知延迟任务,{}", childKey);
scheduleTaskService.stopTask(childKey);
}
RMap<String, String> oldTwoHash = redissonClient.getMap(oldTwoKey);
log.info("该设备缓存已存在,先删除历史缓存,再更新,{}", childKey);
// 删除缓存2
oldTwoHash.remove(childKey);
// 删除缓存1
oneHash.remove(childKey);
}
String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);
RMap<String, String> twoHash = redissonClient.getMap(twoKey);
long expiredTime = ts + expiredNoticeTime * 60 * 1000L;
twoHash.put(childKey, Long.toString(expiredTime));
oneHash.put(childKey, twoKey);
}
}
缓存工具类:
package com.angel.ocean.redis;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
/**
* 缓存键
*/
public class RedisCacheKey {
public static final String COMMA = ":";
private static final int n = 100;
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
/**
* 获取缓存1 Key,依据pk和dk
* @param productKey
* @param deviceKey
* @return
*/
public static String getCacheOneHashKey(String productKey, String deviceKey) {
String data = productKey + COMMA + deviceKey;
return "device:one:" + Math.abs(data.hashCode()) % n;
}
/**
* 获取缓存2 Key,依据时间戳
* @param ts
* @return
*/
public static String getCacheTwoHashKey(long ts) {
// 将时间戳转换为 Instant
Instant instant = Instant.ofEpochMilli(ts);
ZoneId zoneId = ZoneId.systemDefault();
// 转换为 ZonedDateTime
ZonedDateTime zdt = instant.atZone(zoneId);
// 格式化 ZonedDateTime
String formattedDateTime = zdt.format(formatter);
// 构建并返回缓存键
return "device:two:" + formattedDateTime;
}
public static void main(String[] args) {
System.out.println(getCacheTwoHashKey(System.currentTimeMillis()));
}
}
定时任务逻辑(每分钟执行一次)
- 依据当前时间和多久未上报(60/50/40min),获取对应的缓存2数据
- 遍历该缓存2集合
- 判断该设备的通知时间,是否小于当前时间加上1分钟,如果小于就加入到延迟定时任务中
- 延迟定时任务执行时,删除该设备的缓存2数据,删除该设备的缓存1数据
相关代码:
package com.angel.ocean.task;
import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class ScheduledTasks {
@Resource
private DataHandlerService dataHandlerService;
// 每1分钟执行一次
// 遍历缓存2,放入延迟定时任务中
@Scheduled(cron = "0 0/1 * * * ?")
public void dataHandler() {
log.info("dataHandler....");
// 60分钟未上报通知
dataHandlerService.delayTaskHandler(60);
// 50分钟未上报通知
dataHandlerService.delayTaskHandler(50);
// 40分钟未上报通知
dataHandlerService.delayTaskHandler(40);
}
}
package com.angel.ocean.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
@Slf4j
@Service
public class DataHandlerService {
private static final String COMMA = ":";
@Resource
private RedissonClient redissonClient;
@Resource
private ScheduleTaskService scheduleTaskService;
// 将数据放入延迟定时任务
public void delayTaskHandler(int delayTime) {
long start = System.currentTimeMillis();
log.info("delayTaskHandler() start..., time:{}", System.currentTimeMillis());
long now = System.currentTimeMillis();
long ts = now - delayTime * 60 * 1000L;
String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);
RMap<String, String> hashMap = redissonClient.getMap(twoKey);
if(CollUtil.isEmpty(hashMap)) {
return;
}
Map<String, String> allEntries = hashMap.readAllMap();
allEntries.forEach((key, value) -> {
long tsLimit = now + 60000;
log.info("tsLimit={}, ts={}", tsLimit, value);
if(Long.parseLong(value) < tsLimit) {
Runnable task = () -> {
noticeHandler(key, twoKey);
};
if(Long.parseLong(value) <= System.currentTimeMillis()) {
scheduleTaskService.singleTask(task);
} else {
scheduleTaskService.delayTask(key, task, Long.parseLong(value) - System.currentTimeMillis());
}
}
});
long end = System.currentTimeMillis();
log.info("delayTaskHandler() end..., 耗时:{}毫秒", (end - start));
}
// 模拟通知逻辑
private void noticeHandler(String childKey, String twoKey) {
log.info("发送通知,设备:{}, ts={}", childKey, System.currentTimeMillis());
String[] arr = childKey.split(RedisCacheKey.COMMA);
String oneKey = RedisCacheKey.getCacheOneHashKey(arr[0], arr[1]);
RMap<String, String> oneHash = redissonClient.getMap(oneKey);
String currentTwoKey = oneHash.get(childKey);
// 由于并发问题,会存在延迟定时任务(twoKey)的与缓存1中存储的值(currentTwoKey)不一致,因此,需要校验两个值是否相同。
if(StrUtil.isNotEmpty(currentTwoKey) && currentTwoKey.equals(twoKey)) {
// TODO 相同的话执行通知逻辑,删除缓存1
// 删除缓存1
oneHash.remove(childKey);
}
// 删除缓存2,无论twoKey与currentTwoKey相不相同都删除
RMap<String, String> twoHash = redissonClient.getMap(twoKey);
twoHash.remove(childKey);
}
}
延迟定时任务实现
Springboot定时任务,线程池配置
package com.angel.ocean.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(20); // 设置线程池大小
scheduler.setThreadNamePrefix("Thread-task-"); // 设置线程名称前缀
scheduler.setDaemon(true); // 设置为守护线程
// 你可以继续设置其他属性...
return scheduler;
}
}
定时任务工具类
package com.angel.ocean.util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
@Slf4j
public class FutureTaskUtil {
private FutureTaskUtil() {
}
// FutureTask集合
public static ConcurrentMap<String, ScheduledFuture<?>> futureTasks = new ConcurrentHashMap<String, ScheduledFuture<?>>();
/**
* 判断是否包含 futureTask
* @param taskId
* @return
*/
public static boolean isContains(String taskId) {
boolean result = false;
if(futureTasks.containsKey(taskId)) {
result = true;
}
return result;
}
/**
* 添加 futureTask
* @param taskId
* @param futureTask
*/
public static void addFutureTask(String taskId, ScheduledFuture<?> futureTask) {
if(futureTasks.containsKey(taskId)) {
log.error("FutureTaskUtil.addFutureTask(), key: {}已存在", taskId);
return;
}
futureTasks.put(taskId, futureTask);
}
/**
* 获取 futureTask
* @param taskId
* @return
*/
public static ScheduledFuture<?> getFutureTask(String taskId) {
ScheduledFuture<?> futureTask = null;
if(futureTasks.containsKey(taskId)) {
log.info("FutureTaskUtil.getFutureTask(), taskId: {}", taskId);
futureTask = futureTasks.get(taskId);
}
return futureTask;
}
/**
* 移除 futureTask
* @param taskId
*/
public static void removeFutureTask(String taskId) {
if(futureTasks.containsKey(taskId)) {
log.info("FutureTaskUtil.removeFutureTask(), taskId: {}", taskId);
futureTasks.remove(taskId);
}
}
}
需要关注的问题
- 并发问题如何处理?
由于并发问题,会造成缓存1和缓存2的数据不一致,延迟任务执行时校验缓存1中存储的缓存2的Key于延迟定时任务的缓存Key是否一致,一致的话才下发通知。
- 服务重启,造成延迟定时任务数据丢失,如何补发通知?
由于延迟定时任务存在于内存中,服务重新启动,会导致其数据丢失,可以考虑从缓存2再拿一次数据,做个数据补偿。
package com.angel.ocean.runner;
import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class StartupRunner implements CommandLineRunner {
@Resource
private DataHandlerService dataHandlerService;
@Override
public void run(String... args) throws Exception {
// 只处理近2个小时的数据
int i = 120;
while (i > 50) {
dataHandlerService.delayTaskHandler(i);
i = i - 1;
}
}
}
模拟验证
package com.angel.ocean.test;
import cn.hutool.core.util.RandomUtil;
import com.angel.ocean.domain.DeviceCacheInfo;
import java.util.ArrayList;
import java.util.List;
public class DeviceDataUtil {
private static int deviceNumber = 500000;
private static List<String> dks = new ArrayList<>(deviceNumber + 5);
private static boolean initFlag = false;
private static void init() {
int number = 1;
while (number <= deviceNumber) {
String formattedNumber = String.format("%06d", number);
String dk = "8620241008" + formattedNumber;
dks.add(dk);
number++;
}
initFlag = true;
}
public static void setDeviceNumber(int number) {
DeviceDataUtil.deviceNumber = number;
}
public static DeviceCacheInfo deviceReport() {
if(!initFlag) {
init();
}
DeviceCacheInfo deviceCacheInfo = new DeviceCacheInfo();
deviceCacheInfo.setProductKey("pk");
deviceCacheInfo.setTs(System.currentTimeMillis());
deviceCacheInfo.setExpiredNoticeTime(60);
String dk = dks.get(RandomUtil.randomInt(1, deviceNumber));
deviceCacheInfo.setDeviceKey(dk);
return deviceCacheInfo;
}
}
package com.angel.ocean;
import com.angel.ocean.domain.DeviceCacheInfo;
import com.angel.ocean.service.DataHandlerService;
import com.angel.ocean.test.DeviceDataUtil;
import com.angel.ocean.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@Slf4j
@SpringBootTest
class ApplicationTests {
@Resource
private DataHandlerService dataHandlerService;
@Test
void contextLoads() {
for(int i = 0; i < 500000; i++) {
DeviceCacheInfo deviceCacheInfo = DeviceDataUtil.deviceReport();
Runnable task = () -> {
dataHandlerService.setCache(deviceCacheInfo.getProductKey(), deviceCacheInfo.getDeviceKey(), deviceCacheInfo.getTs(), deviceCacheInfo.getExpiredNoticeTime());
};
ThreadPoolUtil.pools.submit(task);
try {
Thread.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
package com.angel.ocean.util;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolUtil {
private ThreadPoolUtil() {}
public static final ThreadPoolExecutor pools = new ThreadPoolExecutor(16, 50, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
new ThreadFactoryBuilder().setNamePrefix("MyThread-").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
缓存截图:
缓存1:
缓存2:
运行日志截图:
执行延迟定时任务日志截图: