亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制源碼分析

 更新時(shí)間:2023年05月09日 11:06:42   作者:林師傅  
這篇文章主要介紹了RocketMQ源碼分析之消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

前面文章我們介紹了Broker是如何將消息全量存儲(chǔ)到CommitLog文件中,并異步生成dispatchRequest任務(wù)更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結(jié)構(gòu)。由于是異步轉(zhuǎn)發(fā)消息,就可能出現(xiàn)消息成功存儲(chǔ)到CommitLog文件,轉(zhuǎn)發(fā)請(qǐng)求任務(wù)執(zhí)行失敗,Broker宕機(jī)了,此時(shí)CommitLog和Index消息并未處理完,導(dǎo)致CommitLog與ConsumeQueue和IndexFile文件中的數(shù)據(jù)不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么這部分消息Consumer將永遠(yuǎn)無法消費(fèi)到了,那么Broker是如何保證數(shù)據(jù)一致性的呢?

StoreCheckPoint介紹

StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點(diǎn),當(dāng)Broker異常結(jié)束時(shí)會(huì)根據(jù)StoreCheckPoint的數(shù)據(jù)恢復(fù),StoreCheckPoint屬性如下

public class StoreCheckpoint {
    // commitLog最后一條信息的刷盤時(shí)間戳
    private volatile long physicMsgTimestamp = 0;
    // consumeQueue最后一個(gè)存儲(chǔ)單元刷盤時(shí)間戳
    private volatile long logicsMsgTimestamp = 0;
    // 最近一個(gè)已經(jīng)寫完IndexFile的最后一條記錄刷盤時(shí)間戳
    private volatile long indexMsgTimestamp = 0;
}

StoreCheckPoint文件的存儲(chǔ)位置是${user.home}/store/checkpoint,文件的固定長(zhǎng)度為4K,但StoreCheckPoint只占用了前24個(gè)字節(jié),存儲(chǔ)格式如下圖所示

StoreCheckPoint時(shí)間戳更新時(shí)機(jī)

physicMsgTimestamp

FlushRealTimeService刷盤時(shí)更新

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
  // ...
  // 更新CommitLog刷盤時(shí)間戳
  if (storeTimestamp > 0) { 					       CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

GroupCommitService刷盤時(shí)更新

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
  // ...
  // 更新CommitLog刷盤時(shí)間戳
  if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

logicsMsgTimestamp

ConsumeQueue保存消息存儲(chǔ)單元時(shí)更新

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
  // ...
  // 如果consumeQueue保存成功,則更新ConsumeQueue存儲(chǔ)點(diǎn)信息
  if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
  }
}

ConsumeQueue刷盤時(shí)更新并觸發(fā)StoreCheckPoint刷盤

// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {
  // ...
  // 更新ConsumeQueue存儲(chǔ)時(shí)間戳,并刷盤
  if (0 == flushConsumeQueueLeastPages) {
    if (logicsMsgTimestamp > 0) {
  DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
    }
    // 更新存儲(chǔ)點(diǎn)
    DefaultMessageStore.this.getStoreCheckpoint().flush();
  }
}

indexMsgTimestamp

// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile
public IndexFile getAndCreateLastIndexFile() {
  // 獲取最新IndexFile,如果IndexFile已經(jīng)滿了,需要?jiǎng)?chuàng)建一個(gè)新的IndexFile
  if (indexFile == null) {
          indexFile =
              new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                  lastUpdateIndexTimestamp);
			// 如果創(chuàng)建新的IndexFile成功,原IndexFile刷盤
      if (indexFile != null) {
          final IndexFile flushThisFile = prevIndexFile;
          Thread flushThread = new Thread(new Runnable() {
              @Override
              public void run() {
                	// indexFile刷盤
                  IndexService.this.flush(flushThisFile);
              }
          }, "FlushIndexFileThread");
          flushThread.setDaemon(true);
          flushThread.start();
      }
  }
  return indexFile;
}
// org.apache.rocketmq.store.index.IndexService#flush
public void flush(final IndexFile f) {
    if (null == f)
        return;
    long indexMsgTimestamp = 0;
    if (f.isWriteFull()) {
        indexMsgTimestamp = f.getEndTimestamp();
    }
    f.flush();
    if (indexMsgTimestamp > 0) {
        // 更新checkPoint的indexMsgTimestamp并觸發(fā)刷盤
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}
  • 保存消息Index,獲取最新的IndexFile如果滿了,則會(huì)創(chuàng)建一個(gè)新的IndexFile,并且更新IndexMsgTimestamp并觸發(fā)StoreCheckPoint刷盤

StoreCheckPoint刷盤源碼

StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時(shí)間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發(fā)時(shí)機(jī)

  • ConsumeQueue刷盤時(shí)觸發(fā)
  • 創(chuàng)建新IndexFile文件時(shí)觸發(fā)

StoreCheckPoint刷盤源碼如下

// org.apache.rocketmq.store.StoreCheckpoint#flush
public void flush() {
    this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
    this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
    this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
    this.mappedByteBuffer.force();
}

消息加載源碼分析

在BrokerController啟動(dòng)時(shí)會(huì)調(diào)用DefaultMessageStore#load加載存儲(chǔ)文件加載和恢復(fù)過程主要分為下面幾步

  • 判斷Broker上次是否正常退出。這個(gè)判斷邏輯是根據(jù)${user.home}/store/abort是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。
  • 加載CommitLog
  • 加載ConsumeQueue
  • 加載StoreCheckPoint
  • 加載IndexFile
  • 恢復(fù)ConsumeQueue與IndexFile
  • 加載延遲隊(duì)列服務(wù)
// org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {
    boolean result = true;
    try {
      	// 1. Broker上次是否正常退出	
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // 2. 加載commitLog
        result = result && this.commitLog.load();
				// 3. 加載consumeQueue
        result = result && this.loadConsumeQueue();
        if (result) {
            // 4. 加載StoreCheckPoint
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 5. 加載IndexFile
            this.indexService.load(lastExitOK);
            // 6. 恢復(fù)ConsumeQueue與IndexFile
            this.recover(lastExitOK);
						// 7. 延遲隊(duì)列服務(wù)加載
            if (null != scheduleMessageService) {
                result =  this.scheduleMessageService.load();
            }
        }
    } 
    return result;
}

