图文并茂!深入了解RocketMQ的过期删除机制

共 12141字,需浏览 25分钟

 ·

2022-05-11 20:29

大家好,我是Leo。

今天聊一下RocketMQ的文件过期删除机制

本章概括

1206bc7c54cecd46df5c080d965be665.webp

源码定位

Broker是RocketMQ的核心,提供了消息的接收,存储,拉取等功能

我们可以先从Broker服务入手。从源码可以得知。RocketMQ启用了一个 BrokerController 的 start 函数

public static void main(String[] args) {
    start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
    try {
        controller.start();

        String tip = "The broker[";
        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            // 日志拼接
        }

        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

下列是start 函数启动的异步线程,他启动了一个 messageStore

public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
    }

从 messageStore.start() 函数进入后会有一个消息存储的第三方接口。

public interface MessageStore {

    /**
     * Load previously stored messages.
     *
     * @return true if success; false otherwise.
     */

    boolean load();

    /**
     * Launch this message store.
     *
     * @throws Exception if there is any error.
     */

    void start() throws Exception;
}

继续围绕 start 函数展开实现类查找,可以看到最终由 DefaultMessageStore 实现类实现

4dc4e9bd505db30d4eaeb257abb0f5c2.webp

定位到具体问题之后,可以看到 start 调用了一个 addScheduleTask 函数

这个函数主要处理的就是清除过期日志服务。

 public void start() throws Exception {
     //刷新ConsumeQueue的服务启动
     this.flushConsumeQueueService.start();
     //CommitLog刷新的服务启动
     this.commitLog.start();
     //存储状态检测的服务启动
     this.storeStatsService.start();

     //创建临时文件,来表示是否正常关机
     this.createTempFile();
     //启动其他服务。比如清除过期日志的服务等
     this.addScheduleTask();
     this.shutdown = false;
 }

这篇文件聊的就是这个 addScheduleTask 函数。言归正传,步入正题!

流程图

b7e1728065b82e48460e8d25b1a1ef9c.webp

过期删除机制

文件过期删除

首次执行时间是60000毫秒=60秒。其余间隔执行都是每10秒执行一次删除。

// 资源回收间隔
private int cleanResourceInterval = 10000;
/**
  * {}要执行的任务
  * 1.延迟第一次执行的时间
  * 2.两次执行之间的时间 10000 资源回收间隔
  * 3.毫秒
*/

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
             DefaultMessageStore.this.cleanFilesPeriodically();
       }
// 延迟第一次执行的时间
}, 1000 * 60this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

1812b9f117dce499b1cc341c7997731b.webp

对于删除过期的时机包括以下3种:

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。

d74b147316390db6efeb3a2c893073c9.webp

删除的文件是过期文件,那哪些文件是过期的呢?

首先是保留时间,默认72小时,也就是3天,超过3天的数据,是需要删除的。

deleteExpiredFiles 是用于删除过期文件。执行步骤如下:

  1. 首先是需要判断是否需要删除文件,通过两个方法的调用isTimeToDeleteisSpaceToDelete判断是否达到定时删除时间以及是否磁盘已满需要删除,以及判断属性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes是否大于0意味着需要手动删除。如果这三个条件任意为真,意味着需要执行删除,那就继续后续的流程。否则结束当前方法。
  2. 如果是手动删除,则属性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes减1.
  3. 如果属性MessageStoreConfig#cleanFileForciblyEnableDefaultMessageStore.CleanCommitLogService#cleanImmediately为真,声明cleanAtOnece为true,否则为false。
  4. 调用方法 CommitLog#deleteExpiredFile 进行文件删除。方法需要4个入参,分别是:
    1. expiredTime:过期时间或者说文件删除前的保留时间,默认为72小时。
    2. deleteFilesInterval:文件删除间隔,这里取值为100.
    3. intervalForcibly:该参数用于强制文件强制释放时间间隔,单位是毫秒。这里取值为120*1000,
    4. cleanImmediately:是否立即执行删除,这边使用的就是步骤3中的数据。
