亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ?Broker消息如何刷盤(pán)源碼解析

 更新時(shí)間:2023年05月09日 14:15:53   作者:林師傅  
這篇文章主要為大家介紹了RocketMQ?Broker消息如何刷盤(pán)源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

我們?cè)趯W(xué)習(xí)RocketMQ的時(shí)候,我們知道RocketMQ的刷盤(pán)策略有兩個(gè)刷盤(pán)策略

  • 同步刷盤(pán)

同步刷盤(pán)即Broker消息已經(jīng)被持久化到硬盤(pán)后才會(huì)向客戶端返回成功。同步刷盤(pán)的優(yōu)點(diǎn)是能保證消息不丟失,但是這是以犧牲寫(xiě)入性能為代價(jià)的。

  • 異步刷盤(pán)

異步刷盤(pán)是指Broker將信息存儲(chǔ)到pagecache后就立即向客戶端返回成功,然后會(huì)有一個(gè)異步線程定時(shí)將內(nèi)存中的數(shù)據(jù)寫(xiě)入磁盤(pán),默認(rèn)時(shí)間間隔為500ms。

Broker中的刷盤(pán)策略是通過(guò)Broker配置文件中flushDiskType進(jìn)行配置,可以配置ASYNC_FLUSH(異步刷盤(pán))和SYNC_FLUSH(同步刷盤(pán)),默認(rèn)配置是ASYNC_FLUSH。

Broker的刷盤(pán)采用基于JDK NIO技術(shù),消息首先會(huì)存儲(chǔ)到內(nèi)存中,然后再根據(jù)不同的刷盤(pán)策略在不同時(shí)間刷盤(pán),如果有不了解的小伙伴可以參考這篇文章《【NIO實(shí)戰(zhàn)】深入理解FileChannel》

刷盤(pán)相關(guān)類介紹

CommitLog中的內(nèi)部類FlushCommitLogService及其子類CommitRealTimeService、GroupCommitService、FlushRealTimeService分別是用于不同場(chǎng)景下用于刷盤(pán)的刷盤(pán)行為,他們會(huì)單獨(dú)或者配合起來(lái)使用。具體類圖如下所示。

如果是同步刷盤(pán)會(huì)使用GroupCommitService。如果是異步刷盤(pán),并且關(guān)閉了堆外緩存(TransientStorePool),則采用FlushRealTimeService刷盤(pán)。如果是異步刷盤(pán),并且開(kāi)啟了堆外緩存,則會(huì)使用FlushRealTimeService與CommitRealTimeService配合刷盤(pán)。

默認(rèn)的輸盤(pán)策略是異步關(guān)閉堆外緩存,因此默認(rèn)是采用FlushRealTimeService進(jìn)行刷盤(pán)

Broker刷盤(pán)源碼分析

消息刷盤(pán)相關(guān)邏輯都是圍繞在CommitLog,因此要想知道消息時(shí)如何刷盤(pán)的關(guān)鍵是研究CommitLog

CommitLog構(gòu)造&屬性賦值

CommitLog中與刷盤(pán)相關(guān)的屬性有flushCommitLogService、commitLogService。如果是同步刷盤(pán)則在構(gòu)造函數(shù)中會(huì)給flushCommitLogService賦值GroupCommitService,如果是異步刷盤(pán)則給flushCommitLogService賦值FlushRealTimeService。commitLogService的值是CommitRealTimeService,從上面我們可以很明顯的看出它只有在異步且開(kāi)啟TransientStorePoolEnabled時(shí)才會(huì)被使用。

public class CommitLog {
  // 如果是同步刷盤(pán),則是GroupCommitService。如果是異步刷盤(pán)則是FlushRealTimeService
  // 默認(rèn)是異步刷盤(pán),因此是CommitLog$FlushRealTimeService
  private final FlushCommitLogService flushCommitLogService;
  // 開(kāi)啟TransientStorePoolEnable時(shí)使用CommitRealTimeService
  private final FlushCommitLogService commitLogService;
	// 構(gòu)造函數(shù)
  public CommitLog(final DefaultMessageStore defaultMessageStore) {
      // 默認(rèn)是異步刷盤(pán),因此這里是false
      if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
          this.flushCommitLogService = new GroupCommitService();
      } else {
          this.flushCommitLogService = new FlushRealTimeService();
      }
      this.commitLogService = new CommitRealTimeService();
      // 消息回調(diào)
      this.appendMessageCallback = new DefaultAppendMessageCallback();
      flushDiskWatcher = new FlushDiskWatcher();
  }
}

