亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解

 更新時間:2022年12月09日 10:37:27   作者:碼猿技術(shù)專欄  
這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

Java中有哪些隊列

  • ArrayBlockingQueue 使用ReentrantLock
  • LinkedBlockingQueue 使用ReentrantLock
  • ConcurrentLinkedQueue 使用CAS

等等

我們清楚使用鎖的性能比較低,盡量使用無鎖設計。接下來就我們來認識下

Disruptor簡單使用

github地址:github.com/LMAX-Exchan…

先簡單介紹下:

  • Disruptor它是一個開源的并發(fā)框架,并獲得2011 Duke’s程序框架創(chuàng)新獎【Oracle】,能夠在無鎖的情況下實現(xiàn)網(wǎng)絡的Queue并發(fā)操作。英國外匯交易公司LMAX開發(fā)的一個高性能隊列,號稱單線程能支撐每秒600萬訂單~
  • 日志框架Log4j2 異步模式采用了Disruptor來處理
  • 局限呢,他就是個內(nèi)存隊列,也就是說無法支撐分布式場景。

簡單使用

數(shù)據(jù)傳輸對象

@Data
public class EventData {
    private Long value;
}

消費者

public class EventConsumer implements WorkHandler<EventData> {
    /**
     * 消費回調(diào)
     * @param eventData
     * @throws Exception
     */
    @Override
    public void onEvent(EventData eventData) throws Exception {
        Thread.sleep(5000);
        System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
    }
}

生產(chǎn)者

