從log4j2到Disruptor詳解
- log4j2實(shí)現(xiàn)原理可查看://chabaoo.cn/article/232602.htm
- 文章同樣基于log4j-2.7版本,disruptor-3.3.6
相信看過(guò)log4j2的源碼后大家應(yīng)該明白為什么第二代日志性能會(huì)提升那么多,這其中最大的功臣莫過(guò)于Disruptor并發(fā)編程框架。
下面我們就跟著log4j2來(lái)走進(jìn)Disruptor這個(gè)神奇的框(wang)架(zhan)
log4j2異步日志簡(jiǎn)要回顧
從日志工廠(Log4jLoggerFactory)中獲取日志Logger實(shí)例
從日志上下文工廠(Log4jContextFactory)獲取日志上下文
啟用日志上下文(AsyncLoggerContext)
啟動(dòng)Disruptor(AsyncLoggerDisruptor)
序列號(hào)屏障(ProcessingSequenceBarrier)等待序列號(hào)發(fā)布
等待策略(WaitStrategy)等待序列號(hào)
返回Logger等待序列號(hào)(即等待日志寫(xiě)入)
異步日志(AsyncLogger)寫(xiě)入
日志內(nèi)容與轉(zhuǎn)化者(RingBufferLogEventTranslator)綁定
Disruptor嘗試發(fā)布轉(zhuǎn)化者tryPublish
RingBuffer嘗試發(fā)布事件tryPublishEvent
獲取下一個(gè)可用序號(hào)
轉(zhuǎn)化并發(fā)布序號(hào),日志與序號(hào)對(duì)應(yīng)的事件綁定,并發(fā)布序號(hào)
RingBuffer發(fā)布序號(hào),MultiProducerSequencer發(fā)布
等待策略(waitStrategy)喚醒阻塞
Disruptor在log4j2中的應(yīng)用
AsyncLoggerDisruptor
異步日志Disruptor啟動(dòng)
創(chuàng)建事件工廠EventFactory
計(jì)算ringBufferSize:AsyncLogger.RingBufferSize屬性
創(chuàng)建等待策略:AsyncLogger.WaitStrategy屬性
創(chuàng)建守護(hù)線程執(zhí)行器executor
創(chuàng)建異步隊(duì)列滿時(shí)處理策略AsyncQueueFullPolicy(非Disruptor步驟)
創(chuàng)建Disruptor
- 創(chuàng)建RingBuffer與Disruptor綁定
- RingBuffer根據(jù)生產(chǎn)者類型創(chuàng)建對(duì)應(yīng)的實(shí)例,例如多生產(chǎn)者:MultiProducerSequencer
- 創(chuàng)建多生產(chǎn)者序號(hào)(bufferSize,waitStrategy)
綁定異常句柄(Disruptor.handleExceptionsWith)
綁定事件處理句柄(Disruptor.handleEventsWith)
- 根據(jù)handle列表創(chuàng)建事件處理器createEventProcessors
- RingBuffer為Sequence(MultiProducerSequencer)序列創(chuàng)建序列屏障ProcessingSequenceBarrier
- 創(chuàng)建事件批處理器BatchEventProcessor
- 為事件批處理器綁定異常處理句柄
- 消費(fèi)者倉(cāng)庫(kù)(consumerRepository)添加消費(fèi)者,創(chuàng)建事件處理信息EventProcessorInfo添加至消費(fèi)者信息列表consumerInfos
- RingBuffer添加處理序列號(hào)列表processorSequences為序列號(hào)閘
- 如果存在序列號(hào)屏障,從閘門(mén)中移除屏障序列號(hào)并標(biāo)識(shí)endOfChain為false
啟動(dòng)Disruptor
- 遍歷消費(fèi)者倉(cāng)庫(kù)放入執(zhí)行器中執(zhí)行消費(fèi)者EventProcessorInfo
- 啟動(dòng)事件批處理器BatchEventProcessor
- 事件批處理器序列號(hào)自增1
- 死循環(huán)
- 序列號(hào)屏障ProcessingSequenceBarrier等待下個(gè)有效序列號(hào),默認(rèn)為超時(shí)等待策略,超時(shí)會(huì)繼續(xù)下輪循環(huán)
- 事件批處理器序列號(hào)如果小于等于有效序列號(hào)
- 從RingBuffer中按照序列號(hào)獲取event事件
- 通知回調(diào)事件句柄eventHandler.onEvent如果當(dāng)前消費(fèi)下標(biāo)等于有效序列號(hào)availableSequence說(shuō)明是當(dāng)前批次的最后一個(gè)消息,endOfBatch為true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
- 事件批處理器序列號(hào)設(shè)置為有效序列號(hào)
異步日志Disruptor寫(xiě)入
嘗試發(fā)布tryPublish事件轉(zhuǎn)化器EventTranslator:RingBufferLogEventTranslator
Disruptor獲取RingBuffer嘗試發(fā)布事件tryPublishEvent
序列號(hào)獲取下個(gè)有效序號(hào),步進(jìn)為1,例如:MultiProducerSequencer.tryNext
游標(biāo)按照步進(jìn)移動(dòng)
判斷是否有足夠的空間,沒(méi)有則拋出InsufficientCapacityException異常
返回有效序列號(hào)
轉(zhuǎn)化器轉(zhuǎn)化消息為對(duì)應(yīng)有效序列號(hào)的事件放入entries
發(fā)布序列號(hào)
- 設(shè)置有效序列號(hào)至緩存availableBuffer
- 等待策略喚醒阻塞waitStrategy.signalAllWhenBlocking
架構(gòu)及流程
紅色數(shù)字標(biāo)識(shí)流程為獲取logger時(shí)Disruptor創(chuàng)建消費(fèi)者流程
黑色數(shù)字標(biāo)識(shí)流程為logger寫(xiě)入日志時(shí)Disruptor創(chuàng)建事件并通知消費(fèi)者流程
RingBuffer對(duì)于所有消費(fèi)者、生產(chǎn)者是同一個(gè)實(shí)例
- 環(huán)形隊(duì)列,dataProvide,數(shù)據(jù)的存儲(chǔ)與提供者
Sequencer:生產(chǎn)者
- 對(duì)于所有消費(fèi)者、生產(chǎn)者(可能是多生產(chǎn)者序列類型對(duì)于Multi類型)是同一個(gè)實(shí)例,包含一個(gè)游標(biāo)序列號(hào)Sequence
SequenceBarrier:序列號(hào)屏障
- 對(duì)于所有消費(fèi)者、生產(chǎn)者也是同一個(gè)實(shí)例,序列號(hào)屏障包含一個(gè)等待策略、一個(gè)RingBuffer引用、一個(gè)游標(biāo)序列號(hào)、一個(gè)依賴序列號(hào)(可能是組序列號(hào)類型)
BatchEventProcessor:消費(fèi)者
- 消費(fèi)者包含一個(gè)RingBuffer引用
- 一個(gè)序列號(hào)屏障,可以包含多個(gè)屏障序列號(hào),默認(rèn)為0個(gè)則使用RingBuffer的MultiProducerSequencer的游標(biāo)序列號(hào)Sequence
- 一個(gè)EventHandler:RingBufferLogEventHandler
- 遍歷EventHandler列表將其封裝為BatchEventProcessor,將其與原始eventHandler、barrier屏障注冊(cè)至消費(fèi)者資源庫(kù)consumerRepository。
- 獲取batchEventProcessor序列號(hào)默認(rèn)為-1,將其緩存至processorSequences標(biāo)識(shí)正在處理,并將processorSequences、disruptor、consumerRepository綁定至EventHandlerGroup。Disruptor啟動(dòng)遍歷消費(fèi)者資源庫(kù)啟動(dòng)消費(fèi)者:BatchEventProcessor
消費(fèi)者入口
- 消費(fèi)者消費(fèi)前先自增本地序列號(hào)(即-1+1=0序號(hào)),向序列號(hào)屏障申請(qǐng)?jiān)撔蛄刑?hào)的消費(fèi),默認(rèn)為T(mén)imeout策略申請(qǐng)。
- 屏障收到申請(qǐng)waitFor序列號(hào),當(dāng)前屏障游標(biāo)序列號(hào)小于申請(qǐng)的消費(fèi)序列號(hào),等待生產(chǎn)者生產(chǎn)至當(dāng)前序列號(hào),如果超時(shí)則拋出異常(本地序列號(hào)不更新繼續(xù)重試);如果沒(méi)有超時(shí),將屏障的dependentSequence序列號(hào)(如果不是非多序列號(hào)屏障類型,log4j2使用的是非多序列號(hào)屏障,則是屏障的本地游標(biāo))賦值為availableSequence返回。
- 如果availableSequence有效的序列號(hào)(即屏障的游標(biāo)序列號(hào))小于申請(qǐng)要消費(fèi)的序列號(hào)直接返回availableSequence(即消費(fèi)超出的生產(chǎn)的速度,消費(fèi)者申請(qǐng)的序列號(hào)向后回移至有效序列號(hào))。否則getHighestPublishedSequence判斷申請(qǐng)的序列號(hào)至availableSequence序列號(hào)之間的每個(gè)序列號(hào)對(duì)應(yīng)的消息事件均是有效的則返回有效序列號(hào)(即生產(chǎn)者生產(chǎn)很快,消費(fèi)者申請(qǐng)消費(fèi)的序列號(hào)很小,向前移動(dòng)至有效的,可能是本身也可能會(huì)跳躍多個(gè)下標(biāo)),根據(jù)生產(chǎn)者的availableBuffer判斷是否有效,因?yàn)樯a(chǎn)者先發(fā)布序列號(hào)再寫(xiě)入數(shù)據(jù),此處避免了讀取數(shù)據(jù)異常,如果數(shù)據(jù)沒(méi)有寫(xiě)入,有效序列號(hào)緩存標(biāo)識(shí)沒(méi)有寫(xiě)入(即無(wú)效),消費(fèi)者會(huì)進(jìn)行剛剛所說(shuō)的“重試”,如果之間存在無(wú)效序列號(hào)則返回申請(qǐng)序列號(hào)-1(即回滾一個(gè)值,進(jìn)入邏輯時(shí)增加了一個(gè)值,也就是回滾至申請(qǐng)前的點(diǎn),可以理解為與超時(shí)相同,即重試)
- 如果申請(qǐng)的序列號(hào)小于等于有效的序列號(hào),則消費(fèi)序列號(hào)對(duì)應(yīng)的消息事件并更新本地BatchEventProcessor的序列號(hào),按照下標(biāo)去dataProvide(RingBuffer.entries)中提取對(duì)應(yīng)位置的數(shù)據(jù)消費(fèi)
- 如果申請(qǐng)的序列號(hào)大于有效的序列號(hào),則將消費(fèi)者本地序列號(hào)設(shè)置為有效序列號(hào)(即消費(fèi)超出的生產(chǎn)的速度,消費(fèi)的序列號(hào)向后回移)
- 如果期間出現(xiàn)任何未catch住的異常則會(huì)跳過(guò)當(dāng)前下標(biāo),異常出現(xiàn)時(shí)的下標(biāo)及對(duì)應(yīng)的事件會(huì)交由exceptionHandler處理,默認(rèn)為AsyncLoggerDefaultExceptionHandler異步處理,會(huì)將異常事件輸出至系統(tǒng)的標(biāo)準(zhǔn)錯(cuò)誤管道,雖然是異步也是會(huì)占用消費(fèi)者線程池資源
Disruptor:生產(chǎn)者入口
- 獲取RingBuffer嘗試發(fā)布消息,生產(chǎn)者(例如:MultiProducerSequencer)
- 生產(chǎn)者游標(biāo)序列號(hào)嘗試自增,判斷當(dāng)前是否有足夠的空間,當(dāng)前游標(biāo)+步進(jìn)-bufferSize是否大于最小的閘門(mén)序列號(hào)(gatingSequences,即:所有消費(fèi)者的本地游標(biāo)序列號(hào)processorSequences列表),最小序列號(hào)會(huì)緩存至本地gatingSequenceCache用于下次判斷減少進(jìn)行所有閘門(mén)序列號(hào)的遍歷次數(shù),如果是說(shuō)明已經(jīng)沒(méi)有空間(因?yàn)樯a(chǎn)者生產(chǎn)申請(qǐng)的序列號(hào)已經(jīng)追上了消費(fèi)者消費(fèi)序列號(hào)的最小值。RingBuffer是一個(gè)環(huán)形隊(duì)列結(jié)構(gòu)。上面已經(jīng)講到消費(fèi)者序列號(hào)會(huì)與生產(chǎn)者序列號(hào)同步,同步指消費(fèi)者申請(qǐng)序列號(hào)小于有效序列號(hào)時(shí)會(huì)前進(jìn)至有效序列號(hào),即使有延遲也保證了有大于等于buffer值的緩沖空間供生產(chǎn)者生產(chǎn)),如果沒(méi)有空間返回false進(jìn)入下一輪生產(chǎn)
- 自增成功后,將消息轉(zhuǎn)化為對(duì)應(yīng)序列號(hào)下標(biāo)位置的事件數(shù)據(jù)
- Sequencer發(fā)布序列號(hào),將當(dāng)前序列號(hào)設(shè)置為有效(availableBuffer),并根據(jù)等待策略喚醒等待的消費(fèi)者,被喚醒的消費(fèi)者根據(jù)發(fā)布的序列號(hào)獲取相應(yīng)下標(biāo)處事件數(shù)據(jù)進(jìn)行處理
Disruptor為什么這么快?
Disruptor采用無(wú)鎖并發(fā)編程,框架中主要使用CAS與volatile關(guān)鍵字保證并發(fā)安全
使用環(huán)形數(shù)據(jù)結(jié)構(gòu)(另一個(gè)典型的應(yīng)用是時(shí)鐘算法也是使用的環(huán)形數(shù)據(jù)結(jié)構(gòu)),為環(huán)形結(jié)構(gòu)添加序列號(hào)屏障來(lái)控制對(duì)環(huán)形隊(duì)列讀寫(xiě)操作,保證存儲(chǔ)數(shù)據(jù)的并發(fā)安全
另一個(gè)點(diǎn)便是神奇的緩沖行填充了
Log4j2為什么這么快?
使用Disruptor并發(fā)編程框架
使用NIO寫(xiě)入日志數(shù)據(jù)
當(dāng)然log4j2中有很多細(xì)節(jié),如果我們想要獲取線程棧信息,可以同樣學(xué)習(xí)一下這樣的寫(xiě)法
// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace(). final StackTraceElement[] stackTrace = new Throwable().getStackTrace(); StackTraceElement last = null; for (int i = stackTrace.length - 1; i > 0; i--) { final String className = stackTrace[i].getClassName(); if (fqcnOfLogger.equals(className)) { return last; } last = stackTrace[i]; }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mac安裝多個(gè)JDK并實(shí)現(xiàn)動(dòng)態(tài)切換
有時(shí)候我們有多個(gè)項(xiàng)目需要使用多個(gè)版本JDK,本文主要介紹了Mac安裝多個(gè)JDK并實(shí)現(xiàn)動(dòng)態(tài)切換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07IDEA設(shè)置字體隨鼠標(biāo)滾動(dòng)放大縮小的實(shí)現(xiàn)
這篇文章主要介紹了IDEA設(shè)置字體隨鼠標(biāo)滾動(dòng)放大縮小的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01sonar-scanner連接sonarquebe7的sonar.java.binaries問(wèn)題的解決方案
今天小編就為大家分享一篇關(guān)于sonar-scanner連接sonarquebe7的sonar.java.binaries問(wèn)題的解決方案,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12java開(kāi)發(fā)之基于Validator接口的SpringMVC數(shù)據(jù)校驗(yàn)方式
這篇文章主要介紹了java開(kāi)發(fā)之基于Validator接口的SpringMVC數(shù)據(jù)校驗(yàn)方式,文中附含詳細(xì)示例代碼,有需要的朋友可以借鑒參考下2021-09-09springboot整合vue項(xiàng)目(小試牛刀)
這篇文章主要介紹了springboot整合vue項(xiàng)目(小試牛刀),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-09-09Java中生成隨機(jī)數(shù)的實(shí)現(xiàn)方法總結(jié)
這篇文章主要介紹了Java中生成隨機(jī)數(shù)的實(shí)現(xiàn)方法總結(jié),其中多線程并發(fā)的實(shí)現(xiàn)方式尤為exciting,需要的朋友可以參考下2015-11-11idea中使用maven?archetype新建項(xiàng)目時(shí)卡住問(wèn)題解決方案
這篇文章主要介紹了idea中使用maven?archetype新建項(xiàng)目時(shí)卡住,解決本問(wèn)題的方法,就是在maven的runner加上參數(shù)-DarchetypeCatalog=local就可以了,不需要下載xml文件再放到指定目錄,需要的朋友可以參考下2023-08-08