TransientStorePoolEnabled介紹

transientStorePoolEnabled配置的默認(rèn)值為false,開(kāi)啟transientStorePoolEnabled需要手動(dòng)開(kāi)啟。如果開(kāi)啟transientStorePoolEnabled會(huì)開(kāi)啟堆外內(nèi)存存儲(chǔ)池,Broker在啟動(dòng)時(shí)會(huì)申請(qǐng)5個(gè)與CommitLog大小(1GB)相同的堆外內(nèi)存交給TransientStorePool,創(chuàng)建MappedFile時(shí)會(huì)向TransientStorePool“借”一個(gè)堆外內(nèi)存ByteBuffer,保存消息時(shí)會(huì)先將消息保存到堆外內(nèi)存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盤(pán)中。TransientStorePool屬性和一些核心方法源碼如下,堆外內(nèi)存ByteBuffer都是由它來(lái)管理。

// org.apache.rocketmq.store.TransientStorePool
public class TransientStorePool {
    // 存儲(chǔ)池大小,默認(rèn)是5
    private final int poolSize;
    // CommitLog MappedFile文件大小,默認(rèn)1GB
    private final int fileSize;
    // 默認(rèn)存5個(gè)ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;
    // 消息存儲(chǔ)配置
    private final MessageStoreConfig storeConfig;
		// TransientStorePool初始化
    public void init() {
        // 默認(rèn)是5
        for (int i = 0; i < poolSize; i++) {
            // 分配1GB的直接內(nèi)存
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            // 生成的緩存保存到隊(duì)列中
            availableBuffers.offer(byteBuffer);
        }
    }
    // 歸還緩沖
    public void returnBuffer(ByteBuffer byteBuffer) {
        // 修改position和limit,"清空"緩沖
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
      	// 緩沖入隊(duì)
        this.availableBuffers.offerFirst(byteBuffer);
    }
    // 向TransientStorePool借緩沖
    public ByteBuffer borrowBuffer() {
      	// 緩沖出隊(duì)
        ByteBuffer buffer = availableBuffers.pollFirst();
        return buffer;
    }
}

消息保存源碼分析

前面文章《【RocketMQ | 源碼分析】Broker是如何保存消息的? 》我們雖然介紹了消息的保存過(guò)程,但是開(kāi)啟或者關(guān)閉TransientStorePoolEnabled時(shí),消息保存的細(xì)節(jié)是不同的,我們?cè)俅蜷_(kāi)消息保存MappedFile的源碼如下,下面代碼中如果writeBuffer不空,則會(huì)將消息先追加到writeBuffer,否者直接寫(xiě)入到MappedFile的內(nèi)存映射文件中。

// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 如果寫(xiě)文件位置小于文件size
    if (currentPos < this.fileSize) {
        // 如果writeBuffer不空,則獲取writeBuffer的淺拷貝,否則獲取MappedFile的內(nèi)存映射(MappedByteBuffer)的淺拷貝
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是單條消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長(zhǎng)度-當(dāng)前寫(xiě)位置,可以寫(xiě)的長(zhǎng)度*/,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } // ...如果是批量消息
        return result;
    }
}

那么什么情況下MappedFile中的writeBuffer為空,什么情況下writeBuffer不為空呢?我們可以先來(lái)了解MappedFile是如何創(chuàng)建的,MappedFile是由AllocateMappedFileService創(chuàng)建的,具體源碼如下,如果開(kāi)啟了TransientStorePoolEnabled,則在創(chuàng)建MappedFile時(shí)會(huì)向TransientStorePool“借”一個(gè)ByteBuffer,如果沒(méi)有開(kāi)啟TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存數(shù)據(jù)時(shí)會(huì)將數(shù)據(jù)直接保存到MappedFile的直接內(nèi)存映射(MappedByteBuffer)中。