/**
 * 删除已经失效的
 */

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // 文件保留时长 72
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // 100
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // 1000 * 120 = 120000毫秒 = 120秒
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    // 判断有没到凌晨4点
    boolean timeup = this.isTimeToDelete();
    // 空间是否上限
    boolean spacefull = this.isSpaceToDelete();
    // 手动删除  经过20次的调度
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {} else if (spacefull) {
            // 删除文件失败
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

如果这个文件被其他线程引用了,此时就不会进行删除,记录第一次删除的时间戳,退出本次任务,等120s后,就会把文件的引用减1000,再强制删除。

在删除的过程中,会存在删除多个文件的情况,每个文件之间,还有一个时间间隔,比如第一个文件删除完后,需要等100ms再删除第二个文件。

120s可以通过 destroyMapedFileIntervalForcibly 得知

100ms可以通过 deletePhysicFilesInterval 得知

02dfb4b3789501e0f591a4e14146649f.webp

如果当前删除的文件数量,已经超过了可以删除的最大批量数,则退出本次任务。可以通过上述代码中的 spacefull 得出

/**
 * 根据时间删除过期文件
 * @param expiredTime 保留时长  一般是 72
 * @param deleteFilesInterval 删除间隔 100
 * @param intervalForcibly 120秒 延迟
 * @param cleanImmediately 是否强制启用
 * @return
 */

public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) {
    //获取映射文件列表 commitlog文件可能随时有写入,copy一份不影响写入
    Object[] mfs = this.copyMappedFiles(0);
    //如果映射文件列表为空直接返回
    if (null == mfs)
        return 0;

    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    // 存放要删除的MappedFile
    List < MappedFile > files = new ArrayList < MappedFile > ();
    if (null != mfs) {
        //对映射文件进行遍历
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            //文件最后的修改时间 + 过期时间 = 文件最终能够存活的时间
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // 如果文件最新修改已经超过三天或者是磁盘空间达到85%以上  而要在此之前需要满足3个条件之一,时间,容量,和手动触发
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                //删除文件,就是解除对文件的引用
                if (mappedFile.destroy(intervalForcibly)) {
                    //要删除的的文件加入到要删除的集合中
                    files.add(mappedFile);
                    //增加计数
                    deleteCount++;

                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    //如果删除时间间隔大于0,并且没有循环玩,则睡眠指定的删除间隔时长后在杀出
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {}
                    }
                } else break;
            } else {
                // 避免在中间删除文件
                break;
            }
        }
    }
    //从文件映射队列中删除对应的文件映射
    deleteExpiredFile(files);
    //返回删除的文件个数
    return deleteCount;
}

由 timeup 变量我们可以引申出 isTimeToDelete函数

RocketMQ会配置执行删除工作的时间,默认是早上四点。如果当前时间在04:00~04:59之间,就返回true。

/**
 * 判断时间是否到 凌晨4点
 * @return
 */

private boolean isTimeToDelete() {
    // 04
    String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    if (UtilAll.isItTimeToDo(when)) {
        DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
        return true;
    }
    return false;
}

由 spacefull 变量我们可以引申出 isSpaceToDelete函数

判断磁盘空间是否满足删除的条件,判断要求如下:

  1. 使用提交日志的路径,检查其所在的磁盘空间的使用率。默认情况下,使用率超过90%,设置磁盘不可用标志位,并且设置属性DefaultMessageStore.CleanCommitLogService#cleanImmediately为true。使用率超过85%,设置属性DefaultMessageStore.CleanCommitLogService#cleanImmediately为true。其他情况,设置运行状态位为磁盘可用。
  2. 磁盘使用率小于0或者大于属性MessageStoreConfig#diskMaxUsedSpaceRatio的要求,默认是75%,则返回true给调用。
  3. 针对消费队列的文件路径,上述步骤重复一次。
  4. 如果步骤1~3都没有返回true,则返回false给调用者。意味着此时磁盘空间有剩余,不要求删除。
