RocketMQ?ConsumeQueue與IndexFile實(shí)時(shí)更新機(jī)制源碼解析
前言
前面我們介紹了消息是如何存儲(chǔ)的,消息是如何刷盤的,講的都是CommitLog是如何存儲(chǔ)和刷盤的。雖然CommitLog順序存儲(chǔ)著所有消息,但是CommitLog中的消息并沒有區(qū)分topic、keys等,如果需要消費(fèi)某個(gè)topic的消息或者查找某一條消息只能遍歷CommitLog文件去查找,性能相當(dāng)?shù)拖拢虼擞辛薈onsumeLog和IndexFile兩個(gè)文件類型,這兩個(gè)文件的作用主要是提升消息消費(fèi)和查詢的性能。
ConsumeQueue詳解
為了提高消費(fèi)消息查詢性能,Broker會(huì)為每個(gè)Topic在~/store/consumequeue中創(chuàng)建一個(gè)Topic名稱的目錄,并再為該Topic創(chuàng)建目錄名為queueId的目錄,每個(gè)目錄存放著若干consumequeue文件,consumequeue屬于commitLog的索引文件,可以根據(jù)consumequeue定位到具體的消息,consumequeue存儲(chǔ)文件見下圖

consumequeue文件名由20位數(shù)字構(gòu)成,表示當(dāng)前文件的第一個(gè)索引條目的起始偏移量。與commitLog文件名不同的是,consumequeue后續(xù)文件名是固定的,由于consumequeue文件大小是固定不變的。
consumequeue文件大小由mappedFileSizeConsumeQueue配置控制,它的默認(rèn)大小是30W * ConsumeQueue.CQ_STORE_UNIT_SIZE(20),也就是600W字節(jié)大小,ConsumeQueue.CQ_STORE_UNIT_SIZE是consumequeue每個(gè)索引條目的大小,每隔索引條目包含了三個(gè)消息的重要屬性:消息在mappedFile文件中的物理偏移量(8字節(jié))、消息的長(zhǎng)度(4字節(jié))、消息Tag的hashcode值,這三個(gè)屬性占了20個(gè)字節(jié),單個(gè)索引條目結(jié)構(gòu)如下圖所示

