RocketMQ消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制源碼分析
前言
前面文章我們介紹了Broker是如何將消息全量存儲(chǔ)到CommitLog文件中,并異步生成dispatchRequest任務(wù)更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結(jié)構(gòu)。由于是異步轉(zhuǎn)發(fā)消息,就可能出現(xiàn)消息成功存儲(chǔ)到CommitLog文件,轉(zhuǎn)發(fā)請(qǐng)求任務(wù)執(zhí)行失敗,Broker宕機(jī)了,此時(shí)CommitLog和Index消息并未處理完,導(dǎo)致CommitLog與ConsumeQueue和IndexFile文件中的數(shù)據(jù)不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么這部分消息Consumer將永遠(yuǎn)無法消費(fèi)到了,那么Broker是如何保證數(shù)據(jù)一致性的呢?
StoreCheckPoint介紹
StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點(diǎn),當(dāng)Broker異常結(jié)束時(shí)會(huì)根據(jù)StoreCheckPoint的數(shù)據(jù)恢復(fù),StoreCheckPoint屬性如下
public class StoreCheckpoint { // commitLog最后一條信息的刷盤時(shí)間戳 private volatile long physicMsgTimestamp = 0; // consumeQueue最后一個(gè)存儲(chǔ)單元刷盤時(shí)間戳 private volatile long logicsMsgTimestamp = 0; // 最近一個(gè)已經(jīng)寫完IndexFile的最后一條記錄刷盤時(shí)間戳 private volatile long indexMsgTimestamp = 0; }
StoreCheckPoint文件的存儲(chǔ)位置是${user.home}/store/checkpoint
,文件的固定長(zhǎng)度為4K,但StoreCheckPoint只占用了前24個(gè)字節(jié),存儲(chǔ)格式如下圖所示
StoreCheckPoint時(shí)間戳更新時(shí)機(jī)
physicMsgTimestamp
FlushRealTimeService刷盤時(shí)更新
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { // ... // 更新CommitLog刷盤時(shí)間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
GroupCommitService刷盤時(shí)更新
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // ... // 更新CommitLog刷盤時(shí)間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
logicsMsgTimestamp
ConsumeQueue保存消息存儲(chǔ)單元時(shí)更新
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { // ... // 如果consumeQueue保存成功,則更新ConsumeQueue存儲(chǔ)點(diǎn)信息 if (result) { this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); } }
ConsumeQueue刷盤時(shí)更新并觸發(fā)StoreCheckPoint刷盤
// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush private void doFlush(int retryTimes) { // ... // 更新ConsumeQueue存儲(chǔ)時(shí)間戳,并刷盤 if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } // 更新存儲(chǔ)點(diǎn) DefaultMessageStore.this.getStoreCheckpoint().flush(); } }
indexMsgTimestamp
// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile public IndexFile getAndCreateLastIndexFile() { // 獲取最新IndexFile,如果IndexFile已經(jīng)滿了,需要?jiǎng)?chuàng)建一個(gè)新的IndexFile if (indexFile == null) { indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, lastUpdateIndexTimestamp); // 如果創(chuàng)建新的IndexFile成功,原IndexFile刷盤 if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; Thread flushThread = new Thread(new Runnable() { @Override public void run() { // indexFile刷盤 IndexService.this.flush(flushThisFile); } }, "FlushIndexFileThread"); flushThread.setDaemon(true); flushThread.start(); } } return indexFile; } // org.apache.rocketmq.store.index.IndexService#flush public void flush(final IndexFile f) { if (null == f) return; long indexMsgTimestamp = 0; if (f.isWriteFull()) { indexMsgTimestamp = f.getEndTimestamp(); } f.flush(); if (indexMsgTimestamp > 0) { // 更新checkPoint的indexMsgTimestamp并觸發(fā)刷盤 this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp); this.defaultMessageStore.getStoreCheckpoint().flush(); } }
- 保存消息Index,獲取最新的IndexFile如果滿了,則會(huì)創(chuàng)建一個(gè)新的IndexFile,并且更新IndexMsgTimestamp并觸發(fā)StoreCheckPoint刷盤
StoreCheckPoint刷盤源碼
StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時(shí)間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發(fā)時(shí)機(jī)
- ConsumeQueue刷盤時(shí)觸發(fā)
- 創(chuàng)建新IndexFile文件時(shí)觸發(fā)
StoreCheckPoint刷盤源碼如下
// org.apache.rocketmq.store.StoreCheckpoint#flush public void flush() { this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp); this.mappedByteBuffer.force(); }
消息加載源碼分析
在BrokerController啟動(dòng)時(shí)會(huì)調(diào)用DefaultMessageStore#load
加載存儲(chǔ)文件加載和恢復(fù)過程主要分為下面幾步
- 判斷Broker上次是否正常退出。這個(gè)判斷邏輯是根據(jù)
${user.home}/store/abort
是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。 - 加載CommitLog
- 加載ConsumeQueue
- 加載StoreCheckPoint
- 加載IndexFile
- 恢復(fù)ConsumeQueue與IndexFile
- 加載延遲隊(duì)列服務(wù)
// org.apache.rocketmq.store.DefaultMessageStore#load public boolean load() { boolean result = true; try { // 1. Broker上次是否正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // 2. 加載commitLog result = result && this.commitLog.load(); // 3. 加載consumeQueue result = result && this.loadConsumeQueue(); if (result) { // 4. 加載StoreCheckPoint this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // 5. 加載IndexFile this.indexService.load(lastExitOK); // 6. 恢復(fù)ConsumeQueue與IndexFile this.recover(lastExitOK); // 7. 延遲隊(duì)列服務(wù)加載 if (null != scheduleMessageService) { result = this.scheduleMessageService.load(); } } } return result; }
CommitLog加載
前面文章介紹過,CommitLog文件的存儲(chǔ)目錄是${user.home}/store/commitlog/
,并且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。
CommitLog文件的加載其實(shí)調(diào)用的是MappedFileQueue#load
方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,并調(diào)用doLoad()方法加載CommitLog。
// org.apache.rocketmq.store.MappedFileQueue#load public boolean load() { File dir = new File(this.storePath/*${user.home}/store/commitlog/*/); File[] ls = dir.listFiles(); if (ls != null) { return doLoad(Arrays.asList(ls)); } return true; }
MappedFile的加載過程如下所示,核心邏輯主要分為下面三步
- 按照文件名稱將文件排序,排序好的文件就會(huì)按照消息保存的先后順序存放在列表中
- 校驗(yàn)文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
- 創(chuàng)建mappedFile,并且設(shè)置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) { // 按照文件名稱排序 files.sort(Comparator.comparing(File::getName)); for (File file : files) { // 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改 if (file.length() != this.mappedFileSize) { return false; } try { // 創(chuàng)建MappedFile MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); } } return true; }
看到這里肯定會(huì)有疑問,加載后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最后一個(gè)MappedFile沒有使用完,Broker啟動(dòng)后還會(huì)從最后一個(gè)MappedFile開始寫么?我們可以在后面消息文件恢復(fù)源碼分析找到答案。
ConsumeQueue加載
從前面文章我們知道,ConsumeQueue文件底層其實(shí)也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為
- 獲取ConsumeQueue目錄下存儲(chǔ)的所有Topic目錄,遍歷Topic目錄
- 遍歷每個(gè)Topic目錄下的所有queueId目錄,逐個(gè)加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue private boolean loadConsumeQueue() { // 獲取consumeQueue目錄 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */)); // topic文件夾數(shù)組 File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { // 遍歷topic for (File fileTopic : fileTopicList) { // 獲取topic名稱 String topic = fileTopic.getName(); // 獲取queueId文件夾數(shù)組 File[] fileQueueIdList = fileTopic.listFiles(); // 遍歷queueId if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; // 文件夾名稱就是queueId queueId = Integer.parseInt(fileQueueId.getName()); // 構(gòu)建consumeQueue ConsumeQueue logic = new ConsumeQueue(/* ... */); this.putConsumeQueue(topic, queueId, logic); // ConsumeQueue加載 if (!logic.load()) { return false; } } } } } return true; }
IndexFile加載
IndexFile文件加載過程調(diào)用的是IndexService#load
,首先獲取${user.home}/store/index
目錄下的所有文件,遍歷所有文件,如果IndexFile最后存儲(chǔ)時(shí)間大于StoreCheckPoint中indexMsgTimestamp,則會(huì)先刪除IndexFile
// org.apache.rocketmq.store.index.IndexService#load public boolean load(final boolean lastExitOK) { // indexFile文件目錄 File dir = new File(this.storePath); // indexFile文件列表 File[] files = dir.listFiles(); if (files != null) { // 文件排序 Arrays.sort(files); for (File file : files) { try { IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); f.load(); if (!lastExitOK) { // 文件最后存儲(chǔ)時(shí)間戳大于刷盤點(diǎn),則摧毀indexFile,重建 if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲(chǔ)點(diǎn)時(shí)間*/ .getIndexMsgTimestamp()) { f.destroy(0); continue; } } this.indexFileList.add(f); } } } return true; }
ConsumeQueue與IndexFile恢復(fù)
如果是正常退出,數(shù)據(jù)都已經(jīng)正常刷盤,前面我們說到CommitLog在加載時(shí)的wrotePosition,flushedPosition,committedPosition都設(shè)置為mappedFileSize,
因此即使是正常退出,也會(huì)調(diào)用CommitLog#recoverNormally
找到最后一條消息的位置,更新這三個(gè)屬性。
// org.apache.rocketmq.store.DefaultMessageStore#recover private void recover(final boolean lastExitOK) { // consumeQueue中最大物理偏移量 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { // 正常退出文件恢復(fù) this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { // 異常退出文件恢復(fù) this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } // 恢復(fù)topicQueueTable this.recoverTopicQueueTable(); }
正常恢復(fù)的源碼如下,由于Broker是正常關(guān)閉,因此CommitLog,ConsumeQueue與IndexFile都已經(jīng)正確刷盤,并且三者的消息是一致的。正常恢復(fù)的主要目的是找到找到最后一條消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盤點(diǎn)(flushWhere)和提交點(diǎn)(committedWhere),
- 從最后3個(gè)mappedFile開始恢復(fù),如果mappedFile總數(shù)不足3個(gè),則從第0個(gè)mappedFile開始恢復(fù)
- 逐個(gè)遍歷mappedFile,找到每個(gè)MappedFile的最后一條消息的偏移量,并將其更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
- 清除ConsumeQueue冗余數(shù)據(jù)
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // 確認(rèn)消息是否完整,默認(rèn)是true boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 默認(rèn)從最后3個(gè)mappedFile開始恢復(fù) int index = mappedFiles.size() - 3; // 如果commitLog不足三個(gè),則從第一個(gè)文件開始恢復(fù) if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); // 最后一個(gè)MappedFile的文件起始偏移量 long processOffset = mappedFile.getFileFromOffset(); // mappedFileOffset偏移量 long mappedFileOffset = 0; // 遍歷CommitLog文件 while (true) { // 校驗(yàn)消息完整性 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // 獲取消息size int size = dispatchRequest.getMsgSize(); // 返回結(jié)果為true并且消息size>0,說明消息是完整的 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } } // 最大物理偏移量 processOffset += mappedFileOffset; // 更新flushedWhere和committedPosition指針 this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue冗余數(shù)據(jù) if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/); } } }
異?;謴?fù)源碼如下,由于上次Broker沒有正常關(guān)閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異?;謴?fù)時(shí)可能需要恢復(fù)ConsumeQueue和IndexFile,異常恢復(fù)核心邏輯主要包括
- 倒序查CommitLog的mappedFile文件,找到第一條消息存儲(chǔ)的時(shí)間戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉(zhuǎn)發(fā),正常存儲(chǔ),正常刷盤的
- 從該mappedFile開始逐條轉(zhuǎn)發(fā)消息,重新恢復(fù)ConsumeQueue和IndexFile
- 當(dāng)遍歷到最后一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
- 清除ConsumeQueue冗余數(shù)據(jù)
// org.apache.rocketmq.store.CommitLog#recoverAbnormally public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // 是否CRC校驗(yàn) boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 最后一個(gè)mappedFile的index int index = mappedFiles.size() - 1; MappedFile mappedFile = null; // 倒序遍歷mappedFile數(shù)組, for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); // 1. 如果第一條消息的時(shí)間戳小于存儲(chǔ)點(diǎn)時(shí)間戳 if (this.isMappedFileMatchedRecover(mappedFile)) { break; } } long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { mappedFileOffset += size; // 2. 轉(zhuǎn)發(fā)消息 if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復(fù),默認(rèn)是false*/) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } } // 3. 更新MappedFileQueue中的刷盤位置和提交位置 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue中的冗余數(shù)據(jù) if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } }
總結(jié)
Broker啟動(dòng)時(shí)會(huì)分別加載CommitLog、ConsumeQueue與IndexFile。加載完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一條消息,并更新刷盤點(diǎn)和提交點(diǎn)。如果Broker上次是異常退出,就有可能出現(xiàn)ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據(jù)StoreCheckPoint存儲(chǔ)的時(shí)間戳從CommitLog找到消息,逐條恢復(fù)ConsumeQueue與IndexFile。
以上就是RocketMQ | 源碼分析】消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 消息存儲(chǔ)文件加載恢復(fù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)UDP通信過程實(shí)例分析【服務(wù)器端與客戶端】
這篇文章主要介紹了Java實(shí)現(xiàn)UDP通信過程,結(jié)合實(shí)例形式分析了java實(shí)現(xiàn)UDP服務(wù)器端與客戶端相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-05-05利用java實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)
這篇文章主要給大家介紹了關(guān)于利用java實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04淺談SpringBoot實(shí)現(xiàn)異步調(diào)用的幾種方式
本文主要介紹了淺談SpringBoot實(shí)現(xiàn)異步調(diào)用的幾種方式,主要包括CompletableFuture異步任務(wù),基于@Async異步任務(wù), TaskExecutor異步任務(wù),感興趣的可以了解一下2023-11-11小白也可以學(xué)會(huì)的Java NIO的Write事件
剛開始對(duì)NIO的寫操作理解的不深,不知道為什么要注冊(cè)寫事件,何時(shí)注冊(cè)寫事件,為什么寫完之后要取消注冊(cè)寫事件,今天特地整理了本篇文章,需要的朋友可以參考下2021-06-06IntelliJ IDEA 創(chuàng)建 Java 項(xiàng)目及創(chuàng)建 Java 文件并運(yùn)行的詳細(xì)步驟
這篇文章主要介紹了IntelliJ IDEA 創(chuàng)建 Java 項(xiàng)目及創(chuàng)建 Java 文件并運(yùn)行的詳細(xì)步驟,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11SpringBoot增強(qiáng)Controller方法@ControllerAdvice注解的使用詳解
這篇文章主要介紹了SpringBoot增強(qiáng)Controller方法@ControllerAdvice注解的使用詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個(gè)Controller增強(qiáng)器,可對(duì)controller進(jìn)行增強(qiáng)處理,需要的朋友可以參考下2023-10-10Java數(shù)據(jù)結(jié)構(gòu)之堆(優(yōu)先隊(duì)列)的實(shí)現(xiàn)
堆(優(yōu)先隊(duì)列)是一種典型的數(shù)據(jù)結(jié)構(gòu),其形狀是一棵完全二叉樹,一般用于求解topk問題。本文將利用Java語言實(shí)現(xiàn)堆,感興趣的可以學(xué)習(xí)一下2022-05-05