kafka的消息存儲機制和原理分析
消息的保存路徑
消息發(fā)送端發(fā)送消息到 broker 上以后,消息是如何持久化的?
數(shù)據(jù)分片
kafka 使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個 offset 值來表示它在分區(qū)中的偏移量。
Kafka 中存儲的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,一個分片 并不是直接對應(yīng)在一個磁盤上的日志文件,而是對應(yīng)磁盤上的一個目錄,這個目錄的命名規(guī)則是<topic_name>_<partition_id>。
比如創(chuàng)建一個名為firstTopic的topic,其中有3個partition,那么在 kafka 的數(shù)據(jù)目錄(/tmp/kafka-log)中就有 3 個目錄,firstTopic-0~3
多個分區(qū)在集群中多個broker上的分配方法
1.將所有 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上
log分段
每個分片目錄中,kafka 通過分段的方式將 數(shù)據(jù) 分為多個 LogSegment,一個 LogSegment 對應(yīng)磁盤上的一個日志文件(00000000000000000000.log)和一個索引文件(如上:00000000000000000000.index),其中日志文件是用來記錄消息的。索引文件是用來保存消息的索引。
每個LogSegment 的大小可以在server.properties 中l(wèi)og.segment.bytes=107370 (設(shè)置分段大小,默認是1gb)選項進行設(shè)置。
segment 的 index file 和 data file 2 個文件一一對應(yīng),成對出現(xiàn),后綴".index"和“.log”分別表示為 segment 索引文件、數(shù)據(jù)文件.命名規(guī)則:partion 全局的第一個 segment從 0 開始,后續(xù)每個 segment 文件名為上一個 segment文件最后一條消息的 offset 值進行遞增。數(shù)值最大為 64 位long 大小,20 位數(shù)字字符長度,沒有數(shù)字用 0 填充
第一個 log 文件的最后一個 offset 為:5376,所以下一個segment 的文件命名為: 0000000000000005376.log。
對應(yīng)的 index 為 00000000000000005376.index
kafka 這種分片和分段策略,避免了數(shù)據(jù)量過大時,數(shù)據(jù)文件文件無限擴張帶來的隱患,更有助于消息文件的維護以及被消費的消息的清理。
日志和索引文件內(nèi)容分析
通過下面這條命令可以看到 kafka 消息日志的內(nèi)容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
輸出結(jié)果為:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一條消息,會包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意義:
offset
:記錄號 ;position
:偏移量;createTime
:創(chuàng)建時間、keysize
和valuesize
表示key
和value
的大小compresscodec
:表示壓縮編碼payload
:表示消息的具體內(nèi)容
為了提高查找消息的性能,kafka為每一個日志文件添加 了2 個索引文件:OffsetIndex 和 TimeIndex,分別對應(yīng)*.index以及*.timeindex, *.TimeIndex 是映射時間戳和相對 offset的文件
查看索引內(nèi)容命令:
?sh ?kafka-run-class.shkafka.tools.DumpLogSegments ?--files ?/tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件內(nèi)容關(guān)系如下
如上圖所示,index 文件中存儲了索引以及物理偏移量。
log 文件存儲了消息的內(nèi)容。
索引文件中保存了部分offset和偏移量position的對應(yīng)關(guān)系。
比如 index文件中 [4053,80899],表示在 log 文件中,對應(yīng)的是第 4053 條記錄,物理偏移量(position)為 80899.
在 partition 中通過 offset 查找 message過程
- 根據(jù) offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一個文件的最后一個offset 進行命名的,所以,使用二分查找算法能夠根據(jù)offset 快速定位到指定的索引文件
- 找到索引文件后,根據(jù) offset 進行定位,找到索引文件中的匹配范圍的偏移量position。(kafka 采用稀疏索引的方式來提高查找性能)
- 得到 position 以后,再到對應(yīng)的 log 文件中,從 position處開始查找 offset 對應(yīng)的消息,將每條消息的 offset 與目標(biāo) offset 進行比較,直到找到消息
比如說,我們要查找 offset=2490 這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個索引,再到 log 文件中,根據(jù) 49111 這個 position 開始查找,比較每條消息的 offset 是否大于等于 2490。最后查找到對應(yīng)的消息以后返回
日志的清除策略以及壓縮策略
日志的清理策略有兩個
- 根據(jù)消息的保留時間,當(dāng)消息在 kafka 中保存的時間超過了指定的時間,就會觸發(fā)清理過程
- 根據(jù) topic 存儲的數(shù)據(jù)大小,當(dāng) topic 所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息。
通過 log.retention.bytes 和 log.retention.hours 這兩個參數(shù)來設(shè)置,當(dāng)其中任意一個達到要求,都會執(zhí)行刪除。默認的保留時間是:7 天
kafka會啟動一個后臺線程,定期檢查是否存在可以刪除的消息。
日志壓縮策略
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過這個功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實際場景中,消息的 key 和 value 的值之間的對應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費者只關(guān)心 key 對應(yīng)的最新的 value。
因此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會在后臺啟動Cleaner線程池,定期將相同的key進行合并,只保留最新的 value 值。日志的壓縮原理如下圖:
消息寫入的性能
順序?qū)?/h3>
我們現(xiàn)在大部分企業(yè)仍然用的是機械結(jié)構(gòu)的磁盤,如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對應(yīng)的柱面、磁頭以及對應(yīng)的扇區(qū);
這個過程相對內(nèi)存來說會消耗大量時間,為了規(guī)避隨機讀寫帶來的時間消耗,kafka 采用順序?qū)懙姆绞酱鎯?shù)據(jù)。
零拷貝
即使采用順序?qū)?,但是頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka還有一個性能策略:零拷貝
消息從發(fā)送到落地保存,broker 維護的消息日志本身就是文件目錄,每個文件都是二進制保存,生產(chǎn)者和消費者使用相同的格式來處理。
在消費者獲取消息時,服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過 socket 發(fā)送給消費者。
雖然這個操作描述起來很簡單,但實際上經(jīng)歷了很多步驟。如下:
- 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
- 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這個過程涉及到 4 次上下文切換以及 4 次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成。但是這個過程中,數(shù)據(jù)完全沒有進行變化,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)。通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作,同時也會減少上下文切換次數(shù)。
現(xiàn)代的 unix 操作系統(tǒng)提供一個優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)?socket;在 Linux 中,是通過 sendfile 系統(tǒng)調(diào)用來完成的。
Java 提供了訪問這個系統(tǒng)調(diào)用的方法:FileChannel.transferTo API。
使用 sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。
所以在這個優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring常用注解及http數(shù)據(jù)轉(zhuǎn)換教程
這篇文章主要為大家介紹了Spring常用注解及http數(shù)據(jù)轉(zhuǎn)換原理以及接收復(fù)雜嵌套對象參數(shù)與Http數(shù)據(jù)轉(zhuǎn)換的原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-03-03基于eclipse.ini內(nèi)存設(shè)置的問題詳解
本篇文章是對eclipse.ini內(nèi)存設(shè)置的問題進行了詳細的分析介紹,需要的朋友參考下2013-05-05Java中BigDecimal,DateFormatter?和迭代器的"陷阱"
這篇文章主要介紹了Java中BigDecimal,DateFormatter?和迭代器的"陷阱",文章圍繞主題展開詳細的內(nèi)容介紹,感興趣的小伙伴可以參考一下2022-06-06mybatis動態(tài)SQL?if的test寫法及規(guī)則詳解
這篇文章主要介紹了mybatis動態(tài)SQL?if的test寫法及規(guī)則詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01AsyncHttpClient?ClientStats源碼流程解讀
這篇文章主要為大家介紹了AsyncHttpClient?ClientStats源碼流程解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12解決啟用 Spring-Cloud-OpenFeign 配置可刷新項目無法啟動的問題
這篇文章主要介紹了解決啟用 Spring-Cloud-OpenFeign 配置可刷新項目無法啟動的問題,本文重點給大家介紹Spring-Cloud-OpenFeign的原理及問題解決方法,需要的朋友可以參考下2021-10-10學(xué)習(xí)Java之如何正確地向上轉(zhuǎn)型與向下轉(zhuǎn)型
面向?qū)ο蟮牡谌齻€特征是多態(tài),實現(xiàn)多態(tài)有三個必要條件:繼承、方法重寫和向上轉(zhuǎn)型,在學(xué)習(xí)多態(tài)之前,我們還要先學(xué)習(xí)Java的類型轉(zhuǎn)換,本篇文章就來帶大家認識什么是類型轉(zhuǎn)換,看看類型轉(zhuǎn)換都有哪幾種情況,以及如何避免類型轉(zhuǎn)換時出現(xiàn)異常2023-05-05