private boolean mmapOperation() {
  // ...
  if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
      try {
          mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        	// 初始化mappedFile會(huì)向TransientStorePool"借"一個(gè)writeBuffer
          mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      } catch (RuntimeException e) {
          mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      }
  } else {
    	// 創(chuàng)建MappedFile,沒(méi)有writeBuffer
      mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
  }
  // ...
}

由上可知,消息保存如下圖所示

消息刷盤(pán)入口方法源碼分析

消息保存和刷盤(pán)的入口方法CommitLog#asyncPutMessage,消息保存到mappedFile的緩存后,最后會(huì)調(diào)用submitFlushRequest方法提交刷盤(pán)請(qǐng)求,Broker會(huì)根據(jù)刷盤(pán)策略進(jìn)行刷盤(pán)。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    //... 保存消息
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    // ...
    // 提交刷盤(pán)請(qǐng)求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 提交復(fù)制請(qǐng)求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 合并提交刷盤(pán)請(qǐng)求和提交復(fù)制請(qǐng)求結(jié)果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

提交了刷盤(pán)請(qǐng)求后,根據(jù)刷盤(pán)策略,是否開(kāi)啟堆外緩存,推送消息中是否要等待消息保存有如下四種刷盤(pán)方式

  • 異步刷盤(pán)(關(guān)閉TransientStorePoolEnabled)

異步刷盤(pán)(關(guān)閉TransientStorePoolEnabled)是默認(rèn)的刷盤(pán)方案,這個(gè)刷盤(pán)方案先會(huì)**異步喚醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于關(guān)閉了TransientStorePoolEnabled,消息是保存到MappedFile中的內(nèi)存映射文件MappedByteBuffer,F(xiàn)lushRealTimeService將定時(shí)MappedByteBuffer刷到磁盤(pán)。

  • 異步刷盤(pán)(開(kāi)啟TransientStorePoolEnabled)

異步刷盤(pán)(開(kāi)啟TransientStorePoolEnabled)會(huì)先**異步喚醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于開(kāi)啟了TransientStorePoolEnabled,消息會(huì)保存到MappedFile中的內(nèi)存映射文件ByteBuffer,CommitRealTimeService定時(shí)將ByteBuffer中的數(shù)據(jù)刷到FileChannel中。

  • 同步刷盤(pán)(等待消息保存)

同步刷盤(pán)(等待消息保存)會(huì)先創(chuàng)建一個(gè)刷盤(pán)請(qǐng)求(GroupCommitRequest),然后向GroupCommitService提交刷盤(pán)請(qǐng)求,最后等待刷盤(pán)結(jié)果并返回

  • 同步刷盤(pán)(不等待消息保存)

同步刷盤(pán)(不等待消息保存)也是通過(guò)GroupCommitService刷盤(pán),與等待消息保存不同的是不等待的方式異步喚醒(wakeup)GroupCommitService后,直接返回消息保存成功。

