Java線程隊列LinkedBlockingQueue的使用
一、定義
LinkedBlockingQueue繼承自AbstractQueue,實現(xiàn)了BlockingQueue,Serializable接口。
- LinkedBlockingQueue是一個基于已鏈接節(jié)點的,范圍任意的blocking queue
- 此隊列按FIFO(先進先出)排序元素
- 新元素插入到隊列的尾部,并且隊列獲取操作會獲得位于隊列頭部的元素
- 鏈接隊列的吞吐量通常要高于基于數(shù)組的對列(ArrayBlockingQueue),但是在大多數(shù)并發(fā)應用程序中,其可預知的性能要低
- 可選的容量范圍構造方法參數(shù)作為防止隊列過度擴展的一種方法,如果未指定容量,則等于Integer.MAX_VALUE,除非插入節(jié)點會使隊列超出容量,否則每次插入后會動態(tài)地創(chuàng)建鏈接節(jié)點
二、比對分析
阻塞隊列大致可以分為這幾種:ArrayBlockingQueue,LinkedBlockingQueue,ConcurrentLinkedQueue,DelayQueue,LinkedTransferQueue,SynchronusQueue。
ArrayBlockingQueue--數(shù)組實現(xiàn)的有界隊列
會自動阻塞,根據(jù)調(diào)用api不同,有不同特性,當隊列容量不足時,有阻塞能力。
boolean add(E e):在容量不足時,拋出異常。
void put(E e):在容量不足時,阻塞等待。
boolean offer(E e):不阻塞,容量不足時返回false,當前新增數(shù)據(jù)操作放棄。
boolean offer(E e, long timeout, TimeUnit unit):容量不足時,阻塞times時長(單位為timeunit),如果在阻塞時長內(nèi),有容量空閑,新增數(shù)據(jù)返回true。如果阻塞時長范圍內(nèi),無容量空閑,放棄新增數(shù)據(jù),返回false。
LinkedBlockingQueue--鏈式隊列,隊列容量不足或為0時自動阻塞
void put(E e):自動阻塞,隊列容量滿后,自動阻塞。
E take():自動阻塞,隊列容量為0后,自動阻塞。
ConcurrentLinkedQueue--基礎鏈表同步隊列
boolean offer(E e):入隊。
E peek():查看queue中的首數(shù)據(jù)。
E poll():取出queue中的首數(shù)據(jù)。
DelayQueue--延時隊列
根據(jù)比較機制,實現(xiàn)自定義處理順序的隊列。常用于定時任務,如:定時關機。
int compareTo(Delayed o):比較大小,自動升序。
比較方法建議和getDelay方法配合完成。如果在DelayQueue是需要按時完成的計劃任務,必須配合getDelay方法完成。
long getDelay(TimeUnit unit):獲取計劃時長的方法,根據(jù)參數(shù)TimeUnit來決定,如何返回結果值。
LinkedTransferQueue--轉移隊列
boolean add(E e):隊列會保存數(shù)據(jù),不做阻塞等待。
void transfer(E e):是TransferQueue的特有方法。必須有消費者(take()方法調(diào)用者)。如果沒有任意線程消費數(shù)據(jù),transfer方法阻塞。一般用于處理及時消息。
SynchronousQueue--同步隊列,容量為0
是特殊的TransferQueue,必須先有消費線程等待,才能使用的隊列。
boolean add(E e):父類方法,無阻塞,若沒有消費線程阻塞等待數(shù)據(jù),則拋出異常。
put(E e):有阻塞,若沒有消費線程阻塞等待數(shù)據(jù),則阻塞。
三、源碼解讀
屬性
/** * 節(jié)點類,用于存儲數(shù)據(jù) */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 阻塞隊列的大小,默認為Integer.MAX_VALUE */ private final int capacity; /** 當前阻塞隊列中的元素個數(shù) */ private final AtomicInteger count = new AtomicInteger(); /** * 阻塞隊列的頭結點 */ transient Node<E> head; /** * 阻塞隊列的尾節(jié)點 */ private transient Node<E> last; /** 獲取并移除元素時使用的鎖,如take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** notEmpty條件對象,當隊列沒有數(shù)據(jù)時用于掛起執(zhí)行刪除的線程 */ private final Condition notEmpty = takeLock.newCondition(); /** 添加元素時使用的鎖如 put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** notFull條件對象,當隊列數(shù)據(jù)已滿時用于掛起執(zhí)行添加的線程 */ private final Condition notFull = putLock.newCondition();
從上面的屬性我們知道,每個添加到LinkedBlockingQueue隊列中的數(shù)據(jù)都將被封裝成Node節(jié)點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內(nèi)部分別使用了takeLock 和 putLock 對并發(fā)進行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。
這里如果不指定隊列的容量大小,也就是使用默認的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時候,有可能會內(nèi)存溢出,這點在使用前希望慎重考慮。
另外,LinkedBlockingQueue對每一個lock鎖都提供了一個Condition用來掛起和喚醒其他線程。
構造函數(shù)
public LinkedBlockingQueue() { // 默認大小為Integer.MAX_VALUE this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
默認的構造函數(shù)和最后一個構造函數(shù)創(chuàng)建的隊列大小都為Integer.MAX_VALUE,只有第二個構造函數(shù)用戶可以指定隊列的大小。第二個構造函數(shù)最后初始化了last和head節(jié)點,讓它們都指向了一個元素為null的節(jié)點。
最后一個構造函數(shù)使用了putLock來進行加鎖,但是這里并不是為了多線程的競爭而加鎖,只是為了放入的元素能立即對其他線程可見。
方法
同樣,LinkedBlockingQueue也有著和ArrayBlockingQueue一樣的方法,我們先來看看入隊列的方法。
入隊方法
LinkedBlockingQueue提供了多種入隊操作的實現(xiàn)來滿足不同情況下的需求,入隊操作有如下幾種:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)。
put(E e)
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 獲取鎖中斷 putLock.lockInterruptibly(); try { //判斷隊列是否已滿,如果已滿阻塞等待 while (count.get() == capacity) { notFull.await(); } // 把node放入隊列中 enqueue(node); c = count.getAndIncrement(); // 再次判斷隊列是否有可用空間,如果有喚醒下一個線程進行添加操作 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果隊列中有一條數(shù)據(jù),喚醒消費線程進行消費 if (c == 0) signalNotEmpty(); }
小結put方法來看,它總共做了以下情況的考慮:
- 隊列已滿,阻塞等待。
- 隊列未滿,創(chuàng)建一個node節(jié)點放入隊列中,如果放完以后隊列還有剩余空間,繼續(xù)喚醒下一個添加線程進行添加。如果放之前隊列中沒有元素,放完以后要喚醒消費線程進行消費。
很清晰明了是不是?
我們來看看該方法中用到的幾個其他方法,先來看看enqueue(Node node)方法:
private void enqueue(Node<E> node) { last = last.next = node; }
該方法可能有些同學看不太懂,我們用一張圖來看看往隊列里依次放入元素A和元素B,畢竟無圖無真相:
接下來我們看看signalNotEmpty,順帶著看signalNotFull方法。
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
為什么要這么寫?因為signal的時候要獲取到該signal對應的Condition對象的鎖才行。
offer(E e)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 隊列有可用空間,放入node節(jié)點,判斷放入元素后是否還有可用空間, // 如果有,喚醒下一個添加線程進行添加操作。 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
可以看到offer僅僅對put方法改動了一點點,當隊列沒有可用元素的時候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 等待超時時間nanos,超時時間到了返回false while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
出隊方法
入隊列的方法說完后,我們來說說出隊列的方法。LinkedBlockingQueue提供了多種出隊操作的實現(xiàn)來滿足不同情況下的需求,如下:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 隊列為空,阻塞等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // 隊列中還有元素,喚醒下一個消費線程進行消費 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 移除元素之前隊列是滿的,喚醒生產(chǎn)線程進行添加元素 if (c == capacity) signalNotFull(); return x; }
take方法看起來就是put方法的逆向操作,它總共做了以下情況的考慮:
- 隊列為空,阻塞等待。
- 隊列不為空,從隊首獲取并移除一個元素,如果消費后還有元素在隊列中,繼續(xù)喚醒下一個消費線程進行元素移除。如果放之前隊列是滿元素的情況,移除完后要喚醒生產(chǎn)線程進行添加元素。
我們來看看dequeue方法
可能有些童鞋鏈表算法不是很熟悉,我們可以結合注釋和圖來看就清晰很多了。
其實這個寫法看起來很繞,我們其實也可以這么寫:
private E dequeue() { // 獲取到head節(jié)點 Node<E> h = head; // 獲取到head節(jié)點指向的下一個節(jié)點,也就是節(jié)點A Node<E> first = h.next; // 獲取到下下個節(jié)點,也就是節(jié)點B Node<E> next = first.next; // head的next指向下下個節(jié)點,也就是圖中的B節(jié)點 h.next = next; // 得到節(jié)點A的值 E x = first.item; first.item = null; // help GC first.next = first; // help GC return x; }
poll()
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
poll方法去除了take方法中元素為空后阻塞等待這一步驟,這里也就不詳細說了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一樣,利用了Condition的awaitNanos方法來進行阻塞等待直至超時。這里就不列出來說了。
獲取元素方法
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
加鎖后,獲取到head節(jié)點的next節(jié)點,如果為空返回null,如果不為空,返回next節(jié)點的item值。
刪除元素方法
public boolean remove(Object o) { if (o == null) return false; // 兩個lock全部上鎖 fullyLock(); try { // 從head開始遍歷元素,直到最后一個元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 如果找到相等的元素,調(diào)用unlink方法刪除元素 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { // 兩個lock全部解鎖 fullyUnlock(); } } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
因為remove方法使用兩個鎖全部上鎖,所以其他操作都需要等待它完成,而該方法需要從head節(jié)點遍歷到尾節(jié)點,所以時間復雜度為O(n)。我們來看看unlink方法。
void unlink(Node<E> p, Node<E> trail) { // p的元素置為null p.item = null; // p的前一個節(jié)點的next指向p的next,也就是把p從鏈表中去除了 trail.next = p.next; // 如果last指向p,刪除p后讓last指向trail if (last == p) last = trail; // 如果刪除之前元素是滿的,刪除之后就有空間了,喚醒生產(chǎn)線程放入元素 if (count.getAndDecrement() == capacity) notFull.signal(); }
問題
看源碼的時候,我給自己拋出了一個問題。
- 為什么dequeue里的h.next不指向null,而指向h?
- 為什么unlink里沒有p.next = null或者p.next = p這樣的操作?這個疑問一直困擾著我,直到我看了迭代器的部分源碼后才豁然開朗,下面放出部分迭代器的源碼:
private Node<E> current; private Node<E> lastRet; private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } private Node<E> nextNode(Node<E> p) { for (;;) { // 解決了問題1 Node<E> s = p.next; if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } }
迭代器的遍歷分為兩步,第一步加雙鎖把元素放入臨時變量中,第二部遍歷臨時變量的元素。也就是說remove可能和迭代元素同時進行,很有可能remove的時候,有線程在進行迭代操作,而如果unlink中改變了p的next,很有可能在迭代的時候會造成錯誤,造成不一致問題。這個解決了問題2。
而問題1其實在nextNode方法中也能找到,為了正確遍歷,nextNode使用了 s == p的判斷,當下一個元素是自己本身時,返回head的下一個節(jié)點。
四、總結
LinkedBlockingQueue是一個阻塞隊列,內(nèi)部由兩個ReentrantLock來實現(xiàn)出入隊列的線程安全,由各自的Condition對象的await和signal來實現(xiàn)等待和喚醒功能。它和ArrayBlockingQueue的不同點在于:
- 隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對于后者而言,當添加速度大于移除速度時,在無界的情況下,可能會造成內(nèi)存溢出等問題。
- 數(shù)據(jù)存儲容器不同,ArrayBlockingQueue采用的是數(shù)組作為數(shù)據(jù)存儲容器,而LinkedBlockingQueue采用的則是以Node節(jié)點作為連接對象的鏈表。
- 由于ArrayBlockingQueue采用的是數(shù)組的存儲容器,因此在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的時,對于GC可能存在較大影響。
- 兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
到此這篇關于Java線程隊列LinkedBlockingQueue的使用的文章就介紹到這了,更多相關Java LinkedBlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
MyBatis Plus 將查詢結果封裝到指定實體的方法步驟
這篇文章主要介紹了MyBatis Plus 將查詢結果封裝到指定實體的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09Spring?Boot讀取配置文件內(nèi)容的3種方式(@Value、Environment和@ConfigurationP
工作中經(jīng)常會有一些參數(shù)需要配置,同時在代碼里面需要用到,所有就需要配置類讀取,然后在使用的時候注入該類進行獲取相關參數(shù),下面這篇文章主要給大家介紹了關于Spring?Boot讀取配置文件內(nèi)容的3種方式,需要的朋友可以參考下2023-01-01Seata AT模式TransactionHook被刪除探究
這篇文章主要為大家介紹了Seata AT模式TransactionHook被刪除探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11Java調(diào)用DOS實現(xiàn)定時關機的實例
Java調(diào)用DOS實現(xiàn)定時關機的實例,需要的朋友可以參考一下2013-04-04