RocketMQ?Broker如何保存消息源碼解析
前言
前面我們介紹了RocketMQ是如何接收消息的,下面我們來介紹Broker是如何保存消息的。
消息存儲格式總覽
Broker消息存儲主要包括CommitLog,ConsumerQueue和Index三個部分。

- CommitLog
CommitLog主要用于消息存儲,所有topic的消息按順序都存儲在CommitLog中。
- ConsumerQueue
ConsumerQueue對應消費隊列,消息存儲到CommitLog后,會異步轉(zhuǎn)發(fā)到ConsumerQueue文件中
- Index
消息索引,只要存儲消息key與offset的關系
CommitLog介紹
CommitLog是消息和消息數(shù)據(jù)存儲的主體,CommitLog存儲的文件目錄在${user.home}/store/commitlog中,它其實是一個目錄,消息并不是直接存儲在CommitLog中,而是存儲在由20位數(shù)字構成的文件中。

MappedFile詳解
commitlog文件夾中文件單元是MappedFile,我們可以把MappedFile理解成一個文件管理的工具,如果需要將數(shù)據(jù)存儲到磁盤,或者快速查找數(shù)據(jù),都可以通過MappedFile。
每個MappedFile文件大小默認是1GB,文件名是由20位數(shù)字構成,文件名其實是MappedFile的起始偏移量,如果偏移量不足20位,則將偏移量的左邊補0。上圖中MappedFile的文件名是00000000000000000000,它代表的是CommitLog中的第一個文件,由于每個MappedFile文件大小是1GB,因此第二個文件的偏移量為1024*1024*1024(1GB),計算后的結(jié)果為1073741824,因此第二個文件的文件名為00000000001073741824,可依此類推其他文件的文件名。
消息存儲格式介紹
消息在commitLog中存儲的格式如下所示