CommitLog加載

前面文章介紹過,CommitLog文件的存儲(chǔ)目錄是${user.home}/store/commitlog/,并且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。

CommitLog文件的加載其實(shí)調(diào)用的是MappedFileQueue#load方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,并調(diào)用doLoad()方法加載CommitLog。

// org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {
    File dir = new File(this.storePath/*${user.home}/store/commitlog/*/);
    File[] ls = dir.listFiles();
    if (ls != null) {
        return doLoad(Arrays.asList(ls));
    }
    return true;
}

MappedFile的加載過程如下所示,核心邏輯主要分為下面三步

  • 按照文件名稱將文件排序,排序好的文件就會(huì)按照消息保存的先后順序存放在列表中
  • 校驗(yàn)文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
  • 創(chuàng)建mappedFile,并且設(shè)置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) {
    // 按照文件名稱排序
    files.sort(Comparator.comparing(File::getName));
    for (File file : files) {
      	// 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
        if (file.length() != this.mappedFileSize) {
            return false;
        }
        try {
          	// 創(chuàng)建MappedFile
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(mappedFile);
        } 
    }
    return true;
}

看到這里肯定會(huì)有疑問,加載后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最后一個(gè)MappedFile沒有使用完,Broker啟動(dòng)后還會(huì)從最后一個(gè)MappedFile開始寫么?我們可以在后面消息文件恢復(fù)源碼分析找到答案。

ConsumeQueue加載

從前面文章我們知道,ConsumeQueue文件底層其實(shí)也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為

  • 獲取ConsumeQueue目錄下存儲(chǔ)的所有Topic目錄,遍歷Topic目錄
  • 遍歷每個(gè)Topic目錄下的所有queueId目錄,逐個(gè)加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue
private boolean loadConsumeQueue() {
  // 獲取consumeQueue目錄
  File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */));
  // topic文件夾數(shù)組
  File[] fileTopicList = dirLogic.listFiles();
  if (fileTopicList != null) {
      // 遍歷topic	
      for (File fileTopic : fileTopicList) {
          // 獲取topic名稱
          String topic = fileTopic.getName();
          // 獲取queueId文件夾數(shù)組
          File[] fileQueueIdList = fileTopic.listFiles();
          // 遍歷queueId
          if (fileQueueIdList != null) {
              for (File fileQueueId : fileQueueIdList) {
                  int queueId;
                  // 文件夾名稱就是queueId
                  queueId = Integer.parseInt(fileQueueId.getName());
                  // 構(gòu)建consumeQueue
                  ConsumeQueue logic = new ConsumeQueue(/* ... */);
                  this.putConsumeQueue(topic, queueId, logic);
                  // ConsumeQueue加載
                  if (!logic.load()) {
                      return false;
                  }
              }
          }
      }
  }
  return true;
}

IndexFile加載

IndexFile文件加載過程調(diào)用的是IndexService#load,首先獲取${user.home}/store/index目錄下的所有文件,遍歷所有文件,如果IndexFile最后存儲(chǔ)時(shí)間大于StoreCheckPoint中indexMsgTimestamp,則會(huì)先刪除IndexFile