IndexFile詳解
RocketMQ除了提供消息的Topic給消息消費(fèi)外,RocketMQ還提供了根據(jù)key來查找消息的功能,producer創(chuàng)建消息時(shí)可以傳入keys值,用于快速查找消息。
// 構(gòu)建Message參數(shù)
Message msg = new Message("TopicTest", // 消息topic
"TagA", // 消息Tag
"key1 key2 key3", // 消息keys,多個(gè)key用" "隔開
"hello linshifu!".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息體
IndexFile可以看做是一個(gè)key的哈希索引文件,通過計(jì)算key的hash值,快速找到某個(gè)key對(duì)應(yīng)的消息在commitLog中的位置。IndexFile由下面三個(gè)部分構(gòu)成:
- indexHeader
- slots槽位
- indexes索引數(shù)據(jù)
IndexFile結(jié)構(gòu)如下圖所示
每個(gè)IndexFile的長(zhǎng)度是固定的,其中indexHeader占用40字節(jié),slots占用500W * 4字節(jié),Index索引數(shù)據(jù)占用2000W * 20字節(jié)

IndexHeader
IndexHeader占用IndexFile的前40個(gè)字節(jié),它主要存儲(chǔ)著IndexFile索引文件的相關(guān)信息,IndexHeader包含如下屬性
// org.apache.rocketmq.store.index.IndexHeader
public class IndexHeader {
// 索引文件第一條消息在commitLog中的存儲(chǔ)時(shí)間
private final AtomicLong beginTimestamp = new AtomicLong(0);
// 索引文件最后一條消息在commitLog中的存儲(chǔ)時(shí)間
private final AtomicLong endTimestamp = new AtomicLong(0);
// 索引文件第一條消息的偏移量
private final AtomicLong beginPhyOffset = new AtomicLong(0);
// 索引文件最后一條消息的偏移量
private final AtomicLong endPhyOffset = new AtomicLong(0);
// 已經(jīng)填充slot的hash槽數(shù)量
private final AtomicInteger hashSlotCount = new AtomicInteger(0);
// 該indexFile種包含的索引單元數(shù)量
private final AtomicInteger indexCount = new AtomicInteger(1);
}
數(shù)據(jù)結(jié)構(gòu)如下圖所示

slots槽位
在IndexFile中間部分存儲(chǔ)的是IndexFlie中key的hash槽,每個(gè)hash槽存儲(chǔ)的是index索引單元的indexNo,添加索引時(shí)會(huì)將key的hash值%500W的結(jié)果計(jì)算哈希槽序號(hào),然后將index索引單元的indexNo放入slot槽中,indexNo是int類型,slots槽位總共有500W個(gè),因此slots槽位占用的大小是500w * 4=2000w

indexes索引數(shù)據(jù)
index索引由2000W個(gè)索引單元構(gòu)成,每個(gè)索引單元大小為20字節(jié),每隔索引單元由下面四個(gè)部分構(gòu)成
- keyHash
keyHash是消息索引key的Hash值
- phyOffet
phyOffset是當(dāng)前key對(duì)應(yīng)消息在commitLog中的偏移量commitLog offset
- timeDiff
timeDiff是當(dāng)前key對(duì)應(yīng)消息存儲(chǔ)時(shí)間與當(dāng)前indexFile第一個(gè)索引存儲(chǔ)時(shí)間差
- preIndex
當(dāng)前slot的index索引單元的前一個(gè)索引單元的indexNo
索引單元數(shù)據(jù)結(jié)構(gòu)如下

實(shí)時(shí)更新ConsumeQueue與IndexFile源碼分析
之前的文章我們只了解了Broker的CommitLog文件保存和刷盤的流程,現(xiàn)在我們來了解Broker實(shí)時(shí)更新ConsumeQueue和IndexFile的流程。
消息保存的過程僅僅會(huì)保存CommitLog,ConsumeQueue文件及IndexFile中的數(shù)據(jù)是通過ReputMessageService將CommitLog中的消息轉(zhuǎn)發(fā)到ConsumeQueue及IndexFile。
ReputMessageService和之前的刷盤服務(wù)類似,都是異步線程執(zhí)行的服務(wù)。ReputMessageService是DefaultMessageStore的一個(gè)內(nèi)部類,它跟隨者消息存儲(chǔ)對(duì)象DefaultMessageStore創(chuàng)建時(shí)共同創(chuàng)建。ReputMessageService刷新ConsumeQueue與IndexFile的邏輯可以從它的run()方法開始分析。
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override
public void run() {
// 死循環(huán)
while (!this.isStopped()) {
try {
// 睡眠1ms
Thread.sleep(1);
// 更新consumeQueue和IndexFile
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
從上面代碼可以看出,更新ConsumeQueue與IndexFile在死循環(huán)中執(zhí)行,每隔1ms執(zhí)行一次doReput()來更新更新consumeQueue和IndexFile,在doReput()中的主要邏輯如下
- 如果重放消息偏移量reputFromOffset小于CommitLog的最大offset,則會(huì)循環(huán)重放消息,更新ConsumeQueue及IndexFile
- 從CommitLog的重放偏移量開始獲取映射緩沖結(jié)果SelectMappedBufferResult,SelectMappedBufferResult包含如下屬性
// org.apache.rocketmq.store.SelectMappedBufferResult
public class SelectMappedBufferResult {
// mappedFile文件起始偏移量+position
private final long startOffset;
// reputFromOffset開始的緩沖
private final ByteBuffer byteBuffer;
// 消息size
private int size;
// commitLog的MappedFile
private MappedFile mappedFile;
}
- 根據(jù)SelectMappedBufferResult校驗(yàn)消息,并創(chuàng)建轉(zhuǎn)發(fā)請(qǐng)求DispatchRequest,DispatchRequest中包含更新ConsumeQueue和IndexFile中需要用到的屬性,如topic,消息偏移量,消息key,消息存儲(chǔ)時(shí)間戳,消息長(zhǎng)度,消息tagHashCode等。
- 如果當(dāng)前消息size>0,則說明當(dāng)前消息需要被轉(zhuǎn)發(fā)更新ConsumeQueue和IndexFile,會(huì)調(diào)用關(guān)鍵方法
DefaultMessageStore.this.doDispatch轉(zhuǎn)發(fā)更新 - 如果當(dāng)前消息size=0,則說明已經(jīng)讀到了CommitLog當(dāng)前MappedFile的結(jié)尾,因此需要讀取下一個(gè)MappedFile,并進(jìn)行轉(zhuǎn)發(fā)。
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {
// 1.reputFromOffset ≤ commitLog最大offset,則循環(huán)重放
for (boolean doNext = true; this.isCommitLogAvailable()/*reputFromOffset≤commitLog最大offset*/&&doNext; ) {
// 2.根據(jù)reputFromOffset的物理偏移量找到mappedFileQueue中對(duì)應(yīng)的CommitLog文件的MappedFile
// 然后從該MappedFile中截取一段自reputFromOffset偏移量開始的ByteBuffer,這段內(nèi)存存儲(chǔ)著將要重放的消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
// 遍歷消息,開始reput
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 3. 檢查消息屬性,并構(gòu)建一個(gè)消息的dispatchRequest
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 4.消息分發(fā),寫consumeQueue和Index
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 設(shè)置reputOffset加上當(dāng)前消息大小
this.reputFromOffset += size;
// 設(shè)置讀取的大小加上當(dāng)前消息大小
readSize += size;
//如果size=0,說明讀取到了MappedFile的文件結(jié)尾
} else if (size == 0) {
// 5. 獲取下個(gè)文件的起始o(jì)ffset
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
// 設(shè)置readSize=0,結(jié)束循環(huán)
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
// ...
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
由上面代碼可知,轉(zhuǎn)發(fā)更新ConsumeQueue和IndexFile的關(guān)鍵代碼在DefaultMessageStore.this.doDispatch(dispatchRequest)中,在doDispatch()方法中循環(huán)遍歷dispatcherList中的CommitLogDispatcher。
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
debug代碼可以中包含處理轉(zhuǎn)發(fā)請(qǐng)求的Dispatcher類,通過類名就可以很容易判斷出CommitLogDispatcherBuildConsumeQueue是將CommitLog轉(zhuǎn)發(fā)到ConsumeQueue中,CommitLogDispatcherBuildIndex是將消息構(gòu)建IndexFile,下面我們來分別分析兩者是如何處理CommitLog消息轉(zhuǎn)發(fā)的。

CommitLogDispatcherBuildConsumeQueue源碼分析
CommitLogDispatcherBuildConsumeQueue將消息保存到ConsumeQueue如下所示,主要是下面兩步
- 先根據(jù)消息Topic和QueueId從consumeQueueTable找到ConsumeQueue,如果找不到會(huì)創(chuàng)建一個(gè)新的consumeQueue
- 調(diào)用ConsumeQueue#putMessagePositionInfoWrapper,將消息保存到consumeQueue中
// org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// 找到ConsumeQueue,如果找不到會(huì)創(chuàng)建一個(gè)ConsumeQueue
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// 消息保存到consumeQueue中
cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}
保存consumeQueue存儲(chǔ)單元消息如下,主要分為下面三個(gè)步驟
- 將consumeQueue存儲(chǔ)單元
offset(8字節(jié))+消息長(zhǎng)度(4字節(jié))+tags的哈希碼(8字節(jié))保存到consumeQueue的緩存byteBufferIndex中 - 根據(jù)consumeQueue的offset找到MappedFile
- 將緩沖中的存儲(chǔ)單元存儲(chǔ)到MappedFile中
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
this.byteBufferIndex.flip();
// consumeQueue存儲(chǔ)單元的長(zhǎng)度
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// 消息物理偏移量
this.byteBufferIndex.putLong(offset);
// 消息長(zhǎng)度
this.byteBufferIndex.putInt(size);
// 消息tags的哈希碼
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// 獲取最后一個(gè)mappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// 更新物理offset
this.maxPhysicOffset = offset + size;
// 數(shù)據(jù)保存到consumeQueue
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
CommitLogDispatcherBuildIndex源碼分析
除了CommitLogDispatcherBuildConsumeQueue,下面我們來分析在dispatcherList中另一個(gè)CommitLogDispatcher的實(shí)現(xiàn)類CommitLogDispatcherBuildIndex是如何將Index索引單元保存到IndexFile中的,存儲(chǔ)消息索引的核心邏輯如下所示。
- 獲取或者創(chuàng)建最新的IndexFile
- 將msgId構(gòu)建Index索引單元并保存到IndexFile中
- 將Message中的keys用空格分隔成key數(shù)組,并循環(huán)保存到indexFile中
public void buildIndex(DispatchRequest req) {
// 獲取或者創(chuàng)建最新索引文件,支持重試最多3次
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
// 獲取結(jié)束物理索引
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
// 獲取topic和keys
String topic = msg.getTopic();
String keys = msg.getKeys();
// 如果當(dāng)前消息的commitLogOffset小于當(dāng)前IndexFile的endPhyOffset時(shí),說明當(dāng)前消息已經(jīng)構(gòu)建過Index索引,因此直接返回
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
// 獲取客戶端生成的uniqueId(msgId),代表客戶端生成的唯一一條消息
// 消息解密時(shí)生成的
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
}
// 客戶端傳遞的keys,消息是從keys屬性中獲取的
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR/*空格*/);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
從上面源碼可知,保存消息的關(guān)鍵就在putKey方法中主要分為下面三個(gè)步驟
- 獲取要保存到IndexFile的keyHashCode(keyHash),hashSlot的絕對(duì)位置(absSlotPos),hash槽中的索引值(slotValue),保存消息時(shí)間差(timeDiff),索引的絕對(duì)位置(absIndexPos)等。
- 更新Index索引單元信息,keyHashCode(keyHash),消息在commitLog中的偏移量(phyOffset),消息存儲(chǔ)時(shí)間與索引文件開始存儲(chǔ)時(shí)間差(timeDiff),前置消息索引值(slotValue)
- 更新slots的IndexCount
- 更新IndexHeader中的indexCount,更新物理偏移量(phyoffset),最后存儲(chǔ)時(shí)間戳(sotreTimestamp)
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 索引數(shù)量小于2000W,否則說明當(dāng)前索引文件已經(jīng)滿了,不能添加索引
if (this.indexHeader.getIndexCount() < this.indexNum) {
// keyHashCode
int keyHash = indexKeyHashMethod(key);
// 索引槽位置
int slotPos = keyHash % this.hashSlotNum;
// 絕對(duì)位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize/*哈希槽數(shù)量*哈希槽大小=500w*4*/
+ this.indexHeader.getIndexCount() * indexSize;
// 更新IndexFile索引單元信息
// keyHash(4)+消息在commitLog中的偏移量(8)+消息存儲(chǔ)時(shí)間-索引文件開始存儲(chǔ)時(shí)間(4)+前置消息索引值(4)
this.mappedByteBuffer.putInt(absIndexPos/*索引位置*/, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 更新slots的indexCount
this.mappedByteBuffer.putInt(absSlotPos/*hash槽的絕對(duì)位置*/, this.indexHeader.getIndexCount());
//...
// 更新IndexHeader信息
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
}
return false;
}
IndexFile如何解決Hash沖突
假設(shè)在IndexFile的索引IndexN的是一個(gè)keyHash為100的索引,如下圖所示,此時(shí)slots槽位100存儲(chǔ)著indexN的序號(hào),在IndexFile索引單元保存的數(shù)據(jù)keyHash=100,preIndexNo=0。

如果又有一個(gè)索引單元indexN+X的keyHashCode=100,保存消息時(shí)發(fā)現(xiàn)solt-100已經(jīng)指向了索引單元indexN,會(huì)將當(dāng)前索引單元IndxeN+X的preIndexNo更新為indexN,使得當(dāng)前索引單元indexN+X的前置索引單元指向indeNo,再更新slots-100槽位的值為indexN+X,保存完成后的索引關(guān)系如下圖所示。相當(dāng)于在slots槽位下面掛了index索引單元鏈表,根據(jù)key查找消息時(shí),可以根據(jù)key計(jì)算出keyHashCode,然后順著鏈表查詢鏈表中的消息。

總結(jié)
ConsumeQueue可以看成是消息消費(fèi)的索引,不同Topic的ConsumeQueue存儲(chǔ)到不同目錄中,默認(rèn)存儲(chǔ)在~/store/consumequeue/${topic}目錄中,其底層也是使用MappedFile,Broker會(huì)按照消息在CommitLog中的順序,異步轉(zhuǎn)發(fā)到ConsumeQueue中,每條消息在ConsumeQueue生成固定大小20字節(jié)的存儲(chǔ)單元指向CommitLog。
IndexFile保存著Producer發(fā)送消息keys中的索引,有了IndexFile就可以根據(jù)消息key快速找到消息。IndexFile的數(shù)據(jù)接口與HashMap類似,它使用鏈表的方式解決解決哈希沖突,并且使用頭插法將數(shù)據(jù)插入鏈表中。
以上就是RocketMQ ConsumeQueue與IndexFile實(shí)時(shí)更新機(jī)制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 實(shí)時(shí)更新機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過idea創(chuàng)建Spring Boot項(xiàng)目并配置啟動(dòng)過程圖解
這篇文章主要介紹了通過idea創(chuàng)建Spring Boot項(xiàng)目并配置啟動(dòng)過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
Java?Servlet響應(yīng)httpServletResponse過程詳解
HttpServletResponse是處理http響應(yīng)的對(duì)象,調(diào)用該對(duì)象的方法,設(shè)置到對(duì)象屬性的內(nèi)容,tomcat最終會(huì)組織為http響應(yīng)報(bào)文2022-02-02
深入理解springMVC中的Model和Session屬性
這篇文章主要介紹了深入理解springMVC中的Model和Session屬性,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Java實(shí)現(xiàn)文件點(diǎn)擊沒反應(yīng)的方法
jsp頁面鏈接,點(diǎn)擊訪問action用IO流去下載服務(wù)器上的文件,問題是任憑怎么點(diǎn)擊都沒反應(yīng),日志也不報(bào)錯(cuò)。這篇文章給大家介紹Java實(shí)現(xiàn)文件點(diǎn)擊沒反應(yīng)的方法,需要的朋友參考下吧2018-07-07
java中構(gòu)造方法及this關(guān)鍵字的用法實(shí)例詳解(超詳細(xì))
大家都知道,java作為一門內(nèi)容豐富的編程語言,其中涉及的范圍是十分廣闊的,下面這篇文章主要給大家介紹了關(guān)于java中構(gòu)造方法及this關(guān)鍵字用法的相關(guān)資料,需要的朋友可以參考下2022-04-04
關(guān)于java開發(fā)的性能問題總結(jié)(必看)
下面小編就為大家?guī)硪黄P(guān)于java開發(fā)的性能問題總結(jié)(必看)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03