四種刷盤(pán)方式源碼如下所示

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // 同步刷盤(pán)
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 獲取同步刷盤(pán)Service
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // 創(chuàng)建GroupCommitRequest 刷盤(pán)偏移量nextOffset = 當(dāng)前寫(xiě)入偏移量 + 當(dāng)前消息寫(xiě)入大小
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // 向刷盤(pán)監(jiān)視器(flushDistWatch)提交刷盤(pán)請(qǐng)求
            flushDiskWatcher.add(request);
            // 提交刷盤(pán)請(qǐng)求,并且喚醒同步刷盤(pán)線程
            service.putRequest(request);
            return request.future();
        } else {
            // 同步刷盤(pán),但是不需要等待刷盤(pán)結(jié)果,那么喚醒同步刷盤(pán)線程
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 異步刷盤(pán)
    else {
        // 是否啟動(dòng)了堆外緩存
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // 如果沒(méi)有啟動(dòng)堆外緩存,則喚醒異步刷盤(pán)服務(wù) flushRealTimeService
            flushCommitLogService.wakeup();
        } else  {
            // 如果啟動(dòng)了堆外緩存,則喚醒異步轉(zhuǎn)存服務(wù)CommitRealTimeService
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

將上面四種場(chǎng)景及調(diào)用關(guān)系如下圖所示

總結(jié)

本篇文章介紹了TransientStorePool機(jī)制以及開(kāi)啟和管理隊(duì)消息保存的影響,我們還介紹了RocketMQ中四種刷盤(pán)策略

  • 同步刷盤(pán)-等待消息保存到磁盤(pán)
  • 同步刷盤(pán)-不等待消息保存到磁盤(pán)上
  • 異步刷盤(pán)-開(kāi)啟堆外緩存
  • 異步刷盤(pán)-不開(kāi)啟堆外緩存

以上就是RocketMQ Broker消息如何刷盤(pán)源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Broker消息刷盤(pán)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Springmvc和ajax如何實(shí)現(xiàn)前后端交互

    Springmvc和ajax如何實(shí)現(xiàn)前后端交互

    這篇文章主要介紹了Springmvc和ajax如何實(shí)現(xiàn)前后端交互,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • 一行命令同時(shí)修改maven項(xiàng)目中多個(gè)module的版本號(hào)的方法

    一行命令同時(shí)修改maven項(xiàng)目中多個(gè)module的版本號(hào)的方法

    這篇文章主要介紹了一行命令同時(shí)修改maven項(xiàng)目中多個(gè)module的版本號(hào)的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • 利用HttpUrlConnection 上傳 接收文件的實(shí)現(xiàn)方法

    利用HttpUrlConnection 上傳 接收文件的實(shí)現(xiàn)方法

    下面小編就為大家?guī)?lái)一篇利用HttpUrlConnection 上傳 接收文件的實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-11-11
  • Java Mail郵件發(fā)送如何實(shí)現(xiàn)簡(jiǎn)單封裝

    Java Mail郵件發(fā)送如何實(shí)現(xiàn)簡(jiǎn)單封裝

    這篇文章主要介紹了Java Mail郵件發(fā)送如何實(shí)現(xiàn)簡(jiǎn)單封裝,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • Java面向?qū)ο缶幊讨^承和多態(tài)以及包的解析與使用范例

    Java面向?qū)ο缶幊讨^承和多態(tài)以及包的解析與使用范例

    繼承就是可以直接使用前輩的屬性和方法。自然界如果沒(méi)有繼承,那一切都是處于混沌狀態(tài)。多態(tài)是同一個(gè)行為具有多個(gè)不同表現(xiàn)形式或形態(tài)的能力。多態(tài)就是同一個(gè)接口,使用不同的實(shí)例而執(zhí)行不同操作
    2021-11-11
  • Java中的Semaphore信號(hào)量深入解析

    Java中的Semaphore信號(hào)量深入解析

    這篇文章主要介紹了Java中的Semaphore信號(hào)量深入解析,Semaphore是Java里面另外一個(gè)基本的并發(fā)工具包類,主要的的作用是用來(lái)保護(hù)共享資源的訪問(wèn)的,也就是僅僅允許一定數(shù)量的線程訪問(wèn)共享資源,需要的朋友可以參考下
    2023-11-11
  • 在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解

    在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解

    這篇文章主要介紹了在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-11-11
  • Spring AOP注解案例及基本原理詳解

    Spring AOP注解案例及基本原理詳解

    這篇文章主要介紹了Spring AOP注解案例及基本原理詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Java Bean的作用域,生命周期和注解

    Java Bean的作用域,生命周期和注解

    這篇文章主要介紹了淺談Spring中Bean的作用域,生命周期和注解,具有一定借鑒價(jià)值,需要的朋友可以參考下,希望能夠給你帶來(lái)幫助
    2021-11-11
  • SpringBoot分離打Jar包的兩種配置方式

    SpringBoot分離打Jar包的兩種配置方式

    這篇文章主要介紹了SpringBoot分離打Jar包的兩種配置方式,方式一是基于maven-jar-plugin,方式二是基于spring-boot-maven-plugin,文中結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-11-11

最新評(píng)論