RocketMQ5.0.0消息存储<五>_文件过期删除机制
创始人
2024-05-26 06:38:17
0

目录

一、概览

二、过期文件删除机制

三、参考资料


一、概览

        RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动时会加载commitlog、consumequeue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储Broker服务器上,所以需要删除己过期的文件。

        RocketMQ采用顺序写Commitlog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。若是更新消息时(如:更新消息的延迟重试次数),采用重新写入的方式,而不是直接更新原始消息

        RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,不会关注该文件的消息是否全部被消费。默认每个文件的过期时间为48小时,通过broker.conf配置文件中fileReservedTime参数来改变过期时间,单位为小时

二、过期文件删除机制

        Broker启动时,会创建Schedule定时任务并启动,方法org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask执行文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘等定时任务。如下代码所示,文件过期删除,每10s执行一次。方法调用链如下,看出CommitLog、ConsumeQueue过期文件删除共用一套删除机制,这里介绍Commitlog文件过期删除。

/*** 日程任务,如:文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘* 总入口:DefaultMessageStore#start(),即:Broker启动时,添加这些日程*/
private void addScheduleTask() {// 文件过期删除,每10s执行一次this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);......
}// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
// 删除CommitLog、ConsumeQueue过期文件
private void cleanFilesPeriodically() {// CommitLog文件this.cleanCommitLogService.run();// ConsumeQueue文件this.cleanConsumeQueueService.run();// 更新逻辑偏移量this.correctLogicOffsetService.run();
}

        org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles是Commitlog文件过期删除的核心方法,其调用链及代码如下所示。

        注意,触发删除过期文件的三个条件(满足其一即可删除):

  • 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen);
  • 磁盘不足;
  • 人工删除,调用excuteDeleteFilesManualy方法手工触发过期文件删除。

/*** 删除过期Commitlog文件*/
private void deleteExpiredFiles() {// 删除文件计数int deleteCount = 0;// 文件保留时间,离最后一次修改多长时间后删除,默认48h(通过brker.conf配置文件参数:fileReservedTime)long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();// 两次删除真实物理文件的间隔时间int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();// 离第一次拒绝删除的最大保留时间(文件被其他线程所引用,拒绝删除),超出改时间后,强制删除int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();// 批量删除文件的最大数量int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();/** 满足下列3条之一,则直接删除:* 1. 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen)* 2. 磁盘不足* 3. 人工删除*/boolean isTimeUp = this.isTimeToDelete(); // 指定时间点删除文件boolean isUsageExceedsThreshold = this.isSpaceToDelete();  // 磁盘不足boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;  // 人工删除if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {if (isManualDelete) {this.manualDeleteFileSeveralTimes--;}// 是否可直接删除文件boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",fileReservedTime,isTimeUp,isUsageExceedsThreshold,manualDeleteFileSeveralTimes,cleanAtOnce,deleteFileBatchMax);fileReservedTime *= 60 * 60 * 1000;// 执行文件删除deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);if (deleteCount > 0) {// If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFileif (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {final long minPhyOffset = getMinPhyOffset();((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);}}} else if (isUsageExceedsThreshold) {LOGGER.warn("disk space will be full soon, but delete file failed.");}}
}

        org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete是判定磁盘不足的核心方法,代码如下。

/*** 磁盘不足,则直接删除文件* Commilog文件、ConsumeQueue文件所在磁盘分区使用率:*      > diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入*      > diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入*      < diskSpaceCleanForciblyRatio时,恢复磁盘可写* 磁盘最大使用量:超出则直接删除文件* @return*/
private boolean isSpaceToDelete() {// 是否直接删除cleanImmediately = false;/*磁盘使用率处理:> diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入> diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入< diskSpaceCleanForciblyRatio时,恢复磁盘可写*/// Commilog文件所在磁盘分区使用率String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();String[] storePaths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER);Set fullStorePath = new HashSet<>();double minPhysicRatio = 100;String minStorePath = null;for (String storePathPhysic : storePaths) {// Commitlog文件所在分区的磁盘使用率double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);if (minPhysicRatio > physicRatio) {minPhysicRatio = physicRatio;minStorePath = storePathPhysic;}// 系统参数-Drocketmq.broker.diskSpaceCleanForciblyRatio设置,默认0.85。磁盘分区使用率超过该阈值,则执行过期文件清除,但不会拒绝新消息的写入if (physicRatio > getDiskSpaceCleanForciblyRatio()) {fullStorePath.add(storePathPhysic);}}DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);// 系统参数-Drocketmq.broker.diskSpaceWarningLevelRatio设置,默认0.9。磁盘分区使用率超过该阈值,则设置磁盘不可写,拒绝新消息的写入if (minPhysicRatio > getDiskSpaceWarningLevelRatio()) {boolean diskFull = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();if (diskFull) {DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + minPhysicRatio +", so mark disk full, storePathPhysic=" + minStorePath);}cleanImmediately = true;return true;} else if (minPhysicRatio > getDiskSpaceCleanForciblyRatio()) {cleanImmediately = true;return true;} else {// 磁盘使用率 < diskSpaceCleanForciblyRatio时,恢复磁盘可写boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();if (!diskOK) {DefaultMessageStore.LOGGER.info("physic disk space OK " + minPhysicRatio +", so mark disk ok, storePathPhysic=" + minStorePath);}}// ConsumeQueue文件所在磁盘分区使用率String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);if (logicsRatio > getDiskSpaceWarningLevelRatio()) {boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();if (diskOK) {DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");}cleanImmediately = true;return true;} else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {cleanImmediately = true;return true;} else {boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();if (!diskOK) {DefaultMessageStore.LOGGER.info("logics disk space OK " + logicsRatio + ", so mark disk ok");}}/*磁盘分区当前使用量,超出时则直接删除文件*/double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;int replicasPerPartition = DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition();// Only one commitLog in nodeif (replicasPerPartition <= 1) {if (minPhysicRatio < 0 || minPhysicRatio > ratio) {DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + minPhysicRatio);return true;}if (logicsRatio < 0 || logicsRatio > ratio) {DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + logicsRatio);return true;}return false;} else {long majorFileSize = DefaultMessageStore.this.getMajorFileSize();long partitionLogicalSize = UtilAll.getDiskPartitionTotalSpace(minStorePath) / replicasPerPartition;double logicalRatio = 1.0 * majorFileSize / partitionLogicalSize;if (logicalRatio > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) {// if logical ratio exceeds 0.80, then clean immediatelyDefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}",logicalRatio, minPhysicRatio, cleanImmediately);cleanImmediately = true;return true;}boolean isUsageExceedsThreshold = logicalRatio > ratio;if (isUsageExceedsThreshold) {DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}",logicalRatio, ratio, cleanImmediately);}return isUsageExceedsThreshold;}
}

三、参考资料

RocketMQ文件刷盘机制与过期文件删除_fFee-ops的博客-CSDN博客_rocketmq设置过期时间

消息队列 - RocketMQ -- 过期文件的删除 - 个人文章 - SegmentFault 思否

RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客 

RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...