Java并發(fā)容器ConcurrentLinkedQueue解析
ConcurrentLinkedQueue簡介
在單線程編程中常用的集合類,如ArrayList和HashMap等,但是這些類都不是線程安全的類。為了保證線程安全,可以使用Vector作為替代,但其通過使用synchronized獨(dú)占鎖在方法級別實(shí)現(xiàn)線程安全,從而使多線程執(zhí)行變?yōu)榇谢?,效率較低。另外,也可以使用Collections.synchronizedList(List<T> list)將ArrayList轉(zhuǎn)換為線程安全的,但仍然是通過synchronized修飾方法實(shí)現(xiàn)的,依然不是高效的方式。
針對隊(duì)列這種常用的數(shù)據(jù)結(jié)構(gòu),在解決線程安全問題時(shí),Doug Lea大師提供了ConcurrentLinkedQueue,這是一個(gè)線程安全的隊(duì)列。根據(jù)類名可以看出,它基于鏈表實(shí)現(xiàn)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)。
Node
Node節(jié)點(diǎn)是用于鏈表數(shù)據(jù)結(jié)構(gòu)中,構(gòu)成ConcurrentLinkedQueue的基本單元。節(jié)點(diǎn)包含了兩個(gè)字段:一個(gè)是item,用于存儲元素?cái)?shù)據(jù);另一個(gè)是next,用于指向下一個(gè)節(jié)點(diǎn),從而實(shí)現(xiàn)鏈表結(jié)構(gòu)。
在ConcurrentLinkedQueue中,由于要支持并發(fā)操作,因此使用了volatile關(guān)鍵字對節(jié)點(diǎn)的item和next字段進(jìn)行修飾。volatile關(guān)鍵字可以保證在多線程環(huán)境下的可見性,從而避免了出現(xiàn)臟讀等線程安全問題。
在ConcurrentLinkedQueue的構(gòu)造函數(shù)中,會初始化頭尾節(jié)點(diǎn),同時(shí)將head和tail指向同一個(gè)初始節(jié)點(diǎn)。這個(gè)節(jié)點(diǎn)的item為空(null),表示隊(duì)列中還沒有任何元素。
通過不斷添加和刪除節(jié)點(diǎn),就可以實(shí)現(xiàn)ConcurrentLinkedQueue,該隊(duì)列是線程安全的,由于使用了無鎖算法(CAS操作),因此具有較高的吞吐量。
操作Node的幾個(gè)CAS操作
在ConcurrentLinkedQueue中,對Node節(jié)點(diǎn)的CAS操作(關(guān)于CAS操作可以看這篇文章),有以下幾個(gè)方法:
- casItem(E cmp, E val): 用于比較并交換節(jié)點(diǎn)的數(shù)據(jù)域item。這個(gè)操作會比較節(jié)點(diǎn)的item域是否等于cmp,如果相等則將其替換為val。返回true表示交換成功,返回false表示交換失敗。
- lazySetNext(Node<E> val): 用于延遲設(shè)置節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)指針next。這個(gè)操作會將節(jié)點(diǎn)的next字段設(shè)置為val,但不保證立即可見。即使在執(zhí)行這個(gè)操作之后,其他線程讀取到的節(jié)點(diǎn)的next值可能仍然是舊值。這種操作通常在性能上比強(qiáng)制可見性要好。
- casNext(Node<E> cmp, Node<E> val): 用于比較并交換節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)指針next。這個(gè)操作會比較節(jié)點(diǎn)的next域是否等于cmp,如果相等則將其替換為val。返回true表示交換成功,返回false表示交換失敗。
//更改Node中的數(shù)據(jù)域item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指針域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指針域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}需要注意的是,這些方法中都是通過調(diào)用sun.misc.Unsafe類的相關(guān)方法來實(shí)現(xiàn)CAS操作。Unsafe類是Java底層提供的一個(gè)類,用于支持直接操作內(nèi)存和并發(fā)操作,它提供了一些原子性操作的方法,包括compareAndSwapObject()等方法。這些方法利用處理器指令集的CMPXCHG指令來實(shí)現(xiàn)原子性的比較和交換操作。
CAS操作在并發(fā)編程中非常重要,它可以避免使用鎖機(jī)制而實(shí)現(xiàn)線程安全,提高并發(fā)性能。然而,需要注意的是CAS操作并不是適用于所有情況,有時(shí)候可能會存在ABA問題等需要注意的情況。
offer方法
1. 單個(gè)線程offer:當(dāng)只有一個(gè)線程執(zhí)行offer操作時(shí),它會將元素插入到隊(duì)列的末尾。由于沒有其他線程執(zhí)行poll操作,所以隊(duì)列的長度會不斷增長。
2. 多個(gè)線程offer:當(dāng)多個(gè)線程同時(shí)執(zhí)行offer操作時(shí),它們會將元素插入到隊(duì)列的末尾。因?yàn)閛ffer操作總是在隊(duì)列的末尾進(jìn)行,而poll操作總是在隊(duì)列的隊(duì)頭進(jìn)行,所以這兩類線程并不會相互影響。
3. 部分線程offer,部分線程poll:
- offer速度快于poll:如果offer操作的速度比poll操作快,那么隊(duì)列的長度會越來越長。因?yàn)閛ffer節(jié)點(diǎn)總是在隊(duì)列的末尾插入,而poll節(jié)點(diǎn)總是在隊(duì)列的隊(duì)頭刪除,所以兩類線程之間沒有交集,可以看作是一個(gè)"單線程offer"的情況。
- offer速度慢于poll:如果poll操作的速度比offer操作快,那么隊(duì)列的長度會越來越短。此時(shí),offer線程和poll線程會存在交集,即在某一時(shí)刻會發(fā)生同時(shí)操作的節(jié)點(diǎn),這個(gè)時(shí)刻被稱為臨界點(diǎn)。在臨界點(diǎn)時(shí),需要考慮在offer方法中的處理邏輯。
在多線程環(huán)境下,ConcurrentLinkedQueue的offer方法可以保證并發(fā)安全,無需額外的同步措施。它能夠高效地支持并發(fā)插入操作,并且遵循FIFO(先進(jìn)先出)特性。
import java.util.concurrent.ConcurrentLinkedQueue;
public class main {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)ConcurrentLinkedQueue對象
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 單個(gè)線程offer
Thread singleThreadOffer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
queue.offer(i);
System.out.println("單個(gè)線程offer元素:" + i);
}
});
// 多個(gè)線程offer
Thread multipleThreadsOffer = new Thread(() -> {
for (int i = 5; i < 10; i++) {
queue.offer(i);
System.out.println("多個(gè)線程offer元素:" + i);
}
});
// 部分線程offer,部分線程poll
Thread mixedThreads = new Thread(() -> {
for (int i = 0; i < 5; i++) {
if (i % 2 == 0) {
queue.offer(i);
System.out.println("部分線程offer元素:" + i);
} else {
Integer element = queue.poll();
System.out.println("部分線程poll元素:" + element);
}
}
});
// 啟動線程
singleThreadOffer.start();
multipleThreadsOffer.start();
mixedThreads.start();
// 等待線程執(zhí)行完畢
try {
singleThreadOffer.join();
multipleThreadsOffer.join();
mixedThreads.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}上述代碼演示了三種場景:
- 單個(gè)線程執(zhí)行offer操作,將元素逐個(gè)插入隊(duì)列;
- 多個(gè)線程同時(shí)執(zhí)行offer操作,將元素逐個(gè)插入隊(duì)列;
- 部分線程執(zhí)行offer操作,部分線程執(zhí)行poll操作,實(shí)現(xiàn)元素的插入和移除。
在每個(gè)線程中,通過queue.offer(i)方法將元素插入隊(duì)列,使用queue.poll()方法從隊(duì)列移除元素。每個(gè)操作都會打印出相應(yīng)的信息以便觀察結(jié)果。
ConcurrentLinkedQueue的offer和poll方法可以安全地在多線程環(huán)境下使用,并且遵循FIFO(先進(jìn)先出)特性。
poll方法
- 在單線程下,poll操作會先判斷隊(duì)頭節(jié)點(diǎn)的數(shù)據(jù)是否為空,如果不為空,則直接返回?cái)?shù)據(jù)并將該節(jié)點(diǎn)出隊(duì);否則需要遍歷隊(duì)列尋找數(shù)據(jù)不為空的節(jié)點(diǎn),并更新隊(duì)頭的指向。
- 在多線程情況下,需要注意處理多個(gè)線程同時(shí)poll和offer的情況,保證操作的正確性。同時(shí),在判斷隊(duì)列是否為空的時(shí)候,不能僅僅依靠poll返回值是否為null,而應(yīng)該使用isEmpty方法進(jìn)行判斷。
import java.util.Queue;
import java.util.concurrent.*;
public class main {
// 創(chuàng)建一個(gè)ConcurrentLinkedQueue對象
private static Queue<Integer> queue = new ConcurrentLinkedQueue<>();
public static void main(String[] args) throws InterruptedException {
// 將數(shù)字放入隊(duì)列中
queue.offer(1);
queue.offer(2);
queue.offer(3);
// 從隊(duì)列中取出元素并輸出
Integer value = queue.poll();
System.out.println("取出的值為:" + value);
System.out.println("當(dāng)前隊(duì)列中的元素為:" + queue.toString());
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newCachedThreadPool();
// 循環(huán)10次,每次提交一個(gè)任務(wù)到線程池中
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
// 將數(shù)字放入隊(duì)列中
queue.offer((int) (Math.random() * 100));
// 從隊(duì)列中取出元素并輸出
Integer result = queue.poll();
System.out.println(Thread.currentThread().getName() + " 取出的值為:" + result);
System.out.println("當(dāng)前隊(duì)列中的元素為:" + queue.toString());
});
}
// 關(guān)閉線程池
executor.shutdown();
// 等待線程池中的任務(wù)執(zhí)行完成
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
}在這個(gè)綜合代碼示例中,首先在單個(gè)線程環(huán)境下將數(shù)字放入隊(duì)列中并取出元素,并輸出隊(duì)列的當(dāng)前狀態(tài)。接著,在多線程環(huán)境下,我們創(chuàng)建了一個(gè)線程池并提交了10個(gè)任務(wù),每個(gè)任務(wù)都會隨機(jī)生成一個(gè)數(shù)字放入隊(duì)列中,然后再從隊(duì)列中取出元素并輸出,最后輸出隊(duì)列的當(dāng)前狀態(tài)??梢钥吹剑趩蝹€(gè)線程和多個(gè)線程的情況下,使用ConcurrentLinkedQueue的poll方法都能夠保證隊(duì)列操作的線程安全性,并且在多線程環(huán)境下能夠提高程序的并發(fā)性能。
HOPS的設(shè)計(jì)
通過上面對offer和poll方法的分析,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時(shí)機(jī)為:
HOPS的設(shè)計(jì)通過延遲更新的方式,減少CAS操作的頻率,從而提升入隊(duì)操作的性能。具體來說,tail的更新被延遲至插入節(jié)點(diǎn)之后,只有當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為null時(shí)才會進(jìn)行真正的隊(duì)尾節(jié)點(diǎn)定位和更新操作。同樣地,head的更新也被延遲至刪除節(jié)點(diǎn)之后,只有當(dāng)head指向的節(jié)點(diǎn)的item域?yàn)閚ull時(shí)才會進(jìn)行隊(duì)頭節(jié)點(diǎn)定位和更新操作。
這種延遲更新策略的設(shè)計(jì)意圖是為了減少CAS操作對性能的影響。如果每次都立即更新tail或head,那么大量的入隊(duì)或出隊(duì)操作都需要執(zhí)行CAS操作來更新tail或head,從而對性能產(chǎn)生較大的負(fù)擔(dān)。通過延遲更新,在一定程度上減少了CAS操作的頻率,提升了入隊(duì)操作的效率。
盡管延遲更新會增加在循環(huán)中定位隊(duì)尾節(jié)點(diǎn)的操作,但整體而言,讀取操作的性能要遠(yuǎn)遠(yuǎn)高于寫操作。因此,增加的在循環(huán)中定位尾節(jié)點(diǎn)的操作性能損耗相對較小。
總結(jié)來說,HOPS設(shè)計(jì)中延遲更新的策略通過減少CAS操作的頻率,提升了入隊(duì)操作的性能,同時(shí)在讀取操作性能與寫入操作性能的權(quán)衡中取得了較好的平衡。
擴(kuò)展知識
tail和head是ConcurrentLinkedQueue隊(duì)列中兩個(gè)重要的指針。它們分別指向隊(duì)列中的尾節(jié)點(diǎn)和頭節(jié)點(diǎn)。
當(dāng)需要往隊(duì)列中插入元素時(shí),我們需要使用tail指針指向的節(jié)點(diǎn)作為插入節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),并通過CAS(compare-and-swap)操作將插入節(jié)點(diǎn)加在其后面。如果插入節(jié)點(diǎn)成功地加入了隊(duì)列尾部,則需要使用CAS操作將tail指針指向新的隊(duì)尾節(jié)點(diǎn)。
當(dāng)需要從隊(duì)列中刪除元素時(shí),我們需要使用head指針指向的節(jié)點(diǎn)作為要刪除的節(jié)點(diǎn),并通過CAS操作將其指向下一個(gè)節(jié)點(diǎn)。如果刪除成功,則需要使用updateHead方法將head指針指向真正的隊(duì)頭節(jié)點(diǎn)。
需要注意的是,tail和head指針的更新會存在延遲,只有在特定條件下才會進(jìn)行更新。具體來說,當(dāng)tail指向的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為null時(shí),會執(zhí)行定位隊(duì)列真正的隊(duì)尾節(jié)點(diǎn)的操作,并通過CAS操作完成tail的更新;當(dāng)head指向的節(jié)點(diǎn)的item域?yàn)閚ull時(shí),會執(zhí)行定位隊(duì)列真正的隊(duì)頭節(jié)點(diǎn)的操作,并通過updateHead方法完成head的更新。這種延遲更新策略可以有效地減少CAS操作的頻率,提高隊(duì)列的性能。
到此這篇關(guān)于Java并發(fā)容器ConcurrentLinkedQueue解析的文章就介紹到這了,更多相關(guān)Java的ConcurrentLinkedQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 實(shí)現(xiàn)圖片圓角處理、背景透明化
這篇文章主要介紹了java 實(shí)現(xiàn)圖片圓角處理、背景透明化,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
Java處理圖片實(shí)現(xiàn)base64編碼轉(zhuǎn)換
這篇文章主要介紹了Java處理圖片實(shí)現(xiàn)base64編碼轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
IntelliJ?IDEA?2023.1.4?無法刷新Maven項(xiàng)目模塊的問題及解決方法
這篇文章主要介紹了如何排查?IDEA?自身報(bào)錯問題,本文以IntelliJ?IDEA?2023.1.4無法刷新項(xiàng)目Maven模塊的問題為例,給大家詳細(xì)講解,需要的朋友可以參考下2023-08-08
SpringBoot中@ConditionalOnProperty注解的使用方法詳解
這篇文章主要介紹了SpringBoot中@ConditionalOnProperty注解的使用方法詳解,在開發(fā)基于SpringBoot框架的項(xiàng)目時(shí),會用到下面的條件注解,有時(shí)會有需要控制配置類是否生效或注入到Spring上下文中的場景,可以使用@ConditionalOnProperty注解來控制,需要的朋友可以參考下2024-01-01
Springcloud Alibaba超詳細(xì)使用詳解
SpringCloudAlibaba是一款優(yōu)秀的微服務(wù)架構(gòu),在市面上有著廣泛的應(yīng)用,這篇文章介紹了SpringCloudAlibaba的一些基本使用,適合初學(xué)者,希望能夠給大家?guī)韼椭?/div> 2024-08-08
不使用Math.random方法生成隨機(jī)數(shù)(隨機(jī)數(shù)生成器)
不調(diào)用Math.random方法產(chǎn)生自己的隨機(jī)數(shù),現(xiàn)代計(jì)算機(jī)運(yùn)行速度很快,在主線程等待一定毫秒數(shù)時(shí),其他線程就會執(zhí)行run方法中的while循環(huán),一般會執(zhí)行數(shù)十萬次2014-01-01最新評論

