1、controller层控制
@Resource
private RedissonClient redissonClient;
@Slf4j
@Service
public class CustomerSettlementExportServiceImpl implements ICustomerSettlementExportService {
/**
* 文件加入队列顺序导出
*
* @param pubFileExportList 参数
* @return 结果
*/
public AjaxResult pubFileExport(List<PubFileExport> pubFileExportList) {
if (CollectionUtils.isEmpty(pubFileExportList)) {
return AjaxResult.error("客户信息不能为空!");
}
RQueue<String> queue = redissonClient.getQueue("downloadQueue:file:export");
for (PubFileExport pubFileExport : pubFileExportList) {
try {
queue.add(JSONObject.toJSONString(pubFileExport));
} catch (Exception e) {
log.error("加入队列失败=", e);
return AjaxResult.error("添加导出任务到队列失败!");
}
}
return AjaxResult.success();
}
}
以下代码是通过定时任务拉取队列进行文件下载
@Slf4j
@Service
public class FileDownloadServiceImpl implements IFileDownloadService {
@Resource
private ICustomerSettlementExportService customerSettlementExportService;
@Resource
private IPubFileExportService pubFileExportService;
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private RedissonClient redissonClient;
@Scheduled(fixedRate = 10000)
public void processDownloadsInit() {
// 提前过滤掉特定 IP 地址
if (isLocalIp()) {
return;
}
RLock lock = null;
try {
lock = redissonClient.getLock(BmsConstants.QUEUE_KEY_LOCK);
boolean isOrderLock = lock.tryLock(10, TimeUnit.SECONDS);
if (!isOrderLock) {
// 当前任务未执行完,不能获取锁
log.error("未能获取分布式锁,当前任务未执行完或有其他实例正在执行");
return;
}
// 阻塞获取队列中的任务
String exportTask = redisTemplate.opsForList().rightPop(BmsConstants.QUEUE_KEY, 10, TimeUnit.SECONDS);
if (exportTask != null) {
log.info("队列文件下载开始=" + DateUtils.getTime());
handleDownload(exportTask);
log.info("队列文件下载结束=" + DateUtils.getTime());
}
} catch (Exception e) {
log.error("队列文件下载异常=", e);
} finally {
safeUnlock(lock);
}
}
/**
* 执行队列文件下载
*
* @param exportTask 导出参数
*/
private void handleDownload(String exportTask) {
try {
PubFileExport pubFileExport = JSONObject.parseObject(exportTask, PubFileExport.class);
// 实际的文件下载逻辑
if (pubFileExport.getExportStartTime() == null
|| pubFileExport.getExportEndTime() == null
|| StringUtils.isBlank(pubFileExport.getClientCode())) {
log.error("客户数据文件下载的参数为空请检查!");
return;
}
AjaxResult ajaxResult = customerSettlementExportService.exportSheets(pubFileExport);
pubFileExportService.updateExportAllExcel(ajaxResult, pubFileExport.getGid(), pubFileExport.getFinalUrl());
} catch (Exception e) {
log.error("执行队列文件下载异常=", e);
}
}
/**
* 检查是否为本地 IP 地址
*
* @return 是否为本地 IP 地址
*/
private boolean isLocalIp() {
try {
InetAddress localAddress = InetAddress.getLocalHost();
String ip = localAddress.getHostAddress();
return ip.startsWith("192.168");
} catch (Exception e) {
log.error("获取本地 IP 地址异常=", e);
return false;
}
}
/**
* 安全释放锁
*
* @param lock 锁对象
*/
private void safeUnlock(RLock lock) {
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
try {
lock.unlock();
} catch (Exception e) {
log.error("释放锁异常=", e);
}
}
}
}
3、解析下代码
这段代码实现了文件下载任务的队列管理和处理。
主要功能包括:
任务入队:将多个文件导出任务加入Redis队列。
定时任务处理:每隔10秒从队列中取出任务并执行文件下载。
分布式锁控制:确保同一时间只有一个实例处理任务,避免重复执行。
本地IP过滤:防止本地IP地址触发任务。
控制流图(CFG)
flowchart TD
A[开始] --> B{是否为本地IP}
B -->|是| E[结束]
B -->|否| C[获取分布式锁]
C --> D{是否获取成功}
D -->|否| F[记录日志并结束]
D -->|是| G[从队列取任务]
G --> H{是否有任务}
H -->|否| I[释放锁并结束]
H -->|是| J[处理下载任务]
J --> K[记录开始时间]
K --> L[调用下载逻辑]
L --> M[记录结束时间]
M --> N[释放锁并结束]
详细说明
任务入队 (pubFileExport 方法):
检查任务列表是否为空。
将每个任务序列化后加入Redis队列。
定时任务处理 (processDownloadsInit 方法):
检查是否为本地IP地址,如果是则直接返回。
尝试获取分布式锁,如果失败则记录日志并返回。
从Redis队列中取出任务,如果有任务则调用处理方法。
记录任务开始和结束时间,并释放锁。
处理下载任务 (handleDownload 方法):
反序列化任务参数。
检查参数是否完整,如果不完整则记录错误并返回。
调用客户结算导出服务进行文件下载,并更新导出状态。
辅助方法:
isLocalIp:检查当前IP是否为本地IP。
safeUnlock:安全释放分布式锁。