// org.apache.rocketmq.store.index.IndexService#load
public boolean load(final boolean lastExitOK) {
    // indexFile文件目錄
    File dir = new File(this.storePath);
    // indexFile文件列表
    File[] files = dir.listFiles();
    if (files != null) {
        // 文件排序
        Arrays.sort(files);
        for (File file : files) {
            try {
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();
                if (!lastExitOK) {
                    // 文件最后存儲(chǔ)時(shí)間戳大于刷盤點(diǎn),則摧毀indexFile,重建
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲(chǔ)點(diǎn)時(shí)間*/
                        .getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
                this.indexFileList.add(f);
            } 
        }
    }
    return true;
}

ConsumeQueue與IndexFile恢復(fù)

如果是正常退出,數(shù)據(jù)都已經(jīng)正常刷盤,前面我們說到CommitLog在加載時(shí)的wrotePosition,flushedPosition,committedPosition都設(shè)置為mappedFileSize,

因此即使是正常退出,也會(huì)調(diào)用CommitLog#recoverNormally找到最后一條消息的位置,更新這三個(gè)屬性。

// org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
    // consumeQueue中最大物理偏移量
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (lastExitOK) {
        // 正常退出文件恢復(fù)
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // 異常退出文件恢復(fù)
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    // 恢復(fù)topicQueueTable
    this.recoverTopicQueueTable();
}

正常恢復(fù)的源碼如下,由于Broker是正常關(guān)閉,因此CommitLog,ConsumeQueue與IndexFile都已經(jīng)正確刷盤,并且三者的消息是一致的。正常恢復(fù)的主要目的是找到找到最后一條消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盤點(diǎn)(flushWhere)和提交點(diǎn)(committedWhere),

  • 從最后3個(gè)mappedFile開始恢復(fù),如果mappedFile總數(shù)不足3個(gè),則從第0個(gè)mappedFile開始恢復(fù)
  • 逐個(gè)遍歷mappedFile,找到每個(gè)MappedFile的最后一條消息的偏移量,并將其更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
  • 清除ConsumeQueue冗余數(shù)據(jù)
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        // 確認(rèn)消息是否完整,默認(rèn)是true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // 默認(rèn)從最后3個(gè)mappedFile開始恢復(fù)
            int index = mappedFiles.size() - 3;
            // 如果commitLog不足三個(gè),則從第一個(gè)文件開始恢復(fù)
            if (index < 0)
                index = 0;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 最后一個(gè)MappedFile的文件起始偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // mappedFileOffset偏移量
            long mappedFileOffset = 0;
            // 遍歷CommitLog文件
            while (true) {
                // 校驗(yàn)消息完整性
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                // 獲取消息size
                int size = dispatchRequest.getMsgSize();
                // 返回結(jié)果為true并且消息size>0,說明消息是完整的
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
            }
            // 最大物理偏移量
            processOffset += mappedFileOffset;
            // 更新flushedWhere和committedPosition指針
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            // 清除ConsumeQueue冗余數(shù)據(jù)
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/);
            }
        } 
    }

異?;謴?fù)源碼如下,由于上次Broker沒有正常關(guān)閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異?;謴?fù)時(shí)可能需要恢復(fù)ConsumeQueue和IndexFile,異常恢復(fù)核心邏輯主要包括

  • 倒序查CommitLog的mappedFile文件,找到第一條消息存儲(chǔ)的時(shí)間戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉(zhuǎn)發(fā),正常存儲(chǔ),正常刷盤的
  • 從該mappedFile開始逐條轉(zhuǎn)發(fā)消息,重新恢復(fù)ConsumeQueue和IndexFile
  • 當(dāng)遍歷到最后一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
  • 清除ConsumeQueue冗余數(shù)據(jù)
// org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // 是否CRC校驗(yàn)
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 最后一個(gè)mappedFile的index
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        // 倒序遍歷mappedFile數(shù)組,
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // 1. 如果第一條消息的時(shí)間戳小于存儲(chǔ)點(diǎn)時(shí)間戳
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                break;
            }
        }
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    // 2. 轉(zhuǎn)發(fā)消息
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復(fù),默認(rèn)是false*/) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
        }
				// 3. 更新MappedFileQueue中的刷盤位置和提交位置
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // 清除ConsumeQueue中的冗余數(shù)據(jù)
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
}

總結(jié)

Broker啟動(dòng)時(shí)會(huì)分別加載CommitLog、ConsumeQueue與IndexFile。加載完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一條消息,并更新刷盤點(diǎn)和提交點(diǎn)。如果Broker上次是異常退出,就有可能出現(xiàn)ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據(jù)StoreCheckPoint存儲(chǔ)的時(shí)間戳從CommitLog找到消息,逐條恢復(fù)ConsumeQueue與IndexFile。

以上就是RocketMQ | 源碼分析】消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 消息存儲(chǔ)文件加載恢復(fù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論