RocketMQ?源碼分析Broker消息刷盤服務
前言
上篇文章我們介紹了消息刷盤的四種方式,本篇文章我們來介紹Broker是如何實現(xiàn)這四種刷盤方式。
刷盤服務源碼分析
Broker中的四種刷盤分別是由CommitRealTimeService,F(xiàn)lushRealTimeService,GroupCommitService將消息從內(nèi)存中刷到磁盤上的。在了解刷盤這三個刷盤服務之前,我們先來了解MappedFile中下面幾個屬性
public class MappedFile extends ReferenceResource { // 當前寫文件位置,即數(shù)據(jù)被寫入MappedFile的最新指針,可能存在ByteBuffer中,沒有提交 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 數(shù)據(jù)被寫入文件的最新指針(只是被寫入文件映射,不一定被刷盤) protected final AtomicInteger committedPosition = new AtomicInteger(0); // 刷盤位置,該指針之前的數(shù)據(jù)都持久化存儲到磁盤中 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 文件大小,默認是1042*1024*4(4GB) protected int fileSize; // 起始偏移量,MappedFile創(chuàng)建時從文件名中解析 private long fileFromOffset; }
上面幾個屬性在MappedFile中的位置如下圖所示
上面幾個位置關系: flushedPosition ≤ commitedPosition ≤ wrotePosition
CommitRealTimeService刷盤源碼分析
CommitRealTimeService類的作用就是將上圖中紅色的消息(也就是committedPosition -> wrotePosition之間的消息)從直接內(nèi)存ByteBuffer提交到FileChannel,提交完成并不帶表刷盤完成,還需要將FileChannel將數(shù)據(jù)刷到硬盤中,才正式刷盤完成。CommitRealTimeService核心代碼邏輯是在run()中,在run()中是包含一個死循環(huán),死循環(huán)中每個200ms提交一次消息,每次最少提交4頁的消息,每頁大小是4kb,也就是說只有wrotePosition - committedPosition ≥ 4*4kb
,消息才會被提交。
// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run public void run() { // 死循環(huán) while (!this.isStopped()) { // 消息提交時間間隔,默認200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 最少提交頁數(shù),默認是4 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); try { // 提交消息 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 等待200ms this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } }
上面mappedFileQueue#commit
提交最終會調(diào)用MappedFile#commit0
,commit0代碼邏輯如下,將直接內(nèi)存ByteBuffer中的數(shù)據(jù)拷貝到fileChannel中。
// org.apache.rocketmq.store.MappedFile#commit0 protected void commit0() { // 寫指針 int writePos = this.wrotePosition.get(); // 最后提交指針 int lastCommittedPosition = this.committedPosition.get(); // byteBuffer的數(shù)據(jù)提交到FileChannel if (writePos - lastCommittedPosition > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
FlushRealTimeService刷盤源碼分析
FlushRealTimeService的代碼與CommitRealTimeService類似,核心代碼也帶run()中,run()中也是一個死循環(huán),每隔500ms調(diào)用mappedFileQueue#flush
刷盤。
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { while (!this.isStopped()) { // 定時刷盤時間間隔,默認500ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // 一次刷盤頁數(shù),默認是4頁 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); try { if (flushCommitLogTimed) { // sleep 500ms Thread.sleep(interval); } else { this.waitForRunning(interval); } // 消息刷盤 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); } catch (Throwable e) { this.printFlushProgress(); } } }
mappedFileQueue#flush
刷盤最終調(diào)用了MappedFile#flush
,代碼如下所示,可以看到如果MappedFile中有直接內(nèi)存寫緩存,則會調(diào)用fileChannel.force(false)
刷盤,如果沒有寫緩存,則消息直接提交到MappedFile的內(nèi)存映射文件mappedByteBuffer中,因此調(diào)用mappedByteBuffer.force()
刷盤。
// org.apache.rocketmq.store.MappedFile#flush public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果使用了堆外內(nèi)存,那么通過fileChannel強制刷盤,這是異步堆外內(nèi)存的邏輯 if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { // 如果沒有使用堆外內(nèi)存,那么通過fileChannel強制刷盤,這是同步或者異步刷盤走的邏輯 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 設置刷盤位置為寫入位置 this.flushedPosition.set(value); // 減少對該MappedFile的引用次數(shù) this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
GroupCommitService刷盤源碼分析
同步刷盤GroupCommitService代碼與上述代碼類似,都繼承了ServiceThread
,它的核心邏輯在GroupCommitService#run
,在run()中也是一個死循環(huán),每隔10ms調(diào)用一次doCommit()
,雖然這個方法的名字叫doCommit,實際底層也與FlushRealTimeService相同,都是調(diào)用的mappedFileQueue#flush
,將mappedByteBuffer
中的數(shù)據(jù)刷入磁盤。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#run public void run() { // 死循環(huán) while (!this.isStopped()) { try { // 間隔10ms this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } }
看到這里大家可能會有疑問,為什么同步刷盤也是定時刷盤,這與異步刷盤有什么區(qū)別呢?實際上這里有著相當精妙的設計,在上篇文章中我們了解到同步刷盤包括等待消息保存與不等待消息保存。
如果不等待消息保存,則調(diào)用了ServiceThread#wakeup
方法。
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); } }
ServiceThread狀態(tài)如下所示,如果刷盤線程在10ms等待中,hasNotified屬性值為false,hastNotified更新成功,刷盤線程被喚醒,立即停止等待。如果刷盤線程正在執(zhí)行中,hasNotified更新失敗,刷盤線程喚醒失敗。只能等待下一次被喚醒或者下一次時間間隔后再次刷盤。
如果是要等待刷盤成功后才返回結果,就要利用到GroupCommitService屬性中兩個刷盤請求容器
- requestWrite
同步刷盤請求暫存容器
- requestsRead
處理中的刷盤請求容器
class GroupCommitService extends FlushCommitLogService { // 同步刷盤請求暫存容器 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 每次處理刷盤的request容器 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); }
提交刷盤請求首先會被放入到requestsWrite容器中,然后再喚醒刷盤線程。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest public synchronized void putRequest(final GroupCommitRequest request) { lock.lock(); try { // 寫請求 this.requestsWrite.add(request); } finally { lock.unlock(); } // 喚醒當前線程 this.wakeup(); }
刷盤線程被喚醒或者線程結束等待時都會調(diào)用onWaitEnd()
方法,交換請求暫存容器和刷盤request容器
// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd @Override protected void onWaitEnd() { this.swapRequests(); } // org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests // 交換請求暫存容器和刷盤request容器 private void swapRequests() { lock.lock(); try { LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } }
線程被喚醒后會調(diào)用doCommit(),從下面代碼可以發(fā)現(xiàn),不管requestsRead是否包含要處理的刷盤請求,實際都是通過調(diào)用mappedFileQueue#flush
執(zhí)行刷盤。
- 如果requestsRead中包含刷盤請求
則有可能需要調(diào)用mappedFileQueue#flush
,確保當前請求的消息能夠被刷盤,并返回刷盤結果給客戶端,如果包含請求,最多會調(diào)用兩次刷盤方法,確保消息能夠正確刷盤。
由于文件是固定大小,有可能刷盤位置在上一個MappedFile中,當前消息請求在最新的MappedFile中,刷盤兩次,確保當前消息能夠被刷入硬盤中
- 如果requestsRead中不包含刷盤請求
處理請求容器中包含request,直接調(diào)用MappedFileQueue#flush
,如果當前消息不在flushPosition所在的mappedFile中,則本次刷盤有可能并不會將當前消息持久化到磁盤中,需要等待下次刷盤。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // 如果處理Request不空 if (!this.requestsRead.isEmpty()) { // 遍歷處理Request for (GroupCommitRequest req : this.requestsRead) { // 如果刷盤指針大于刷盤請求中需要刷盤的offSet boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // 消息刷盤 for (int i = 0; i < 2 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // 喚醒客戶端 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { // 如果消息不等待刷盤成功就返回,則不會提交刷盤請求,調(diào)用這個方法 CommitLog.this.mappedFileQueue.flush(0); } }
總結
本次我們了解了RocketMQ中四種刷盤策略對應的刷盤服務
- 同步刷盤-等待消息保存到磁盤
- 同步刷盤-不等待消息保存到磁盤上
上面兩個同步刷盤都是由GroupCommitService實現(xiàn)的,由GroupCommitService將MappedByteBuffer消息刷盤到磁盤上
- 異步刷盤-開啟堆外緩存
如果開啟了堆外緩存,刷盤時會先由CommitRealTimeService將消息從Bytebuffer拷貝到FileChannel,F(xiàn)lushRealTimeService再將消息從FileChannel刷到磁盤上
- 異步刷盤-不開啟堆外緩存
這種方式也是默認的刷盤方式,由FlushRealTimeService將MappedByteBuffer消息刷盤到磁盤上
以上就是RocketMQ 源碼分析Broker消息刷盤服務的詳細內(nèi)容,更多關于RocketMQ Broker刷盤服務的資料請關注腳本之家其它相關文章!
相關文章
idea配置檢查XML中SQL語法及書寫sql語句智能提示的方法
idea連接了數(shù)據(jù)庫,也可以執(zhí)行SQL查到數(shù)據(jù),但是無法識別sql語句中的表導致沒有提示,下面這篇文章主要給大家介紹了關于idea配置檢查XML中SQL語法及書寫sql語句智能提示的相關資料,需要的朋友可以參考下2023-03-03java實現(xiàn)服務器文件打包zip并下載的示例(邊打包邊下載)
這篇文章主要介紹了java實現(xiàn)服務器文件打包zip并下載的示例,使用該方法,可以即時打包文件,一邊打包一邊傳輸,不使用任何的緩存,讓用戶零等待,需要的朋友可以參考下2014-04-04SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例
本文主要介紹了SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-07-07