Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解
一、ConcurrentLinkedQueue介紹
并編程中,一般需要用到安全的隊列,如果要自己實現(xiàn)安全隊列,可以使用2種方式:
方式1:加鎖,這種實現(xiàn)方式就是我們常說的阻塞隊列。
方式2:使用循環(huán)CAS算法實現(xiàn),這種方式實現(xiàn)隊列稱之為非阻塞隊列。
從點到面, 下面我們來看下非阻塞隊列經(jīng)典實現(xiàn)類:ConcurrentLinkedQueue (JDK1.8版)
ConcurrentLinkedQueue 是一個基于鏈接節(jié)點的無界線程安全的隊列。當我們添加一個元素的時候,它會添加到隊列的尾部,當我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法來實現(xiàn),用CAS實現(xiàn)了非阻塞的線程安全隊列。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當?shù)倪x擇。此隊列不允許使用 null 元素,因為移除元素時實際是將節(jié)點中item置為null,如果元素本身為null,則跟刪除有沖突
我們首先看一下ConcurrentLinkedQueue的類圖結(jié)構(gòu)先,好有一個內(nèi)部邏輯有一個大概的印象,如下圖所示:

主要屬性head節(jié)點,tail節(jié)點
// 鏈表頭節(jié)點 private transient volatile Node<E> head; // 鏈表尾節(jié)點 private transient volatile Node<E> tail;
主要內(nèi)部類Node
類Node在static方法里獲取到item和next的內(nèi)存偏移量,之后通過casItem和casNext更改item值和next節(jié)點
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
//將item存放在本節(jié)點的itemOffset偏移量位置的內(nèi)存里
UNSAFE.putObject(this, itemOffset, item);//設置this對象的itemoffset位置
}
//更新item值
boolean casItem(E cmp, E val) {
//this對象的itemoffset位置存放的值如果和期望值cmp相等,則替換為val
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
//this對象的nextOffset位置存入val
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更新next節(jié)點值
boolean casNext(Node<E> cmp, Node<E> val) {
//this對象的nextOffset位置存放的值如果和期望值cmp相等,則替換為val
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//當前節(jié)點存放的item的內(nèi)存偏移量
private static final long itemOffset;
//當前節(jié)點的next節(jié)點的內(nèi)存偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
concurrentlinkedqueue同樣在static方法里獲取到head和tail的內(nèi)存偏移量:之后通過casHead和casTail更改head節(jié)點和tail節(jié)點值
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
二、構(gòu)造方法
- 無參構(gòu)造函數(shù),head=tail=new Node<E>(null)=空節(jié)點(里面無item值)
- 集合構(gòu)造函數(shù)(集合中每個元素不能為null):就是將集合中的元素挨個鏈起來
//無參構(gòu)造函數(shù),head=tail=new Node<E>(null)=空節(jié)點
//初始一個為空的ConcurrentLinkedQueue,此時head和tail都指向一個item為null的節(jié)點
public ConcurrentLinkedQueue() {
// 初始化頭尾節(jié)點
head = tail = new Node<E>(null);
}
//集合構(gòu)造函數(shù):就是將集合中的元素挨個鏈起來
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);//可以理解為一種懶加載, 將t的next值設置為newNode
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
//putObjectVolatile的內(nèi)存非立即可見版本,
//寫后結(jié)果并不會被其他線程看到,通常是幾納秒后被其他線程看到,這個時間比較短,所以代價可以接收
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
三、入隊
獲取到當前尾節(jié)點p=tail:
- 如果p.next=null,代表是真正的尾節(jié)點,將新節(jié)點鏈入p.next=newNode。此時檢查tail是否還是p,如果不是p了,此時更新tail為最新的newNode(只有在tail節(jié)點后面tail.next成功添加的元素才不需要更新tail,其實更新不更新tail是交替的,即每添加倆次更新一次tail)。
- 如果p.next=p,此時其實是p.next==p==null,此時代表p被刪除了,此時需要從新的tail節(jié)點檢查,如果此時tail節(jié)點還是原來的tail(原來的tail在p前面,肯定也被刪除了),那就只能從head節(jié)點開始遍歷了
- 如果p.next!=null,代表有別的線程搶先添加元素了,此時需要繼續(xù)p=p.next遍歷獲取是null的節(jié)點(此時需要如果tail變了就使用新的tail往后遍歷)
public boolean offer (E e){
//先檢查元素是否為null,是null則拋出異常 不是null,則構(gòu)造新節(jié)點準備入隊
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
//初始p指針和t指針都指向尾節(jié)點,p指針用來向隊列后面推移,t指針用來判斷尾節(jié)點是否改變
Node<E> t = tail, p = t;
for (; ; ) {
Node<E> q = p.next;
if (q == null) {//p.next為null,則代表p為尾節(jié)點,則將p.next指向新節(jié)點
// p is last node
if (p.casNext(null, newNode)) {
/**
* 如果p!=t,即p向后推移了,t沒動,則此時同時將tail更新
* 不符合條件不更新tail,這里可以看出并不是每入隊一個節(jié)點都會更新tail的
* 而此時真正的尾節(jié)點其實是newNode了,所以tail不一定是真正的尾節(jié)點,
* tail的更新具有滯后性,這樣設計提高了入隊的效率,不用每入隊一個,更新一次
*尾節(jié)點
*/
if (p != t)
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
} else if (p == q)
/**
* 如果p.next和p相等,這種情況是出隊時的一種哨兵節(jié)點代表已被遺棄刪除,
* 那就是有線程在一直刪除節(jié)點,刪除到了p.next 那此時如果有線程已經(jīng)更新了tail,那就從p指向tail再開始繼續(xù)像后推移
* 如果始終沒有線程更新tail,則p指針從head開始向后推移
*
* p從head開始推移的原因:tail沒有更新,以前的tail肯定在哨兵節(jié)點的前面(因為此循環(huán)是從tail向后推移到哨兵節(jié)點的),
* 而head節(jié)點一定在哨兵節(jié)點的后面(出隊時只有更新了head節(jié)點,才會把前面部分的某個節(jié)點置為哨兵節(jié)點)
* 此時其實是一種tail在head之前,但實際上tail已經(jīng)無用了,哨兵之前的節(jié)點都無用了,
* 等著其他線程入隊時更新尾節(jié)點tail,此時的tail才有用所以從head開始,從head開始可以找到任何節(jié)點
*
*/
p = (t != (t = tail)) ? t : head;
else
/**
* p.next和p不相等時,此時p應該向后推移到p.next,即p=p.next,
* 如果next一直不為null一直定位不到尾節(jié)點,會一直next,
* 但是中間會優(yōu)先判斷tail是否已更新,如果tail已更新則p直接從tail向后推移即可。就沒必要一直next了。
*/
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
四、出隊
poll出隊:
獲取到當前頭節(jié)點p=head:如果成功設置了item為null,即p.catItem(item,null),
如果此時被其他線程搶走消費了,此時需要p=p.next,向后繼續(xù)爭搶消費,直到成功執(zhí)行p.catItem(item,null),此時檢查p是不是head節(jié)點,如果不是更新p.next為頭結(jié)點
public E poll() {
restartFromHead:
for (;;) {
// p節(jié)點表示首節(jié)點,即需要出隊的節(jié)點
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果p節(jié)點的元素不為null,則通過CAS來設置p節(jié)點引用的元素為null,如果成功則返回p節(jié)點的元素
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 如果p != h,則更新head
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果頭節(jié)點的元素為空或頭節(jié)點發(fā)生了變化,這說明頭節(jié)點已經(jīng)被另外一個線程修改了。
// 那么獲取p節(jié)點的下一個節(jié)點,如果p節(jié)點的下一節(jié)點為null,則表明隊列已經(jīng)空了
else if ((q = p.next) == null) {
// 更新頭結(jié)點
updateHead(h, p);
return null;
}
// p == q,則使用新的head重新開始
else if (p == q)
continue restartFromHead;
// 如果下一個元素不為空,則將頭節(jié)點的下一個節(jié)點設置成頭節(jié)點
else
p = q;
}
}
}
五、總結(jié)
offer:
找到尾節(jié)點,將新節(jié)點鏈入到尾節(jié)點后面,tail.next=newNode,
由于多線程操作,所以拿到p=tail后cas操作執(zhí)行p.next=newNode可能由于被其他線程搶去而執(zhí)行不成功,此時需要p=p.next向后遍歷,直到找到p.next=null的目標節(jié)點。繼續(xù)嘗試向其后面添加元素,添加成功后檢查p是否是tail,如果不是tail,則更新tail=p,添加不成功繼續(xù)向后next遍歷
poll:
獲取到當前頭節(jié)點p=head:如果成功設置了item為null,即p.catItem(item,null),
如果此時被其他線程搶走消費了,此時需要p=p.next,向后繼續(xù)爭搶消費,直到成功執(zhí)行p.catItem(item,null),此時檢查p是不是head節(jié)點,如果不是更新頭結(jié)點head=p.next(因為p已經(jīng)刪除了)
更新tail和head:
不是每次添加都更新tail,而是間隔一次更新一次(head也是一樣道理):第一個搶到的線程拿到tail執(zhí)行成功tail.next=newNode1此時不更新tail,那么第二個線程再執(zhí)行成功添加p.next=newNode2會判斷出p是newNode1而不是tail,所以就更新tail為newNode2。
tail節(jié)點不總是最后一個,head節(jié)點不總是第一個設計初衷:
讓tail節(jié)點永遠作為隊列的尾節(jié)點,這樣實現(xiàn)代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點就是每次都需要使用循環(huán)CAS更新tail節(jié)點。如果能減少CAS更新tail節(jié)點的次數(shù),就能提高入隊的效率。
在JDK 1.7的實現(xiàn)中,doug lea使用hops變量來控制并減少tail節(jié)點的更新頻率,并不是每次節(jié)點入隊后都將 tail節(jié)點更新成尾節(jié)點,而是當tail節(jié)點和尾節(jié)點的距離大于等于常量HOPS的值(默認等于1)時才更新tail節(jié)點,tail和尾節(jié)點的距離越長使用CAS更新tail節(jié)點的次數(shù)就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節(jié)點的時間就越長,因為循環(huán)體需要多循環(huán)一次來定位出尾節(jié)點,但是這樣仍然能提高入隊的效率,因為從本質(zhì)上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠遠大于讀操作,所以入隊效率會有所提升。
在JDK 1.8的實現(xiàn)中,tail的更新時機是通過p和t是否相等來判斷的,其實現(xiàn)結(jié)果和JDK 1.7相同,即當tail節(jié)點和尾節(jié)點的距離大于等于1時,更新tail。
到此這篇關(guān)于Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解的文章就介紹到這了,更多相關(guān)Java ConcurrentLinkedQueue源碼內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中main函數(shù)的String[]?args用法舉例詳解
這篇文章主要給大家介紹了關(guān)于Java中main函數(shù)的String[]?args用法的相關(guān)資料,JAVA類中main函數(shù)的參數(shù)String[]?args指的是運行時給main函數(shù)傳遞的參數(shù),文中通過圖文以及代碼介紹的非常詳細,需要的朋友可以參考下2023-12-12
hibernate通過session實現(xiàn)增刪改查操作實例解析
這篇文章主要介紹了hibernate通過session實現(xiàn)增刪改查操作實例解析,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12
SpringBoot如何切換成其它的嵌入式Servlet容器(Jetty和Undertow)
這篇文章主要介紹了SpringBoot如何切換成其它的嵌入式Servlet容器(Jetty和Undertow),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07
JAVA Netty實現(xiàn)聊天室+私聊功能的示例代碼
這篇文章主要介紹了JAVA Netty實現(xiàn)聊天室+私聊功能的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08

