亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

深入理解Java線程編程中的阻塞隊(duì)列容器

 更新時(shí)間:2015年12月07日 14:23:16   作者:方騰飛  
這篇文章主要介紹了Java線程編程中的阻塞隊(duì)列容器,介紹了JDK中所提供的一些基本的實(shí)現(xiàn)阻塞隊(duì)列的方法,需要的朋友可以參考下

1. 什么是阻塞隊(duì)列?

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡.?dāng)隊(duì)列滿(mǎn)時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。

阻塞隊(duì)列提供了四種處理方法:

2015127142052051.png (522×105)

拋出異常:是指當(dāng)阻塞隊(duì)列滿(mǎn)時(shí)候,再往隊(duì)列里插入元素,會(huì)拋出IllegalStateException("Queue full")異常。當(dāng)隊(duì)列為空時(shí),從隊(duì)列里獲取元素時(shí)會(huì)拋出NoSuchElementException異常 。
返回特殊值:插入方法會(huì)返回是否成功,成功則返回true。移除方法,則是從隊(duì)列里拿出一個(gè)元素,如果沒(méi)有則返回null
一直阻塞:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),消費(fèi)者線程試圖從隊(duì)列里take元素,隊(duì)列也會(huì)阻塞消費(fèi)者線程,直到隊(duì)列可用。
超時(shí)退出:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)一定的時(shí)間,生產(chǎn)者線程就會(huì)退出。
2. Java里的阻塞隊(duì)列

JDK7提供了7個(gè)阻塞隊(duì)列。分別是

  1. ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
  2. LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
  3. PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
  4. DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。
  5. SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
  6. LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
  7. LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下不保證訪問(wèn)者公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)隊(duì)列可用時(shí),可以按照阻塞的先后順序訪問(wèn)隊(duì)列,即先阻塞的生產(chǎn)者線程,可以先往隊(duì)列里插入元素,先阻塞的消費(fèi)者線程,可以先從隊(duì)列里獲取元素。通常情況下為了保證公平性會(huì)降低吞吐量。我們可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

訪問(wèn)者的公平性是使用可重入鎖實(shí)現(xiàn)的,代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。

PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界隊(duì)列。默認(rèn)情況下元素采取自然順序排列,也可以通過(guò)比較器comparator來(lái)指定元素的排序規(guī)則。元素按照升序排列。

DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿(mǎn)時(shí)才能從隊(duì)列中提取元素。我們可以將DelayQueue運(yùn)用在以下應(yīng)用場(chǎng)景:

緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢(xún)DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。
定時(shí)任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,從比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
隊(duì)列中的Delayed必須實(shí)現(xiàn)compareTo來(lái)指定元素的順序。比如讓延時(shí)時(shí)間最長(zhǎng)的放在隊(duì)列的末尾。實(shí)現(xiàn)代碼如下:

public int compareTo(Delayed other) {
      if (other == this) // compare zero ONLY if same object
        return 0;
      if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask x = (ScheduledFutureTask)other;
        long diff = time - x.time;
        if (diff < 0)
          return -1;
        else if (diff > 0)
          return 1;
  else if (sequenceNumber < x.sequenceNumber)
          return -1;
        else
          return 1;
      }
      long d = (getDelay(TimeUnit.NANOSECONDS) -
           other.getDelay(TimeUnit.NANOSECONDS));
      return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

3.如何實(shí)現(xiàn)Delayed接口

我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類(lèi)。這個(gè)類(lèi)實(shí)現(xiàn)了Delayed接口。首先:在對(duì)象創(chuàng)建的時(shí)候,使用time記錄前對(duì)象什么時(shí)候可以使用,代碼如下:


ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      super(r, result);
      this.time = ns;
      this.period = period;
      this.sequenceNumber = sequencer.getAndIncrement();
}

然后使用getDelay可以查詢(xún)當(dāng)前元素還需要延時(shí)多久,代碼如下:

public long getDelay(TimeUnit unit) {
      return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

通過(guò)構(gòu)造函數(shù)可以看出延遲時(shí)間參數(shù)ns的單位是納秒,自己設(shè)計(jì)的時(shí)候最好使用納秒,因?yàn)間etDelay時(shí)可以指定任意單位,一旦以納秒作為單位,而延時(shí)的時(shí)間又精確不到納秒就麻煩了。使用時(shí)請(qǐng)注意當(dāng)time小于當(dāng)前時(shí)間時(shí),getDelay會(huì)返回負(fù)數(shù)。

4.如何實(shí)現(xiàn)延時(shí)隊(duì)列

延時(shí)隊(duì)列的實(shí)現(xiàn)很簡(jiǎn)單,當(dāng)消費(fèi)者從隊(duì)列里獲取元素時(shí),如果元素沒(méi)有達(dá)到延時(shí)時(shí)間,就阻塞當(dāng)前線程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
          if (delay <= 0)
            return q.poll();
          else if (leader != null)
            available.await();

SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。SynchronousQueue可以看成是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊(duì)列本身并不存儲(chǔ)任何元素,非常適合于傳遞性場(chǎng)景,比如在一個(gè)線程中使用的數(shù)據(jù),傳遞給另外一個(gè)線程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列。相對(duì)于其他阻塞隊(duì)列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法時(shí)),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者。如果沒(méi)有消費(fèi)者在等待接收元素,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。transfer方法的關(guān)鍵代碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當(dāng)前元素的s節(jié)點(diǎn)作為tail節(jié)點(diǎn)。第二行代碼是讓CPU自旋等待消費(fèi)者消費(fèi)元素。因?yàn)樽孕龝?huì)消耗CPU,所以自旋一定的次數(shù)后使用Thread.yield()方法來(lái)暫停當(dāng)前正在執(zhí)行的線程,并執(zhí)行其他線程。

tryTransfer方法。則是用來(lái)試探下生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒(méi)有消費(fèi)者等待接收元素,則返回false。和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回。而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。

對(duì)于帶有時(shí)間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產(chǎn)者傳入的元素直接傳給消費(fèi)者,但是如果沒(méi)有消費(fèi)者消費(fèi)該元素則等待指定的時(shí)間再返回,如果超時(shí)還沒(méi)消費(fèi)元素,則返回false,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素,則返回true。

LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。相比其他的阻塞隊(duì)列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊(duì)列的第一個(gè)元素。以Last單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst,不知道是不是Jdk的bug,使用時(shí)還是用帶有First和Last后綴的方法更清楚。

在初始化LinkedBlockingDeque時(shí)可以設(shè)置容量防止其過(guò)渡膨脹。另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。

5.阻塞隊(duì)列的實(shí)現(xiàn)原理
本文以ArrayBlockingQueue為例,其他阻塞隊(duì)列實(shí)現(xiàn)原理可能和ArrayBlockingQueue有一些差別,但是大體思路應(yīng)該類(lèi)似,有興趣的朋友可自行查看其他阻塞隊(duì)列的實(shí)現(xiàn)源碼。

  首先看一下ArrayBlockingQueue類(lèi)中的幾個(gè)成員變量:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 
private static final long serialVersionUID = -817911632652898426L;
 
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
 
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
 
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}

   可以看出,ArrayBlockingQueue中用來(lái)存儲(chǔ)元素的實(shí)際上是一個(gè)數(shù)組,takeIndex和putIndex分別表示隊(duì)首元素和隊(duì)尾元素的下標(biāo),count表示隊(duì)列中元素的個(gè)數(shù)。

  lock是一個(gè)可重入鎖,notEmpty和notFull是等待條件。

  下面看一下ArrayBlockingQueue的構(gòu)造器,構(gòu)造器有三個(gè)重載版本:

public ArrayBlockingQueue(int capacity) {
}
public ArrayBlockingQueue(int capacity, boolean fair) {
 
}
public ArrayBlockingQueue(int capacity, boolean fair,
             Collection<? extends E> c) {
}

   第一個(gè)構(gòu)造器只有一個(gè)參數(shù)用來(lái)指定容量,第二個(gè)構(gòu)造器可以指定容量和公平性,第三個(gè)構(gòu)造器可以指定容量、公平性以及用另外一個(gè)集合進(jìn)行初始化。

  然后看它的兩個(gè)關(guān)鍵方法的實(shí)現(xiàn):put()和take():

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  final E[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == items.length)
        notFull.await();
    } catch (InterruptedException ie) {
      notFull.signal(); // propagate to non-interrupted thread
      throw ie;
    }
    insert(e);
  } finally {
    lock.unlock();
  }
}

   從put方法的實(shí)現(xiàn)可以看出,它先獲取了鎖,并且獲取的是可中斷鎖,然后判斷當(dāng)前元素個(gè)數(shù)是否等于數(shù)組的長(zhǎng)度,如果相等,則調(diào)用notFull.await()進(jìn)行等待,如果捕獲到中斷異常,則喚醒線程并拋出異常。

  當(dāng)被其他線程喚醒時(shí),通過(guò)insert(e)方法插入元素,最后解鎖。

  我們看一下insert方法的實(shí)現(xiàn):

private void insert(E x) {
  items[putIndex] = x;
  putIndex = inc(putIndex);
  ++count;
  notEmpty.signal();
}

   它是一個(gè)private方法,插入成功后,通過(guò)notEmpty喚醒正在等待取元素的線程。

  下面是take()方法的實(shí)現(xiàn):

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == 0)
        notEmpty.await();
    } catch (InterruptedException ie) {
      notEmpty.signal(); // propagate to non-interrupted thread
      throw ie;
    }
    E x = extract();
    return x;
  } finally {
    lock.unlock();
  }
}


   跟put方法實(shí)現(xiàn)很類(lèi)似,只不過(guò)put方法等待的是notFull信號(hào),而take方法等待的是notEmpty信號(hào)。在take方法中,如果可以取元素,則通過(guò)extract方法取得元素,下面是extract方法的實(shí)現(xiàn):


private E extract() {
  final E[] items = this.items;
  E x = items[takeIndex];
  items[takeIndex] = null;
  takeIndex = inc(takeIndex);
  --count;
  notFull.signal();
  return x;
}

   跟insert方法也很類(lèi)似。

  其實(shí)從這里大家應(yīng)該明白了阻塞隊(duì)列的實(shí)現(xiàn)原理,事實(shí)它和我們用Object.wait()、Object.notify()和非阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者的思路類(lèi)似,只不過(guò)它把這些工作一起集成到了阻塞隊(duì)列中實(shí)現(xiàn)。

相關(guān)文章

最新評(píng)論