- totalSize
消息總長度,4字節(jié)
- magicCode
魔數(shù),4字節(jié),固定值十六進制是0xdaa320a7,10進制是-875286124
- bodyCRC
消息體crc校驗碼,4字節(jié)
- queueId
消息隊列id,4字節(jié)
- flag
消息標記,RocketMQ不做處理,默認4字節(jié)
- queueOffset
消息在ConsumeQueue文件中的物理偏移量,默認8字節(jié)
- physicalOffset
消息在CommitLog文件中的物理偏移量,默認8字節(jié)
- sysFlag
消息系統(tǒng)標記,例如是否壓縮、是否是事務消息等,4字節(jié)
- bornTimestamp
消息生產(chǎn)者調(diào)用消息API的時間戳,8字節(jié)
- bornHost
BORNHOST 消息生產(chǎn)者IP和端口號,8字節(jié)
- storeTimestamp
消息存儲時間戳,8字節(jié)
- storeHostAddress
STOREHOSTADDRESS 消息存儲Broker的IP和端口號,8字節(jié)
- reconsumeTimes
消息重試次數(shù) 4字節(jié)
- Prepared Transaction Offset
事務消息偏移量,8字節(jié)
- bodyLength
消息體長度,4字節(jié)
- body
消息體內(nèi)容,它是變長的,長度為bodyLength中存儲的值
- TopicLength
topicLength表示topic占用的長度,topicLength占用1字節(jié),也就是255,也就是說topic長度最長不能超過255字節(jié)
- Topic
topic是消息主題名稱,topic是變長的,實際占用topicLength字節(jié)
- PropertiesLength
propertiesLength表示properties占用的長度,propertiesLength占用2字節(jié),也就是說properties長度最長不超過65536字節(jié)
- Properties
properties是消息屬性,properties是變長的,實際占用propertiesLength字節(jié)
DefaultMessageStore介紹
Broker保存消息是通過消息存儲默認實現(xiàn)類org.apache.rocketmq.store.DefaultMessageStore執(zhí)行的,它是Broker存儲模塊中最最最重要的一個類,提供了很多存儲文件的API。DefaultMessageStore中和消息存儲相關的屬性如下所示,
// 消息存儲配置
private final MessageStoreConfig messageStoreConfig;
// CommitLog文件的存儲實現(xiàn)類
private final CommitLog commitLog;
// 消息隊列存儲緩存表,key是topic
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// MappedFile分配服務
private final AllocateMappedFileService allocateMappedFileService;
// 直接內(nèi)存暫存池
private final TransientStorePool transientStorePool;
// broker狀態(tài)管理器
private final BrokerStatsManager brokerStatsManager;
// 鎖文件
// 目錄: ${user.home}/store/lock
private RandomAccessFile lockFile;
消息存儲源碼分析
發(fā)送消息存儲流程
發(fā)送消息存儲的入口函數(shù)是DefaultMessageStore#asyncPutMessage,它主要分為下面三步
- 存儲狀態(tài)校驗
- 校驗消息存儲服務是否關閉,當前Broker是否是從節(jié)點,queue是否可寫
- 消息校驗
- 校驗topic名稱長度是否超過了127字節(jié)和property長度是否超過了32767
- 將消息保存到commitLog
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
// 1. 存儲狀態(tài)校驗
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 2. 校驗topic名稱和property長度
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// ...
long beginTime = this.getSystemClock().now();
// 3. 保存到commitLog
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
//...
return putResultFuture;
}
CommitLog#asyncPutMessage保存消息
CommitLog#asyncPutMessage保存消息可以分為三個階段
- 消息預處理階段
- 消息保存階段
- 消息保存結(jié)果處理階段
消息預處理階段
消息預處理階段可以分為下面三個步驟
- 設置消息存儲時間戳和消息體CSC32信息
- 如果是延遲消息,則設置延遲信息
如果是非事務消息或者是提交的事務消息,并且設置了消息的延遲級別,說明當前消息是延遲消息,Broker在處理延遲消息時會將消息投遞到名為SCHEDULE_TOPIC_XXXX的Topic。在消息預處理的階段,會先將當前消息的topic設置為SCHEDULE_TOPIC_XXXX,queueId設置為延遲級別-1,并且將原來的Topic和queueId設置到消息的REAL_TOPIC和REAL_QID屬性中。
- 設置ip及構建存儲消息上下文
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 1. 設置消息存儲時間戳和消息體CSC32信息
msg.setStoreTimestamp(System.currentTimeMillis()); // 設置消息存儲時間
msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // 設置消息體CRC32校驗值
// 2. 如果是非事務消息,或者是事務提交消息,判斷是否是是否是延遲消息,如果是延遲消息則設置延遲相關信息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果延遲級別>0,說明是延遲消息
if (msg.getDelayTimeLevel() > 0) {
// 如果大于最大的延遲級別,則取最大的延遲級別
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 消息topic改成延遲消息topic(SCHEDULE_TOPIC_XXXX)
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延遲topic的queueId:延遲級別-1
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 消息屬性中設置真實的QueueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 把SCHEDULE_TOPIC_XXXX設置為當前消息的topic,消息先投遞到這個隊列中
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 3. 設置ip并構建存儲消息上下文信息
msg.setBornHostV6Flag(); // 如果producer的ip是IpV6,則設置生產(chǎn)者IpV6 flag
msg.setStoreHostAddressV6Flag(); // 如果如果broker的ip是IpV6,則設置BrokerIpV6 flag
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 構建存消息上下文
PutMessageContext putMessageContext = new PutMessageContext(/*key值:topic-queueId*/generateKey(putMessageThreadLocal.getKeyBuilder()/*StringBuilder*/, msg));
// ... 省略部分代碼
}
消息保存階段
消息保存階段可以分為如下步驟
- 獲取消息保存鎖
- 獲取最新的mappedFile
獲取MappedFile調(diào)用的是MappedFileQueue中的方法,獲取最新的MappedFile
- 如果最新的mappedFile為空或者已經(jīng)滿了,則創(chuàng)建新的MappedFile
- 將消息保存的mappedFile中
- 處理消息保存結(jié)果
- 釋放消息保存鎖
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ... 省略部分代碼
// 1. 消息保存鎖,默認是ReentrantLock互斥鎖
putMessageLock.lock();
try {
// 2. 獲取最新的mappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 3. 如果獲取到的mappedFile是null說明之前沒有存儲消息
// 如果mappedFile滿了,說明需要創(chuàng)建一個新的MappedFile
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
// 如果創(chuàng)建mappedFile失敗,則返回異常信息
if (null == mappedFile) {
// 創(chuàng)建mappedFile失敗
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 4. 將消息保存的mappedFile中
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// 5. 處理消息保存結(jié)果
switch (result.getStatus()) {
case PUT_OK:
break;
// mappedFile滿了,重新創(chuàng)建mappedFile后再寫入消息
case END_OF_FILE:
unlockMappedFile = mappedFile;
// 創(chuàng)建一個新的文件,然后重新寫入
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
//...
// 寫消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
// ...
}
} finally {
// 6. 釋放鎖
putMessageLock.unlock();
}
// ... 省略部分代碼
}
上面第4步MappedFile#appendMessage邏輯主要有三步
- 獲取當前寫文件位置
如果寫指針小于文件大小,則對消息進行追加處理
獲取寫緩沖
調(diào)用AppendMessageCallback的doAppend將消息寫到內(nèi)存緩沖中
回調(diào)函數(shù)doAppend方法分為單條處理邏輯和批量消息處理邏輯,下面僅展示了單條消息處理邏輯
- 消息保存完成后會更新當前寫文件的位置和消息保存時間戳
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 獲取當前寫文件位置
int currentPos = this.wrotePosition.get();
// 如果寫文件位置小于文件size
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 如果是單條消息
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長度-當前寫位置,可以寫的長度*/,(MessageExtBrokerInner) messageExt, putMessageContext);
}
//...
// 更新當前寫文件位置和消息保存時間戳
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
}
上面保存消息回調(diào)函數(shù)中的doAppend實際調(diào)用的是CommitLog中內(nèi)部類DefaultAppendMessageCallback的doAppend方法,這里大致可以分為下面幾個步驟
- 獲取消息物理偏移量,并且創(chuàng)建消息id生成器,從topicQueueTable中獲取Queue的最大相對便宜量。
消息id的格式如下所示,它由ip,端口和消息偏移量公共構成,長度是16字節(jié),為了保證消息的可讀性,返回給應用程序的Id轉(zhuǎn)成了字符串。
消息id這么設計的原因是可以根據(jù)消息id快速找到broker的IP,端口,以及消息在的物理偏移量,通過它可以快速找到消息