/**
 * 空间是否上限
 * @return
 */

private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

    cleanImmediately = false;

    {
        String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
        String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
        Set < String > fullStorePath = new HashSet < > ();
        double minPhysicRatio = 100;
        String minStorePath = null;
        for (String storePathPhysic: storePaths) {
            double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
            if (minPhysicRatio > physicRatio) {
                minPhysicRatio = physicRatio;
                minStorePath = storePathPhysic;
            }
            if (physicRatio > diskSpaceCleanForciblyRatio) {
                fullStorePath.add(storePathPhysic);
            }
        }
        DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
        if (minPhysicRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
                    ", so mark disk full, storePathPhysic=" + minStorePath);
            }

            cleanImmediately = true;
        } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + ", so mark disk ok, storePathPhysic=" + minStorePath);
            }
        }

        if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + minPhysicRatio + ", storePathPhysic=" + minStorePath);
            return true;
        }
    }

    {
        String storePathLogics = DefaultMessageStore.this.getStorePathLogic();
        double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
        if (logicsRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
            }

            cleanImmediately = true;
        } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
            }
        }

        if (logicsRatio < 0 || logicsRatio > ratio) {
            DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
            return true;
        }
    }

    return false;
}

消费队列过期删除

CLeanConsumeQueueServicerun方法就是直接委托这个方法来实现。这个方法的作用就是删除无效的消费队列条目内容或者文件本身。其代码逻辑如下:

  1. 通过方法CommitLog#getMinOffset获取提交日志最小的偏移量,声明为minOffset。
  2. 如果minOffset大于类属性lastPhysicalMinOffset,那么意味着当前提交日志的最小偏移量对比上一次查询的值发生了变化,也就是说必然是最少一个提交日志文件被删除,那么相应的在消费队列中的过期数据也可以被删除,就执行后面的流程。反之,则意味着不需要执行任何操作,结束方法即可。
  3. minOffset赋值给lastPhysicalMinOffset
  4. 对属性consumeQueueTable进行遍历,遍历其中每一个ConsumeQueue对象。使用本次的minOffset作为入参,调用方法ConsumeQueue#deleteExpiredFile删除过期的消费队列文件以及更新消费队列的最小偏移量。如果有删除到文件,则休眠MessageStoreConfig#deleteConsumeQueueFilesInterval配置的时间,继续对下一个消费队列执行删除。
  5. 当循环执行完毕,使用参数minOffset作为入参,调用方法IndexService#deleteExpiredFile(long)来删除索引文件中已经完全无效的索引文件。