public class EventProducer {
    private final RingBuffer<EventData> ringBuffer;
    public EventProducer(RingBuffer<EventData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void sendData(Long v){
        // cas展位
        long next = ringBuffer.next();
        try {
            EventData eventData = ringBuffer.get(next);
            eventData.setValue(v);
        } finally {
            // 通知等待的消費者
            System.out.println("EventProducer send success, sequence:"+next);
            ringBuffer.publish(next);
        }
    }
}

測試類

public class DisruptorTest {
    public static void main(String[] args) {
        // 2的n次方
        int bufferSize = 8;
        Disruptor<EventData> disruptor = new Disruptor<EventData>(
                () -> new EventData(), // 事件工廠
                bufferSize,            // 環(huán)形數(shù)組大小
                Executors.defaultThreadFactory(),       // 線程池工廠
                ProducerType.MULTI,    // 支持多事件發(fā)布者
                new BlockingWaitStrategy());    // 等待策略
        // 設置消費者
        disruptor.handleEventsWithWorkerPool(
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer());
        disruptor.start();
        RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);
        long i  = 0;
        for(;;){
            i++;
            eventProducer.sendData(i);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

核心組件

基于上面簡單例子來看確實很簡單,Disruptor幫我們封裝好了生產(chǎn)消費模型的實現(xiàn),接下來我們來看下他是基于哪些核心組件來支撐起一個高性能無鎖隊列呢?

RingBuffer: 環(huán)形數(shù)組,底層使用數(shù)組entries,在初始化時填充數(shù)組,避免不斷新建對象帶來的開銷。后續(xù)只會對entries做更新操作

Sequencer: 核心管家

  • 定義生產(chǎn)同步的實現(xiàn):SingleProducerSequencer單生產(chǎn)、MultiProducerSequencer多生產(chǎn)
  • 當前寫的進度Sequence cursor
  • 所有消費者進度的數(shù)組Sequence[] gatingSequences
  • MultiProducerSequencer可用區(qū)availableBuffer【利用空間換取查詢效率】

Sequence: 本身就是一個序號器用來標識處理進度,也可以當做是一個atomicInteger; 還有另外一個特點,為了解決偽共享問題而引入的:緩存行填充。這個在后面介紹。

workProcessor: 處理Event的循環(huán),在循環(huán)中獲取Disruptor的事件,然后把事件分配給各個handler

EventHandler: 負責業(yè)務邏輯的handler,自己實現(xiàn)。

WaitStrategy: 消費者 如何等待 事件的策略,定義了如下策略

leepingWaitStrategy:自旋 + yield + sleep

BlockingWaitStrategy:加鎖,適合CPU資源緊張(不需要切換線程),系統(tǒng)吞吐量無要求的

YieldingWaitStrategy:自旋 + yield + 自旋

BusySpinWaitStrategy:自旋,減少線程之前切換

PhasedBackoffWaitStrategy:自旋 + yield + 自定義策略

帶著問題來解析代碼?

1、多生產(chǎn)者如何保證消息生產(chǎn)不會相互覆蓋。【如何達到互斥效果】

每個線程獲取不同的一段數(shù)組空間,然后通過CAS判斷這段空間是否已經(jīng)分配出去。

接下來我們看下多生產(chǎn)類MultiProducerSequencer中next方法【獲取生產(chǎn)序號】

// 消費者上一次消費的最小序號 // 后續(xù)第二點會講到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 當前進度的序號
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消費者的序號 //后續(xù)第二點會講到
protected volatile Sequence[] gatingSequences = new Sequence[0];
 public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do
        {
            // 當前進度的序號,Sequence的value具有可見性,保證多線程間線程之間能感知到可申請的最新值
            current = cursor.get();
            // 要申請的序號空間:最大序列號
            next = current + n;
            long wrapPoint = next - bufferSize;
            // 消費者最小序列號
            long cachedGatingSequence = gatingSequenceCache.get();
            // 大于一圈 || 最小消費序列號>當前進度
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // 說明大于1圈,并沒有多余空間可以申請
                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // 更新最小值到Sequence的value中
                gatingSequenceCache.set(gatingSequence);
            }
            // CAS成功后更新當前Sequence的value
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
        return next;
    }

2、生產(chǎn)者向序號器申請寫的序號,如序號正在被消費,Sequencer是如何知道哪些序號是可以被寫入的呢?【未消費則被覆蓋如何處理】

從gatingSequences中取得最小的序號,生產(chǎn)者最多能寫到這個序號的后一位。通俗來講就是申請的序號不能大于最小消費者序號一圈【申請到最大序列號-buffersize 要小于/等于 最小消費的序列號】的時候, 才能申請到當前寫的序號

public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
    consumerRepository.add(workerPool, sequenceBarrier);
    final Sequence[] workerSequences = workerPool.getWorkerSequences();
    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
    return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // 消費者啟動后就會將所有消費者存放入AbstractSequencer中g(shù)atingSequences
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

3、在多生產(chǎn)者情況下,生產(chǎn)者是申請到一段可寫入的序號,然后再寫入這些序號中,那么消費者是如何感知哪些序號是可以被消費的呢?【借問提1圖說明】

這個前提是多生產(chǎn)者情況下,第一點我們說過每個線程獲取不同的一段數(shù)組空間,那么現(xiàn)在單單通過序號已經(jīng)不夠用了,MultiProducerSequencer使用了int 數(shù)組 【availableBuffer】來標識當前序號是否可用。當生產(chǎn)者成功生產(chǎn)事件后會將availableBuffer中當前序列號置為1標識可以讀取。

如此消費者可以讀取的的最大序號就是我們availableBuffer中第一個不可用序號-1。

初始化availableBuffer流程

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    // 初始化可用數(shù)組
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
// 初始化默認availableBuffer為-1
private void initialiseAvailableBuffer()
{
    for (int i = availableBuffer.length - 1; i != 0; i--)
    {
        setAvailableBufferValue(i, -1);
    }
    setAvailableBufferValue(0, -1);
}
// 生產(chǎn)者成功生產(chǎn)事件將可用區(qū)數(shù)組置為1
public void publish(final long sequence)
{
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}
private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

消費者消費流程

WorkProcessor類中消費run方法
public void run()
    {
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // 先通過cas獲取消費事件的占有權(quán)
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 數(shù)據(jù)就緒,可以消費
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    // 觸發(fā)回調(diào)函數(shù)
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    // 獲取可以被讀取的下標
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
        // ....省略
        }
        notifyShutdown();
        running.set(false);
    }
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        // 這個值獲取的current write 下標,可以認為全局消費下標。此處與每一段的write1和write2下標區(qū)分開
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 通過availableBuffer篩選出第一個不可用序號 -1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        // 從current read下標開始, 循環(huán)至 current write,如果碰到availableBuffer 為-1 直接返回
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

解決偽共享問題

什么是偽共享問題呢?

為了提高CPU的速度,Cpu有高速緩存Cache,該緩存最小單位為緩存行CacheLine,他是從主內(nèi)存復制的Cache的最小單位,通常是64字節(jié)。一個Java的long類型是8字節(jié),因此在一個緩存行中可以存8個long類型的變量。如果你訪問一個long數(shù)組,當數(shù)組中的一個值被加載到緩存中,它會額外加載另外7個。因此你能非??斓乇闅v這個數(shù)組。

偽共享問題是指,當多個線程共享某份數(shù)據(jù)時,線程1可能拉到線程2的數(shù)據(jù)在其cache line中,此時線程1修改數(shù)據(jù),線程2取其數(shù)據(jù)時就要重新從內(nèi)存中拉取,兩個線程互相影響,導致數(shù)據(jù)雖然在cache line中,每次卻要去內(nèi)存中拉取。

Disruptor是如何解決的呢?

在value前后統(tǒng)一都加入7個Long類型進行填充,線程拉取時,不論如何都會占滿整個緩存

回顧總結(jié):Disuptor為何能稱之為高性能的無鎖隊列框架呢?

