淺析Disruptor高性能線(xiàn)程消息傳遞并發(fā)框架
前言碎語(yǔ)
Disruptor是英國(guó)LMAX公司開(kāi)源的高性能的線(xiàn)程間傳遞消息的并發(fā)框架,和jdk中的BlockingQueue非常類(lèi)似,但是性能卻是BlockingQueue不能比擬的,下面是官方給出的一分測(cè)試報(bào)告,可以直觀的看出兩者的性能區(qū)別:


Disruptor 項(xiàng)目地址:https://github.com/LMAX-Exchange/disruptor
核心概念?
這么性能炸裂的框架肯定要把玩一番,試用前,我們先了解下disruptor的主要的概念,然后結(jié)合樓主的weblog項(xiàng)目(之前使用的BlockingQueue),來(lái)實(shí)踐下
RingBuffer:環(huán)形的緩沖區(qū),消息事件信息的載體。曾經(jīng) RingBuffer 是 Disruptor 中的最主要的對(duì)象,但從3.0版本開(kāi)始,其職責(zé)被簡(jiǎn)化為僅僅負(fù)責(zé)對(duì)通過(guò) Disruptor 進(jìn)行交換的數(shù)據(jù)(事件)進(jìn)行存儲(chǔ)和更新。在一些更高級(jí)的應(yīng)用場(chǎng)景中,Ring Buffer 可以由用戶(hù)的自定義實(shí)現(xiàn)來(lái)完全替代。
Event:定義生產(chǎn)者和消費(fèi)者之間進(jìn)行交換的數(shù)據(jù)類(lèi)型。
EventFactory:創(chuàng)建事件的工廠(chǎng)類(lèi)接口,由用戶(hù)實(shí)現(xiàn),提供具體的事件
EventHandler:事件處理接口,由用戶(hù)實(shí)現(xiàn),用于處理事件。
目前為止,我們了解以上核心內(nèi)容即可,更多的詳情,可以移步wiki文檔:https://github.com/LMAX-Exchange/disruptor
核心架構(gòu)圖:

實(shí)踐Disruptor
改造boot-websocket-log項(xiàng)目,這是一個(gè)典型的生產(chǎn)者消費(fèi)者模式的實(shí)例。然后將BlockingQueue替換成Disruptor,完成功能,有興趣的可以對(duì)比下。
第一步,定義事件類(lèi)型
/**
* Created by kl on 2018/8/24.
* Content :進(jìn)程日志事件內(nèi)容載體
*/
public class LoggerEvent {
private LoggerMessage log;
public LoggerMessage getLog() {
return log;
}
public void setLog(LoggerMessage log) {
this.log = log;
}
}第二步,定義事件工廠(chǎng)
/**
* Created by kl on 2018/8/24.
* Content :進(jìn)程日志事件工廠(chǎng)類(lèi)
*/
public class LoggerEventFactory implements EventFactory{
@Override
public LoggerEvent newInstance() {
return new LoggerEvent();
}
}第三步,定義數(shù)據(jù)處理器
/**
* Created by kl on 2018/8/24.
* Content :進(jìn)程日志事件處理器
*/
@Component
public class LoggerEventHandler implements EventHandler{
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Override
public void onEvent(LoggerEvent stringEvent, long l, boolean b) {
messagingTemplate.convertAndSend("/topic/pullLogger",stringEvent.getLog());
}
}第四步,創(chuàng)建Disruptor實(shí)操類(lèi),定義事件發(fā)布方法,發(fā)布事件
/**
* Created by kl on 2018/8/24.
* Content :Disruptor 環(huán)形隊(duì)列
*/
@Component
public class LoggerDisruptorQueue {
private Executor executor = Executors.newCachedThreadPool();
// The factory for the event
private LoggerEventFactory factory = new LoggerEventFactory();
private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory();
// Specify the size of the ring buffer, must be power of 2.
private int bufferSize = 2 * 1024;
// Construct the Disruptor
private Disruptordisruptor = new Disruptor<>(factory, bufferSize, executor);;
private DisruptorfileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);;
private static RingBufferringBuffer;
private static RingBufferfileLoggerEventRingBuffer;
@Autowired
LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) {
disruptor.handleEventsWith(eventHandler);
fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler);
this.ringBuffer = disruptor.getRingBuffer();
this.fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer();
disruptor.start();
fileLoggerEventDisruptor.start();
}
public static void publishEvent(LoggerMessage log) {
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.setLog(log); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
}
public static void publishEvent(String log) {
if(fileLoggerEventRingBuffer == null) return;
long sequence = fileLoggerEventRingBuffer.next(); // Grab the next sequence
try {
FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.setLog(log); // Fill with data
} finally {
fileLoggerEventRingBuffer.publish(sequence);
}
}
}文末結(jié)語(yǔ)
以上四步已經(jīng)完成了Disruptor的使用,啟動(dòng)項(xiàng)目后就會(huì)不斷的發(fā)布日志事件,處理器會(huì)將事件內(nèi)容通過(guò)websocket傳送到前端頁(yè)面上展示,
boot-websocket-log項(xiàng)目地址:https://gitee.com/kailing/boot-websocket-log
Disruptor是高性能的進(jìn)程內(nèi)線(xiàn)程間的數(shù)據(jù)交換框架,特別適合日志類(lèi)的處理。Disruptor也是從https://github.com/alipay/sofa-tracer了解到的,這是螞蟻金服 團(tuán)隊(duì)開(kāi)源的分布式鏈路追蹤項(xiàng)目,其中日志處理部分就是使用了Disruptor。
以上就是淺析Disruptor高性能線(xiàn)程消息傳遞并發(fā)框架的詳細(xì)內(nèi)容,更多關(guān)于Disruptor線(xiàn)程消息傳遞并發(fā)框架的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringMVC九大組件之HandlerMapping詳解
這篇文章主要介紹了SpringMVC九大組件之HandlerMapping詳解,HandlerMapping 叫做處理器映射器,它的作用就是根據(jù)當(dāng)前 request 找到對(duì)應(yīng)的 Handler 和 Interceptor,然后封裝成一個(gè) HandlerExecutionChain 對(duì)象返回,需要的朋友可以參考下2023-09-09
SpringBoot整合Security權(quán)限控制登錄首頁(yè)
這篇文章主要為大家介紹了SpringBoot整合Security權(quán)限控制登錄首頁(yè)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11
實(shí)現(xiàn)一個(gè)簡(jiǎn)單Dubbo完整過(guò)程詳解
這篇文章主要為大家介紹了實(shí)現(xiàn)一個(gè)簡(jiǎn)單Dubbo完整過(guò)程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
Java實(shí)現(xiàn)的AES256加密解密功能示例
這篇文章主要介紹了Java實(shí)現(xiàn)的AES256加密解密功能,結(jié)合完整實(shí)例形式分析了Java實(shí)現(xiàn)AES256加密解密功能的步驟與相關(guān)操作技巧,需要的朋友可以參考下2017-02-02
Java核心庫(kù)實(shí)現(xiàn)AOP過(guò)程
給大家分享一下利用Java核心庫(kù)實(shí)現(xiàn)簡(jiǎn)單的AOP的經(jīng)驗(yàn)分享和教學(xué),需要的讀者們參考下吧。2017-12-12
Java實(shí)現(xiàn)企業(yè)員工管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)企業(yè)員工管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02

