Java并發(fā)容器ConcurrentLinkedQueue解析
ConcurrentLinkedQueue簡介
在單線程編程中常用的集合類,如ArrayList和HashMap等,但是這些類都不是線程安全的類。為了保證線程安全,可以使用Vector作為替代,但其通過使用synchronized獨占鎖在方法級別實現(xiàn)線程安全,從而使多線程執(zhí)行變?yōu)榇谢?,效率較低。另外,也可以使用Collections.synchronizedList(List<T> list)將ArrayList轉(zhuǎn)換為線程安全的,但仍然是通過synchronized修飾方法實現(xiàn)的,依然不是高效的方式。
針對隊列這種常用的數(shù)據(jù)結(jié)構(gòu),在解決線程安全問題時,Doug Lea大師提供了ConcurrentLinkedQueue,這是一個線程安全的隊列。根據(jù)類名可以看出,它基于鏈表實現(xiàn)隊列的數(shù)據(jù)結(jié)構(gòu)。
Node
Node節(jié)點是用于鏈表數(shù)據(jù)結(jié)構(gòu)中,構(gòu)成ConcurrentLinkedQueue的基本單元。節(jié)點包含了兩個字段:一個是item,用于存儲元素數(shù)據(jù);另一個是next,用于指向下一個節(jié)點,從而實現(xiàn)鏈表結(jié)構(gòu)。
在ConcurrentLinkedQueue中,由于要支持并發(fā)操作,因此使用了volatile關(guān)鍵字對節(jié)點的item和next字段進(jìn)行修飾。volatile關(guān)鍵字可以保證在多線程環(huán)境下的可見性,從而避免了出現(xiàn)臟讀等線程安全問題。
在ConcurrentLinkedQueue的構(gòu)造函數(shù)中,會初始化頭尾節(jié)點,同時將head和tail指向同一個初始節(jié)點。這個節(jié)點的item為空(null),表示隊列中還沒有任何元素。
通過不斷添加和刪除節(jié)點,就可以實現(xiàn)ConcurrentLinkedQueue,該隊列是線程安全的,由于使用了無鎖算法(CAS操作),因此具有較高的吞吐量。
操作Node的幾個CAS操作
在ConcurrentLinkedQueue中,對Node節(jié)點的CAS操作(關(guān)于CAS操作可以看這篇文章),有以下幾個方法:
- casItem(E cmp, E val): 用于比較并交換節(jié)點的數(shù)據(jù)域item。這個操作會比較節(jié)點的item域是否等于cmp,如果相等則將其替換為val。返回true表示交換成功,返回false表示交換失敗。
- lazySetNext(Node<E> val): 用于延遲設(shè)置節(jié)點的下一個節(jié)點指針next。這個操作會將節(jié)點的next字段設(shè)置為val,但不保證立即可見。即使在執(zhí)行這個操作之后,其他線程讀取到的節(jié)點的next值可能仍然是舊值。這種操作通常在性能上比強制可見性要好。
- casNext(Node<E> cmp, Node<E> val): 用于比較并交換節(jié)點的下一個節(jié)點指針next。這個操作會比較節(jié)點的next域是否等于cmp,如果相等則將其替換為val。返回true表示交換成功,返回false表示交換失敗。
//更改Node中的數(shù)據(jù)域item boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //更改Node中的指針域next void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } //更改Node中的指針域next boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
需要注意的是,這些方法中都是通過調(diào)用sun.misc.Unsafe類的相關(guān)方法來實現(xiàn)CAS操作。Unsafe類是Java底層提供的一個類,用于支持直接操作內(nèi)存和并發(fā)操作,它提供了一些原子性操作的方法,包括compareAndSwapObject()等方法。這些方法利用處理器指令集的CMPXCHG指令來實現(xiàn)原子性的比較和交換操作。
CAS操作在并發(fā)編程中非常重要,它可以避免使用鎖機制而實現(xiàn)線程安全,提高并發(fā)性能。然而,需要注意的是CAS操作并不是適用于所有情況,有時候可能會存在ABA問題等需要注意的情況。
offer方法
1. 單個線程offer:當(dāng)只有一個線程執(zhí)行offer操作時,它會將元素插入到隊列的末尾。由于沒有其他線程執(zhí)行poll操作,所以隊列的長度會不斷增長。
2. 多個線程offer:當(dāng)多個線程同時執(zhí)行offer操作時,它們會將元素插入到隊列的末尾。因為offer操作總是在隊列的末尾進(jìn)行,而poll操作總是在隊列的隊頭進(jìn)行,所以這兩類線程并不會相互影響。
3. 部分線程offer,部分線程poll:
- offer速度快于poll:如果offer操作的速度比poll操作快,那么隊列的長度會越來越長。因為offer節(jié)點總是在隊列的末尾插入,而poll節(jié)點總是在隊列的隊頭刪除,所以兩類線程之間沒有交集,可以看作是一個"單線程offer"的情況。
- offer速度慢于poll:如果poll操作的速度比offer操作快,那么隊列的長度會越來越短。此時,offer線程和poll線程會存在交集,即在某一時刻會發(fā)生同時操作的節(jié)點,這個時刻被稱為臨界點。在臨界點時,需要考慮在offer方法中的處理邏輯。
在多線程環(huán)境下,ConcurrentLinkedQueue的offer方法可以保證并發(fā)安全,無需額外的同步措施。它能夠高效地支持并發(fā)插入操作,并且遵循FIFO(先進(jìn)先出)特性。
import java.util.concurrent.ConcurrentLinkedQueue; public class main { public static void main(String[] args) { // 創(chuàng)建一個ConcurrentLinkedQueue對象 ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); // 單個線程offer Thread singleThreadOffer = new Thread(() -> { for (int i = 0; i < 5; i++) { queue.offer(i); System.out.println("單個線程offer元素:" + i); } }); // 多個線程offer Thread multipleThreadsOffer = new Thread(() -> { for (int i = 5; i < 10; i++) { queue.offer(i); System.out.println("多個線程offer元素:" + i); } }); // 部分線程offer,部分線程poll Thread mixedThreads = new Thread(() -> { for (int i = 0; i < 5; i++) { if (i % 2 == 0) { queue.offer(i); System.out.println("部分線程offer元素:" + i); } else { Integer element = queue.poll(); System.out.println("部分線程poll元素:" + element); } } }); // 啟動線程 singleThreadOffer.start(); multipleThreadsOffer.start(); mixedThreads.start(); // 等待線程執(zhí)行完畢 try { singleThreadOffer.join(); multipleThreadsOffer.join(); mixedThreads.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
上述代碼演示了三種場景:
- 單個線程執(zhí)行offer操作,將元素逐個插入隊列;
- 多個線程同時執(zhí)行offer操作,將元素逐個插入隊列;
- 部分線程執(zhí)行offer操作,部分線程執(zhí)行poll操作,實現(xiàn)元素的插入和移除。
在每個線程中,通過queue.offer(i)方法將元素插入隊列,使用queue.poll()方法從隊列移除元素。每個操作都會打印出相應(yīng)的信息以便觀察結(jié)果。
ConcurrentLinkedQueue的offer和poll方法可以安全地在多線程環(huán)境下使用,并且遵循FIFO(先進(jìn)先出)特性。
poll方法
- 在單線程下,poll操作會先判斷隊頭節(jié)點的數(shù)據(jù)是否為空,如果不為空,則直接返回數(shù)據(jù)并將該節(jié)點出隊;否則需要遍歷隊列尋找數(shù)據(jù)不為空的節(jié)點,并更新隊頭的指向。
- 在多線程情況下,需要注意處理多個線程同時poll和offer的情況,保證操作的正確性。同時,在判斷隊列是否為空的時候,不能僅僅依靠poll返回值是否為null,而應(yīng)該使用isEmpty方法進(jìn)行判斷。
import java.util.Queue; import java.util.concurrent.*; public class main { // 創(chuàng)建一個ConcurrentLinkedQueue對象 private static Queue<Integer> queue = new ConcurrentLinkedQueue<>(); public static void main(String[] args) throws InterruptedException { // 將數(shù)字放入隊列中 queue.offer(1); queue.offer(2); queue.offer(3); // 從隊列中取出元素并輸出 Integer value = queue.poll(); System.out.println("取出的值為:" + value); System.out.println("當(dāng)前隊列中的元素為:" + queue.toString()); // 創(chuàng)建一個線程池 ExecutorService executor = Executors.newCachedThreadPool(); // 循環(huán)10次,每次提交一個任務(wù)到線程池中 for (int i = 0; i < 10; i++) { executor.execute(() -> { // 將數(shù)字放入隊列中 queue.offer((int) (Math.random() * 100)); // 從隊列中取出元素并輸出 Integer result = queue.poll(); System.out.println(Thread.currentThread().getName() + " 取出的值為:" + result); System.out.println("當(dāng)前隊列中的元素為:" + queue.toString()); }); } // 關(guān)閉線程池 executor.shutdown(); // 等待線程池中的任務(wù)執(zhí)行完成 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } }
在這個綜合代碼示例中,首先在單個線程環(huán)境下將數(shù)字放入隊列中并取出元素,并輸出隊列的當(dāng)前狀態(tài)。接著,在多線程環(huán)境下,我們創(chuàng)建了一個線程池并提交了10個任務(wù),每個任務(wù)都會隨機生成一個數(shù)字放入隊列中,然后再從隊列中取出元素并輸出,最后輸出隊列的當(dāng)前狀態(tài)??梢钥吹剑趩蝹€線程和多個線程的情況下,使用ConcurrentLinkedQueue的poll方法都能夠保證隊列操作的線程安全性,并且在多線程環(huán)境下能夠提高程序的并發(fā)性能。
HOPS的設(shè)計
通過上面對offer和poll方法的分析,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時機為:
HOPS的設(shè)計通過延遲更新的方式,減少CAS操作的頻率,從而提升入隊操作的性能。具體來說,tail的更新被延遲至插入節(jié)點之后,只有當(dāng)tail指向的節(jié)點的下一個節(jié)點不為null時才會進(jìn)行真正的隊尾節(jié)點定位和更新操作。同樣地,head的更新也被延遲至刪除節(jié)點之后,只有當(dāng)head指向的節(jié)點的item域為null時才會進(jìn)行隊頭節(jié)點定位和更新操作。
這種延遲更新策略的設(shè)計意圖是為了減少CAS操作對性能的影響。如果每次都立即更新tail或head,那么大量的入隊或出隊操作都需要執(zhí)行CAS操作來更新tail或head,從而對性能產(chǎn)生較大的負(fù)擔(dān)。通過延遲更新,在一定程度上減少了CAS操作的頻率,提升了入隊操作的效率。
盡管延遲更新會增加在循環(huán)中定位隊尾節(jié)點的操作,但整體而言,讀取操作的性能要遠(yuǎn)遠(yuǎn)高于寫操作。因此,增加的在循環(huán)中定位尾節(jié)點的操作性能損耗相對較小。
總結(jié)來說,HOPS設(shè)計中延遲更新的策略通過減少CAS操作的頻率,提升了入隊操作的性能,同時在讀取操作性能與寫入操作性能的權(quán)衡中取得了較好的平衡。
擴展知識
tail和head是ConcurrentLinkedQueue隊列中兩個重要的指針。它們分別指向隊列中的尾節(jié)點和頭節(jié)點。
當(dāng)需要往隊列中插入元素時,我們需要使用tail指針指向的節(jié)點作為插入節(jié)點的前驅(qū)節(jié)點,并通過CAS(compare-and-swap)操作將插入節(jié)點加在其后面。如果插入節(jié)點成功地加入了隊列尾部,則需要使用CAS操作將tail指針指向新的隊尾節(jié)點。
當(dāng)需要從隊列中刪除元素時,我們需要使用head指針指向的節(jié)點作為要刪除的節(jié)點,并通過CAS操作將其指向下一個節(jié)點。如果刪除成功,則需要使用updateHead方法將head指針指向真正的隊頭節(jié)點。
需要注意的是,tail和head指針的更新會存在延遲,只有在特定條件下才會進(jìn)行更新。具體來說,當(dāng)tail指向的節(jié)點的下一個節(jié)點不為null時,會執(zhí)行定位隊列真正的隊尾節(jié)點的操作,并通過CAS操作完成tail的更新;當(dāng)head指向的節(jié)點的item域為null時,會執(zhí)行定位隊列真正的隊頭節(jié)點的操作,并通過updateHead方法完成head的更新。這種延遲更新策略可以有效地減少CAS操作的頻率,提高隊列的性能。
到此這篇關(guān)于Java并發(fā)容器ConcurrentLinkedQueue解析的文章就介紹到這了,更多相關(guān)Java的ConcurrentLinkedQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java處理圖片實現(xiàn)base64編碼轉(zhuǎn)換
這篇文章主要介紹了Java處理圖片實現(xiàn)base64編碼轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-02-02IntelliJ?IDEA?2023.1.4?無法刷新Maven項目模塊的問題及解決方法
這篇文章主要介紹了如何排查?IDEA?自身報錯問題,本文以IntelliJ?IDEA?2023.1.4無法刷新項目Maven模塊的問題為例,給大家詳細(xì)講解,需要的朋友可以參考下2023-08-08SpringBoot中@ConditionalOnProperty注解的使用方法詳解
這篇文章主要介紹了SpringBoot中@ConditionalOnProperty注解的使用方法詳解,在開發(fā)基于SpringBoot框架的項目時,會用到下面的條件注解,有時會有需要控制配置類是否生效或注入到Spring上下文中的場景,可以使用@ConditionalOnProperty注解來控制,需要的朋友可以參考下2024-01-01Springcloud Alibaba超詳細(xì)使用詳解
SpringCloudAlibaba是一款優(yōu)秀的微服務(wù)架構(gòu),在市面上有著廣泛的應(yīng)用,這篇文章介紹了SpringCloudAlibaba的一些基本使用,適合初學(xué)者,希望能夠給大家?guī)韼椭?/div> 2024-08-08不使用Math.random方法生成隨機數(shù)(隨機數(shù)生成器)
不調(diào)用Math.random方法產(chǎn)生自己的隨機數(shù),現(xiàn)代計算機運行速度很快,在主線程等待一定毫秒數(shù)時,其他線程就會執(zhí)行run方法中的while循環(huán),一般會執(zhí)行數(shù)十萬次2014-01-01最新評論