十、RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-下篇
上篇主要是讲解 RocketMQ 运行过程中消息发送者发送一条消息,进入到 commitlog 文件,然后是如何被转发到consumequeue、index索引文件中的,本节主要剖析一下,在 RocketMQ 启动过程中,是如何根据 commitlog 重构consumeque,index的,因为毕竟 commitlog 文件中的消息与 consumequeue 中的文件内容并不能确保是一致的。
入口:DefaultMessageStore#load
/**
* @throws IOException
*/
public boolean load() {
boolean result = true;
try {
boolean lastExitOK = !this.isTempFileExist(); // @1
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load(); // @2
}
// load Commit Log
result = result && this.commitLog.load(); // @3
// load Consume Queue
result = result && this.loadConsumeQueue(); // @4
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // @5
this.indexService.load(lastExitOK); // @6
this.recover(lastExitOK); // @7
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;