- 如果消息長度加上消息結(jié)束符(8字節(jié))大于maxBlank,則表示該mappedFile已經(jīng)沒有足夠的空間保存該消息了,那么就會將消息結(jié)束符寫入緩沖中,并返回
END_OF_FILE,mappedFile消息結(jié)束符如下所示

- 如果空間足夠,將queue的相對偏移量,物理偏移量,sysflag,消息創(chuàng)建時間,消息創(chuàng)建ip,消息保存時間及消息體等按照上面消息格式保存到緩沖中。
- 創(chuàng)建AppendMessageResult對象并返回,它包括消息追加狀態(tài)、消息寫入物理偏移量、消息寫入長度、消息ID生成器、消息開始追加的時間戳、消息隊列偏移量、消息開始寫入的時間戳等屬性。
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset/*消息文件起始偏移量*/, final ByteBuffer byteBuffer, final int maxBlank/*文件可寫長度*/,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// 1. 物理offset,文件起始offset+寫offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// 創(chuàng)建消息id supplier
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// topic-ququeId
String key = putMessageContext.getTopicQueueTableKey();
// 獲取消息queue offset
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// 如果queueOffset是null,則將其置0
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// 獲取寫緩沖
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
final int msgLen = preEncodeBuffer.getInt(0);
// 2. 判斷空間是否足夠,如果剩余空間不足,則保存TOTAL+MAGICCODE之后,返回BLANK_MAGIC_CODE
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE 寫消息總長度
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE 寫魔數(shù)
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(/*...*/);
}
int pos = 4/*totalSize*/ + 4/*magicCode*/ + 4/*bodyCRC*/ + 4/*queueId*/ + 4/*flag*/;
// set隊列的offset,
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 設置物理offset: 文件起始offset+當前文件寫消息的offset
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// set 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// 設置存儲消息ip地址
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
// 寫消息到隊列緩沖
byteBuffer.put(preEncodeBuffer);
msgInner.setEncodedBuff(null);
// 4. 返回消息保存結(jié)果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
return result;
}
消息保存結(jié)果處理階段
消息保存結(jié)果處理階段主要包括下面三個
- 提交刷盤請求
如果是同步刷盤,則會創(chuàng)建刷盤請求并返回CompleteFuture,如果是異步刷盤,則會喚醒刷盤服務,然后返回消息保存成功的CompleteFuture
- 提交消息復制請求
如果是同步復制,則創(chuàng)建消息同步請求然后返回CompleteFuture,如果是異步復制則直接放回消息保存成功的CompleteFuture
- 合并提交刷盤請求和提交消息復制請求
CompleteFuture#thenCombine是將兩個CompleteFuture(提交刷盤請求,提交消息復制請求)組合起來,等提交刷盤請求和提交消息復制請求都執(zhí)行完了之后再執(zhí)行后續(xù)任務
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ... 省略部分代碼
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 1. 提交刷盤請求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 2. 提交復制請求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// 3. 合并提交刷盤請求和提交復制請求結(jié)果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
總結(jié)
消息保存到commitLog實際上是保存到byteBuffer中,消息是在回調(diào)結(jié)果時根據(jù)配置決定同步/異步刷盤以及同步/異步同步到從節(jié)點。消息在這個階段也并不會將消息分發(fā)到comsumeQueue以及Index中。
以上就是RocketMQ | 源碼分析】Broker是如何保存消息的?的詳細內(nèi)容,更多關于RocketMQ Broker保存消息的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot結(jié)合Redis配置工具類實現(xiàn)動態(tài)切換庫
本文主要介紹了SpringBoot結(jié)合Redis配置工具類實現(xiàn)動態(tài)切換庫,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-08-08
Java+Springboot搭建一個在線網(wǎng)盤文件分享系統(tǒng)
本主要介紹了通過springboot+freemark+jpa+MySQL實現(xiàn)的在線網(wǎng)盤文件分享系統(tǒng),其功能跟百度網(wǎng)盤非常類似,可以實現(xiàn)文件的上傳、移動、復制、下載等,需要的可以參考一下2021-11-11
Java通過PropertyDescriptor反射調(diào)用set和get方法
這篇文章主要為大家詳細介紹了Java通過PropertyDescriptor反射調(diào)用set和get方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03
Java實現(xiàn)作業(yè)調(diào)度的示例代碼
這篇文章主要為大家詳細介紹了如何利用Java實現(xiàn)SJF算法調(diào)度,要求測試數(shù)據(jù)可以隨即輸入或從文件中讀入,文中的示例代碼講解詳細,需要的可以參考一下2023-04-04

