目录
一、概览
二、过期文件删除机制
三、参考资料
RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动时会加载commitlog、consumequeue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储Broker服务器上,所以需要删除己过期的文件。
RocketMQ采用顺序写Commitlog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。若是更新消息时(如:更新消息的延迟重试次数),采用重新写入的方式,而不是直接更新原始消息。
RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,不会关注该文件的消息是否全部被消费。默认每个文件的过期时间为48小时,通过broker.conf配置文件中fileReservedTime参数来改变过期时间,单位为小时。
Broker启动时,会创建Schedule定时任务并启动,方法org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask执行文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘等定时任务。如下代码所示,文件过期删除,每10s执行一次。方法调用链如下,看出CommitLog、ConsumeQueue过期文件删除共用一套删除机制,这里介绍Commitlog文件过期删除。
/*** 日程任务,如:文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘* 总入口:DefaultMessageStore#start(),即:Broker启动时,添加这些日程*/
private void addScheduleTask() {// 文件过期删除,每10s执行一次this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);......
}// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
// 删除CommitLog、ConsumeQueue过期文件
private void cleanFilesPeriodically() {// CommitLog文件this.cleanCommitLogService.run();// ConsumeQueue文件this.cleanConsumeQueueService.run();// 更新逻辑偏移量this.correctLogicOffsetService.run();
}
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles是Commitlog文件过期删除的核心方法,其调用链及代码如下所示。
注意,触发删除过期文件的三个条件(满足其一即可删除):
/*** 删除过期Commitlog文件*/
private void deleteExpiredFiles() {// 删除文件计数int deleteCount = 0;// 文件保留时间,离最后一次修改多长时间后删除,默认48h(通过brker.conf配置文件参数:fileReservedTime)long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();// 两次删除真实物理文件的间隔时间int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();// 离第一次拒绝删除的最大保留时间(文件被其他线程所引用,拒绝删除),超出改时间后,强制删除int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();// 批量删除文件的最大数量int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();/** 满足下列3条之一,则直接删除:* 1. 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen)* 2. 磁盘不足* 3. 人工删除*/boolean isTimeUp = this.isTimeToDelete(); // 指定时间点删除文件boolean isUsageExceedsThreshold = this.isSpaceToDelete(); // 磁盘不足boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0; // 人工删除if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {if (isManualDelete) {this.manualDeleteFileSeveralTimes--;}// 是否可直接删除文件boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",fileReservedTime,isTimeUp,isUsageExceedsThreshold,manualDeleteFileSeveralTimes,cleanAtOnce,deleteFileBatchMax);fileReservedTime *= 60 * 60 * 1000;// 执行文件删除deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);if (deleteCount > 0) {// If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFileif (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {final long minPhyOffset = getMinPhyOffset();((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);}}} else if (isUsageExceedsThreshold) {LOGGER.warn("disk space will be full soon, but delete file failed.");}}
}
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete是判定磁盘不足的核心方法,代码如下。
/*** 磁盘不足,则直接删除文件* Commilog文件、ConsumeQueue文件所在磁盘分区使用率:* > diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入* > diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入* < diskSpaceCleanForciblyRatio时,恢复磁盘可写* 磁盘最大使用量:超出则直接删除文件* @return*/
private boolean isSpaceToDelete() {// 是否直接删除cleanImmediately = false;/*磁盘使用率处理:> diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入> diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入< diskSpaceCleanForciblyRatio时,恢复磁盘可写*/// Commilog文件所在磁盘分区使用率String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();String[] storePaths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER);Set fullStorePath = new HashSet<>();double minPhysicRatio = 100;String minStorePath = null;for (String storePathPhysic : storePaths) {// Commitlog文件所在分区的磁盘使用率double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);if (minPhysicRatio > physicRatio) {minPhysicRatio = physicRatio;minStorePath = storePathPhysic;}// 系统参数-Drocketmq.broker.diskSpaceCleanForciblyRatio设置,默认0.85。磁盘分区使用率超过该阈值,则执行过期文件清除,但不会拒绝新消息的写入if (physicRatio > getDiskSpaceCleanForciblyRatio()) {fullStorePath.add(storePathPhysic);}}DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);// 系统参数-Drocketmq.broker.diskSpaceWarningLevelRatio设置,默认0.9。磁盘分区使用率超过该阈值,则设置磁盘不可写,拒绝新消息的写入if (minPhysicRatio > getDiskSpaceWarningLevelRatio()) {boolean diskFull = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();if (diskFull) {DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + minPhysicRatio +", so mark disk full, storePathPhysic=" + minStorePath);}cleanImmediately = true;return true;} else if (minPhysicRatio > getDiskSpaceCleanForciblyRatio()) {cleanImmediately = true;return true;} else {// 磁盘使用率 < diskSpaceCleanForciblyRatio时,恢复磁盘可写boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();if (!diskOK) {DefaultMessageStore.LOGGER.info("physic disk space OK " + minPhysicRatio +", so mark disk ok, storePathPhysic=" + minStorePath);}}// ConsumeQueue文件所在磁盘分区使用率String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);if (logicsRatio > getDiskSpaceWarningLevelRatio()) {boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();if (diskOK) {DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");}cleanImmediately = true;return true;} else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {cleanImmediately = true;return true;} else {boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();if (!diskOK) {DefaultMessageStore.LOGGER.info("logics disk space OK " + logicsRatio + ", so mark disk ok");}}/*磁盘分区当前使用量,超出时则直接删除文件*/double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;int replicasPerPartition = DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition();// Only one commitLog in nodeif (replicasPerPartition <= 1) {if (minPhysicRatio < 0 || minPhysicRatio > ratio) {DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + minPhysicRatio);return true;}if (logicsRatio < 0 || logicsRatio > ratio) {DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + logicsRatio);return true;}return false;} else {long majorFileSize = DefaultMessageStore.this.getMajorFileSize();long partitionLogicalSize = UtilAll.getDiskPartitionTotalSpace(minStorePath) / replicasPerPartition;double logicalRatio = 1.0 * majorFileSize / partitionLogicalSize;if (logicalRatio > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) {// if logical ratio exceeds 0.80, then clean immediatelyDefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}",logicalRatio, minPhysicRatio, cleanImmediately);cleanImmediately = true;return true;}boolean isUsageExceedsThreshold = logicalRatio > ratio;if (isUsageExceedsThreshold) {DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}",logicalRatio, ratio, cleanImmediately);}return isUsageExceedsThreshold;}
}
RocketMQ文件刷盘机制与过期文件删除_fFee-ops的博客-CSDN博客_rocketmq设置过期时间
消息队列 - RocketMQ -- 过期文件的删除 - 个人文章 - SegmentFault 思否
RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客
上一篇:【C语言】指针进阶
下一篇:C++——二叉树排序树