public void run() {
    try {
        this.deleteExpiredFiles();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    // 0.1秒
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    // 得到commitlog中第一个文件的起始物理offset
    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        // 发现上次的已经变小了   说明commitlog已经发生过删除操作了
        this.lastPhysicalMinOffset = minOffset;

        ConcurrentMap < String, ConcurrentMap < Integer, ConsumeQueue >> tables = DefaultMessageStore.this.consumeQueueTable;

        for (ConcurrentMap < Integer, ConsumeQueue > maps: tables.values()) {
            for (ConsumeQueue logic: maps.values()) {
                // 对某一个消费队列做删除  参数是commitlog最小的物理点位
                int deleteCount = logic.deleteExpiredFile(minOffset);

                if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                    try {
                        // 当上一个ConsumeQueue成功删除之后,下一个ConsumeQueue删除需要等待0.1s
                        Thread.sleep(deleteLogicsFilesInterval);
                    } catch (InterruptedException ignored) {

                    }
                }
            }
        }
        // 删除索引文件
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

索引文件删除

索引文件的删除是在消费队列删除完成后,调用方法 deleteExpiredFile 完成的。

该方法是用于删除索引文件中的无效文件。执行流程如下:

  1. 首先需要确认,索引文件中是否存在无效文件。获取第一个索引文件,获取其endPhyOffset属性,判断该属性的值是否小于入参的offset。如果是的话,至少意味着有一个文件是无效的,则执行后续流程。否则没有无效文件,则直接结束整个方法。
  2. 声明一个局部变量fileList,遍历索引文件IndexFile对象,如果其endPhyOffset小于入参的offset,说明该文件是无效的,添加到fileList中。
  3. 使用第二步的fileList作为入参,调用方法IndexService#deleteExpiredFile(List)。该方法内部调用了IndexFile#destory方法,内部也是委托了MappedFile#destory方法实现的文件销毁。并且删除成功的IndexFile还会从属性indexFileList列表中删除对应的对象。
/**
 * 删除索引文件
 * @param offset
 */

public void deleteExpiredFile(long offset) {
    Object[] files = null;
    try {
        this.readWriteLock.readLock().lock();
        if (this.indexFileList.isEmpty()) {
            return;
        }

        long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
        if (endPhyOffset < offset) {
            files = this.indexFileList.toArray();
        }
    } catch (Exception e) {
        log.error("destroy exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    if (files != null) {
        List < IndexFile > fileList = new ArrayList < IndexFile > ();
        for (int i = 0; i < (files.length - 1); i++) {
            IndexFile f = (IndexFile) files[i];
            if (f.getEndPhyOffset() < offset) {
                fileList.add(f);
            } else {
                break;
            }
        }

        this.deleteExpiredFile(fileList);
    }
}

文件恢复机制

从源码定位中,我们可以看到执行 ./mqbroker 命令后,会启动main函数的 createBrokerController函数。

在函数中调用了一个 initialize 初始化 ,我们在初始化函数中找到了 this.messageStore.load

public static void main(String[] args) {
    start(createBrokerController(args));
}
public static BrokerController createBrokerController(String[] args) {
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    return controller;
catch (Throwable e) {
    e.printStackTrace();
    System.exit(-1);
}
public boolean initialize() throws CloneNotSupportedException {
    result = result && this.messageStore.load();

    if (result) {

    }
    return result;
}

这里的 load 和上面的代码一样,都是接口实现类。统一由 DefaultMessageStore 实现。

所以文件恢复函数 recover 从 Broker启动之后,就会随之启动。启动之后

  1. 检查当前文件是否损坏(异常关闭)或者存不存在 (检查依据已在下列代码的尾部贴出)
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后进行 recover 文件恢复
/**
 * 检查abort文件是不是存在,如果存在表示上次是异常关闭,这个文件是一个空文件,在启动之后会创建,正常关闭的情况会删除掉。
 * 加载延迟消息相关的配置,加载 Commit Log文件,加载Consume Queue文件
 * 如果步骤2成功加载,则加载checkpoint文件,加载indexFile然后进行文件的恢复逻辑
 * 对于文件的恢复逻辑在recover方法中,会调用CommitLog类中的方法
 * @throws IOException
 */

public boolean load() {
    boolean result = true;

    try {
        //是否存在abort文件,如果存在说明上次服务关闭时异常关闭的
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // 加载 Commit Log文件
        result = result && this.commitLog.load();

        // 加载 Consume Queue文件
        result = result && this.loadConsumeQueue();

        //检查前面3个文件是不是加载成功
        if (result) {
            //加载成功则继续加载checkpoint文件
            this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加载indexFile
            this.indexService.load(lastExitOK);
            //进行文件的恢复逻辑
            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}"this.getMaxPhyOffset());

            if (null != scheduleMessageService) {
                result = this.scheduleMessageService.load();
            }
        }

    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

// 检查依据是从这个路径中
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

recover 函数的实现逻辑

从 ConsumeQueue文件的集合中取出,从倒数第三个文件开始,逐条遍历消息,如果取出的物理点位大于0并且message的size大于0,说明数据有效。

恢复commitlog分正常退出和非正常退出。

正常退出的commitlog所有数据都是flush完成的,所以只要从倒数第三个文件开始恢复即可,遍历每一个message,并校验其CRC。

非正常退出则从最后一个文件开始恢复,一般出现问题的都是最后一个文件,然后获取文件中的第一个message,其存储时间是否小于checkpoint时间点中的最小的一个,如果是,表示其就是需要恢复的起始文件。然后检验每一个message的CRC,并将通过校验的数据dispatch到consumelog和index文件中。

/**
 * 进行文件的恢复逻辑
 * @param lastExitOK
 */

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    //上次服务关闭是不是正常关闭
    if (lastExitOK) {
        //正常情况关闭
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        //异常情况关闭
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    //恢复topic消费相关相关的缓存
    this.recoverTopicQueueTable();
}
/**
 * 计算恢复ConsumeQueue文件集合的下标 
 */

private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap < Integer, ConsumeQueue > maps: this.consumeQueueTable.values()) {
        for (ConsumeQueue logic: maps.values()) {
            logic.recover();
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }
    return maxPhysicOffset;
}
/**
 * 恢复topic消费相关相关的缓存
 */

public void recoverTopicQueueTable() {
    /* topic-queueid */
    /* offset */
    HashMap < String, Long > table = new HashMap < String, Long > (1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap < Integer, ConsumeQueue > maps: this.consumeQueueTable.values()) {
        for (ConsumeQueue logic: maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }

    this.commitLog.setTopicQueueTable(table);
}
/**
 * 当正常退出、数据恢复时,所有内存数据均已刷新
 * 服务正常恢复 加载的映射文件列表进行遍历,对文件进行校验,和文件中的消息的魔数进行校验,来判断哪些数据是正常的,
 * 并计算出正常的数据的最大偏移量。然后,根据偏移量设置对应的提交和刷新的位置以及不正常数据的删除。
 */

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List < MappedFile > mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        //如果文件列表大于3就从倒数第3个开始,否则从第一个开始
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //校验消息,然后返回转发请求,根据Magic_code正确,并且crc32正确,并且消息的msgSize记录大小和消息整体大小相等。则表示是合格的消息
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            // 是一个合格的消息并且消息体大于0
            if (dispatchRequest.isSuccess() && size > 0) {
                // 则读取的偏移量mapedFileOffset累加msgSize
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the return 0 representatives met last hole, this can not be included in truncate offset
            // 是合格的消息,但是消息体为0,表示读取到了文件的最后一块信息
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                // 文件读完了
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        // 最后读取的MapedFile对象的fileFromOffset加上最后读取的位置mapedFileOffset值
        processOffset += mappedFileOffset;
        // 设置文件刷新到的offset
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // 设置文件提交到的offset
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // 删除offset之后的脏数据文件
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        // 清除ConsumeQueue冗余数据
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // Commitlog case files are deleted  案例文件被删除
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

往期推荐

2022年文章目录整理

RocketMQ性能提升

RocketMQ刷盘机制

结尾

本篇文件介绍的就是RocketMQ的过期删除机制,与恢复机制。

文件过期删除机制 触发主要有三点

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。

由上述三种情况展开聊了一些文件过大,被占用,文件损坏的一些安全性处理。

恢复机制 没有硬性条件,主要有以下2点

  1. 检查当前文件是否损坏(异常关闭)或者存不存在
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后才执行

消息队列过期删除

取出commitlog中第一个文件的起始物理offset位置,与末次最小物理坐标offset做对比。如果发现上次的下标已经变小了,说明commitlog已经发生过删除操作了

索引过期删除

执行完消息队列的过期删除,根据坐标直接删掉对应的索引

非常欢迎大家加我个人微信有关后端方面的问题我们在群内一起讨论! 我们下期再见!

欢迎『点赞』、『在看』、『转发』三连支持一下,下次见~


浏览 51
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报