  • 緩存行填充,避免緩存頻繁失效?!緅ava8中也引入@sun.misc.Contended注解來避免偽共享】
  • 無鎖競爭:通過CAS 【二階段提交】
  • 環(huán)形數(shù)組:數(shù)據(jù)都是覆蓋,避免GC
  • 底層更多的使用位運算來提升效率

以上就是java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解的詳細內(nèi)容,更多關(guān)于java Disruptor構(gòu)建內(nèi)存隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • springboot中使用groovy的示例代碼

    springboot中使用groovy的示例代碼

    Groovy就是一種繼承了動態(tài)語言的優(yōu)良特性并運行在JVM上的編程語言,Groovy支持動態(tài)輸入,閉包,元編程,運算符重載等等語法,這篇文章主要介紹了springboot中使用groovy的相關(guān)知識,需要的朋友可以參考下
    2022-09-09
  • 淺談Spring5 響應式編程

    淺談Spring5 響應式編程

    本篇文章主要介紹了淺談Spring5 響應式編程,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-01-01
  • java操作PDF文件方法之轉(zhuǎn)換、合成、切分

    java操作PDF文件方法之轉(zhuǎn)換、合成、切分

    最近需要做?個把多個pdf報告合并成?個以?便預覽的需求,下面這篇文章主要給大家介紹了關(guān)于java操作PDF文件方法之轉(zhuǎn)換、合成、切分的相關(guān)資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-01-01
  • 詳解如何在SpringBoot中優(yōu)雅地重試調(diào)用第三方API

    詳解如何在SpringBoot中優(yōu)雅地重試調(diào)用第三方API

    作為后端程序員,我們的日常工作就是調(diào)用一些第三方服務,將數(shù)據(jù)存入數(shù)據(jù)庫,返回信息給前端。本文為大家介紹了如何在SpringBoot中優(yōu)雅地重試調(diào)用第三方API,需要的可以參考一下
    2022-12-12
  • Java設計模式之迭代模式(Iterator模式)介紹

    Java設計模式之迭代模式(Iterator模式)介紹

    這篇文章主要介紹了Java設計模式之迭代模式(Iterator模式)介紹,本文用一個老師點名的現(xiàn)象描述了迭代模式的使用,需要的朋友可以參考下
    2015-03-03
  • Mybatis使用typeHandler加密的實現(xiàn)

    Mybatis使用typeHandler加密的實現(xiàn)

    本文詳細介紹了如何在Mybatis中使用typeHandler對特定字段進行加密處理,涵蓋了從引入依賴、配置Mybatis,到實現(xiàn)typeHandler繼承類和配置mapper層的詳細步驟,為需要在項目中實現(xiàn)字段加密的開發(fā)者提供了參考和借鑒
    2024-09-09
  • javascript最新2020經(jīng)典面試題

    javascript最新2020經(jīng)典面試題

    這篇文章主要介紹了javascript最新2020經(jīng)典面試題的相關(guān)內(nèi)容,有需要的朋友們可以學習下。
    2020-02-02
  • Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能

    Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能

    這篇文章主要給大家介紹了關(guān)于Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-12-12
  • Java的DataInputStream和DataOutputStream數(shù)據(jù)輸入輸出流

    Java的DataInputStream和DataOutputStream數(shù)據(jù)輸入輸出流

    這里我們來看一下Java的DataInputStream和DataOutputStream數(shù)據(jù)輸入輸出流的使用示例,兩個類分別繼承于FilterInputStream和FilterOutputStream:
    2016-06-06
  • Java線程讓步y(tǒng)ield用法實例分析

    Java線程讓步y(tǒng)ield用法實例分析

    這篇文章主要介紹了Java線程讓步y(tǒng)ield用法,結(jié)合實例形式分析了java中yield()方法的功能、原理及線程讓步操作的相關(guān)實現(xiàn)技巧,需要的朋友可以參考下
    2019-09-09

最新評論