目录
一、概览
二、过期文件删除机制
三、参考资料
一、概览
RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动时会加载commitlog、consumequeue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储Broker服务器上,所以需要删除己过期的文件。
RocketMQ采用顺序写Commitlog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。若是更新消息时(如:更新消息的延迟重试次数),采用重新写入的方式,而不是直接更新原始消息。
RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,不会关注该文件的消息是否全部被消费。默认每个文件的过期时间为48小时,通过broker.conf配置文件中fileReservedTime参数来改变过期时间,单位为小时。
二、过期文件删除机制
Broker启动时,会创建Schedule定时任务并启动,方法org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask执行文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘等定时任务。如下代码所示,文件过期删除,每10s执行一次。方法调用链如下,看出CommitLog、ConsumeQueue过期文件删除共用一套删除机制,这里介绍Commitlog文件过期删除。
/**
* 日程任务,如:文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘
* 总入口:DefaultMessageStore#start(),即:Broker启动时,添加这些日程
*/
private void addScheduleTask() {
// 文件过期删除,每10s执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
......
}
// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
// 删除CommitLog、ConsumeQueue过期文件
private void cleanFilesPeriodically() {
// CommitLog文件
this.cleanCommitLogService.run();
// ConsumeQueue文件
this.cleanConsumeQueueService.run();
// 更新逻辑偏移量
this.correctLogicOffsetService.run();
}
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles是Commitlog文件过期删除的核心方法,其调用链及代码如下所示。
注意,触发删除过期文件的三个条件(满足其一即可删除):
- 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen);
- 磁盘不足;
- 人工删除,调用excuteDeleteFilesManualy方法手工触发过期文件删除。
/**
* 删除过期Commitlog文件
*/
private void deleteExpiredFiles() {
// 删除文件计数
int deleteCount = 0;
// 文件保留时间,离最后一次修改多长时间后删除,默认48h(通过brker.conf配置文件参数:fileReservedTime)
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 两次删除真实物理文件的间隔时间
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 离第一次拒绝删除的最大保留时间(文件被其他线程所引用,拒绝删除),超出改时间后,强制删除
int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 批量删除文件的最大数量
int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();
/*
* 满足下列3条之一,则直接删除:
* 1. 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen)
* 2. 磁盘不足
* 3. 人工删除
*/
boolean isTimeUp = this.isTimeToDelete(); // 指定时间点删除文件
boolean isUsageExceedsThreshold = this.isSpaceToDelete(); // 磁盘不足
boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0; // 人工删除
if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {
if (isManualDelete) {
this.manualDeleteFileSeveralTimes--;
}
// 是否可直接删除文件
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",
fileReservedTime,
isTimeUp,
isUsageExceedsThreshold,
manualDeleteFileSeveralTimes,
cleanAtOnce,
deleteFileBatchMax);
fileReservedTime *= 60 * 60 * 1000;
// 执行文件删除
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
if (deleteCount > 0) {
// If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {
if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
final long minPhyOffset = getMinPhyOffset();
((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
}
}
} else if (isUsageExceedsThreshold) {
LOGGER.warn("disk space will be full soon, but delete file failed.");
}
}
}
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete是判定磁盘不足的核心方法,代码如下。
/**
* 磁盘不足,则直接删除文件
* Commilog文件、ConsumeQueue文件所在磁盘分区使用率:
* > diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入
* > diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入
* < diskSpaceCleanForciblyRatio时,恢复磁盘可写
* 磁盘最大使用量:超出则直接删除文件
* @return
*/
private boolean isSpaceToDelete() {
// 是否直接删除
cleanImmediately = false;
/*
磁盘使用率处理:
> diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入
> diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入
< diskSpaceCleanForciblyRatio时,恢复磁盘可写
*/
// Commilog文件所在磁盘分区使用率
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
String[] storePaths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
// Commitlog文件所在分区的磁盘使用率
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
// 系统参数-Drocketmq.broker.diskSpaceCleanForciblyRatio设置,默认0.85。磁盘分区使用率超过该阈值,则执行过期文件清除,但不会拒绝新消息的写入
if (physicRatio > getDiskSpaceCleanForciblyRatio()) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
// 系统参数-Drocketmq.broker.diskSpaceWarningLevelRatio设置,默认0.9。磁盘分区使用率超过该阈值,则设置磁盘不可写,拒绝新消息的写入
if (minPhysicRatio > getDiskSpaceWarningLevelRatio()) {
boolean diskFull = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskFull) {
DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}
cleanImmediately = true;
return true;
} else if (minPhysicRatio > getDiskSpaceCleanForciblyRatio()) {
cleanImmediately = true;
return true;
} else {
// 磁盘使用率 < diskSpaceCleanForciblyRatio时,恢复磁盘可写
boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskOK) {
DefaultMessageStore.LOGGER.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}
// ConsumeQueue文件所在磁盘分区使用率
String storePathLogics = StorePathConfigHelper
.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskOK) {
DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
}
cleanImmediately = true;
return true;
} else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {
cleanImmediately = true;
return true;
} else {
boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskOK) {
DefaultMessageStore.LOGGER.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
}
}
/*
磁盘分区当前使用量,超出时则直接删除文件
*/
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
int replicasPerPartition = DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition();
// Only one commitLog in node
if (replicasPerPartition <= 1) {
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + minPhysicRatio);
return true;
}
if (logicsRatio < 0 || logicsRatio > ratio) {
DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + logicsRatio);
return true;
}
return false;
} else {
long majorFileSize = DefaultMessageStore.this.getMajorFileSize();
long partitionLogicalSize = UtilAll.getDiskPartitionTotalSpace(minStorePath) / replicasPerPartition;
double logicalRatio = 1.0 * majorFileSize / partitionLogicalSize;
if (logicalRatio > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) {
// if logical ratio exceeds 0.80, then clean immediately
DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}",
logicalRatio, minPhysicRatio, cleanImmediately);
cleanImmediately = true;
return true;
}
boolean isUsageExceedsThreshold = logicalRatio > ratio;
if (isUsageExceedsThreshold) {
DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}",
logicalRatio, ratio, cleanImmediately);
}
return isUsageExceedsThreshold;
}
}
三、参考资料
RocketMQ文件刷盘机制与过期文件删除_fFee-ops的博客-CSDN博客_rocketmq设置过期时间
消息队列 - RocketMQ -- 过期文件的删除 - 个人文章 - SegmentFault 思否
RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客