Java并發(fā)編程ArrayBlockingQueue的使用
一、ArrayBlockingQueue概述
ArrayBlockingQueue
是一個(gè)基于數(shù)組的有界阻塞隊(duì)列。它在創(chuàng)建時(shí)需要指定隊(duì)列的大小,并且這個(gè)大小在之后是不能改變的。隊(duì)列中的元素按照FIFO(先進(jìn)先出)的原則進(jìn)行排序。ArrayBlockingQueue
是線程安全的,可以在多線程環(huán)境下安全地使用。
二、內(nèi)部機(jī)制
2.1. 數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue
內(nèi)部使用一個(gè)循環(huán)數(shù)組作為存儲(chǔ)結(jié)構(gòu)。它有兩個(gè)關(guān)鍵索引:takeIndex
和putIndex
,分別用于從隊(duì)列中取出元素和向隊(duì)列中添加元素。當(dāng)添加元素時(shí),putIndex
會(huì)遞增;當(dāng)取出元素時(shí),takeIndex
會(huì)遞增。當(dāng)索引達(dá)到數(shù)組的末尾時(shí),它們會(huì)回到數(shù)組的開頭,形成一個(gè)循環(huán)。
2.2. 鎖和條件變量
為了保證線程安全,ArrayBlockingQueue
使用了一個(gè)重入鎖(ReentrantLock
)以及與之關(guān)聯(lián)的條件變量(Condition
)。鎖用于保護(hù)隊(duì)列的狀態(tài),而條件變量用于在隊(duì)列為空或滿時(shí)等待和通知線程。具體來說,ArrayBlockingQueue
內(nèi)部有兩個(gè)條件變量:notEmpty
和notFull
。當(dāng)隊(duì)列滿時(shí),生產(chǎn)者線程會(huì)等待在notFull
條件變量上;當(dāng)隊(duì)列空時(shí),消費(fèi)者線程會(huì)等待在notEmpty
條件變量上。
2.3. 入隊(duì)和出隊(duì)操作
- 入隊(duì)操作(put):當(dāng)調(diào)用
put
方法向隊(duì)列中添加元素時(shí),如果隊(duì)列已滿,生產(chǎn)者線程會(huì)被阻塞,直到隊(duì)列中有空閑位置。一旦有空閑位置,生產(chǎn)者線程會(huì)將元素添加到隊(duì)列中,并通知可能在等待的消費(fèi)者線程。 - 出隊(duì)操作(take):當(dāng)調(diào)用
take
方法從隊(duì)列中取出元素時(shí),如果隊(duì)列為空,消費(fèi)者線程會(huì)被阻塞,直到隊(duì)列中有元素可供消費(fèi)。一旦有元素可供消費(fèi),消費(fèi)者線程會(huì)從隊(duì)列中取出元素,并通知可能在等待的生產(chǎn)者線程。
三、使用場(chǎng)景
- 生產(chǎn)者-消費(fèi)者模式:
ArrayBlockingQueue
非常適合實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式。生產(chǎn)者線程將元素添加到隊(duì)列中,消費(fèi)者線程從隊(duì)列中取出元素進(jìn)行處理。通過阻塞隊(duì)列,可以很好地協(xié)調(diào)生產(chǎn)者和消費(fèi)者之間的速率差異,避免資源的浪費(fèi)。 - 限流:由于
ArrayBlockingQueue
是一個(gè)有界隊(duì)列,它可以用于實(shí)現(xiàn)限流功能。當(dāng)隊(duì)列已滿時(shí),新的請(qǐng)求會(huì)被阻塞或拒絕,從而保護(hù)系統(tǒng)免受過多的請(qǐng)求沖擊。 - 任務(wù)調(diào)度:在并發(fā)編程中,
ArrayBlockingQueue
可以用作任務(wù)調(diào)度器的一部分。將任務(wù)作為元素添加到隊(duì)列中,然后由工作線程從隊(duì)列中取出任務(wù)進(jìn)行處理。這種方式可以實(shí)現(xiàn)任務(wù)的異步執(zhí)行和資源的有效利用。
四、最佳實(shí)踐
- 合理設(shè)置隊(duì)列大小:在使用
ArrayBlockingQueue
時(shí),應(yīng)根據(jù)實(shí)際需求合理設(shè)置隊(duì)列的大小。過小的隊(duì)列可能導(dǎo)致頻繁的阻塞和上下文切換;過大的隊(duì)列可能導(dǎo)致內(nèi)存浪費(fèi)和長(zhǎng)時(shí)間的等待。 - 避免在隊(duì)列中存儲(chǔ)大量數(shù)據(jù):由于
ArrayBlockingQueue
是基于數(shù)組的實(shí)現(xiàn),每個(gè)元素都會(huì)占用一定的內(nèi)存空間。因此,應(yīng)避免在隊(duì)列中存儲(chǔ)大量數(shù)據(jù),以減少內(nèi)存消耗和垃圾回收的壓力??梢詫?shù)據(jù)拆分成較小的單元進(jìn)行傳輸和處理。 - 注意線程安全:雖然
ArrayBlockingQueue
本身是線程安全的,但在使用過程中仍需注意線程安全的問題。例如,在多個(gè)線程同時(shí)訪問隊(duì)列時(shí),應(yīng)確保對(duì)隊(duì)列的訪問是原子的,以避免競(jìng)態(tài)條件和數(shù)據(jù)不一致的問題。 - 優(yōu)雅地處理中斷:當(dāng)線程在等待從隊(duì)列中取出元素或向隊(duì)列中添加元素時(shí),可能會(huì)被中斷。在編寫代碼時(shí),應(yīng)優(yōu)雅地處理這些中斷情況,例如通過捕獲
InterruptedException
并適當(dāng)?shù)仨憫?yīng)中斷請(qǐng)求。 - 使用try-with-resources語句:在使用
ArrayBlockingQueue
的迭代器時(shí),建議使用try-with-resources語句來自動(dòng)關(guān)閉迭代器。這樣可以確保在迭代過程中及時(shí)釋放資源,避免資源泄漏的問題。
五、ArrayBlockingQueue實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者
下面是一個(gè)使用ArrayBlockingQueue
實(shí)現(xiàn)的稍微復(fù)雜的生產(chǎn)者-消費(fèi)者示例。代碼中模擬一個(gè)生產(chǎn)者線程生產(chǎn)數(shù)據(jù),多個(gè)消費(fèi)者線程消費(fèi)數(shù)據(jù)的場(chǎng)景,并且消費(fèi)者在處理完數(shù)據(jù)后會(huì)將結(jié)果存回另一個(gè)阻塞隊(duì)列中以供后續(xù)處理。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ProducerConsumerWithArrayBlockingQueue { public static void main(String[] args) { // 創(chuàng)建一個(gè)容量為10的ArrayBlockingQueue作為生產(chǎn)者和消費(fèi)者的共享隊(duì)列 BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 創(chuàng)建一個(gè)容量為5的ArrayBlockingQueue用于存儲(chǔ)消費(fèi)者的處理結(jié)果 BlockingQueue<Integer> resultQueue = new ArrayBlockingQueue<>(5); // 創(chuàng)建一個(gè)AtomicInteger作為數(shù)據(jù)生成的計(jì)數(shù)器 AtomicInteger counter = new AtomicInteger(); // 創(chuàng)建一個(gè)生產(chǎn)者線程 Thread producer = new Thread(() -> { try { for (int i = 0; i < 20; i++) { int item = counter.incrementAndGet(); System.out.println("生產(chǎn)者生產(chǎn)數(shù)據(jù):" + item); // 將數(shù)據(jù)放入隊(duì)列中 queue.put(item); // 稍微延遲一下,模擬生產(chǎn)數(shù)據(jù)的時(shí)間消耗 Thread.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 創(chuàng)建一個(gè)固定線程池的ExecutorService用于執(zhí)行消費(fèi)者任務(wù) ExecutorService executorService = Executors.newFixedThreadPool(3); // 提交3個(gè)消費(fèi)者任務(wù)到線程池中 for (int i = 0; i < 3; i++) { executorService.submit(() -> { try { while (true) { // 從隊(duì)列中取出數(shù)據(jù) int item = queue.take(); // 處理數(shù)據(jù)(此處僅打印作為示例) System.out.println("消費(fèi)者" + Thread.currentThread().getId() + "消費(fèi)數(shù)據(jù):" + item); // 假設(shè)處理后的數(shù)據(jù)是原始數(shù)據(jù)的平方 int processedItem = item * item; // 將處理后的結(jié)果存入結(jié)果隊(duì)列中 resultQueue.put(processedItem); // 稍微延遲一下,模擬處理數(shù)據(jù)的時(shí)間消耗 Thread.sleep(500); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // 啟動(dòng)生產(chǎn)者線程 producer.start(); // 等待生產(chǎn)者線程完成 try { producer.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 關(guān)閉ExecutorService(這將導(dǎo)致消費(fèi)者線程中斷) executorService.shutdown(); try { // 等待一段時(shí)間,讓消費(fèi)者線程處理剩余的數(shù)據(jù) if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) { executorService.shutdownNow(); // 如果超時(shí)則強(qiáng)制關(guān)閉消費(fèi)者線程 } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } // 處理結(jié)果隊(duì)列中的數(shù)據(jù)(此處僅打印作為示例) while (!resultQueue.isEmpty()) { System.out.println("處理結(jié)果:" + resultQueue.poll()); } } }
在上面的代碼中,我們定義了兩個(gè)阻塞隊(duì)列
queue
和resultQueue
,一個(gè)用于生產(chǎn)者和消費(fèi)者之間傳遞數(shù)據(jù),另一個(gè)用于存儲(chǔ)消費(fèi)者的處理結(jié)果。我們還使用了一個(gè)
AtomicInteger
作為數(shù)據(jù)生成的計(jì)數(shù)器。生產(chǎn)者線程每次生產(chǎn)一個(gè)數(shù)據(jù)就將其放入queue
中,而消費(fèi)者線程則從queue
中取出數(shù)據(jù)進(jìn)行處理,并將處理結(jié)果放入resultQueue
中。最后,我們?cè)谥骶€程中等待生產(chǎn)者線程完成后,關(guān)閉消費(fèi)者線程的
ExecutorService
,并處理resultQueue
中的剩余數(shù)據(jù)。
需要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,消費(fèi)者線程通常會(huì)有退出條件,而不是無限循環(huán)地處理數(shù)據(jù)。在這個(gè)示例中,由于我們?cè)O(shè)置了executorService.awaitTermination的超時(shí)時(shí)間,所以當(dāng)超時(shí)發(fā)生時(shí),會(huì)強(qiáng)制關(guān)閉消費(fèi)者線程。但是,在更復(fù)雜的場(chǎng)景下,我們可能需要使用其他機(jī)制來優(yōu)雅地關(guān)閉消費(fèi)者線程,例如使用一個(gè)特殊的結(jié)束信號(hào)或定期檢查某個(gè)關(guān)閉標(biāo)志。
請(qǐng)注意,在ArrayBlockingQueue
中,queue.isEmpty()
并不是一個(gè)可靠的退出條件,因?yàn)樵诙嗑€程環(huán)境下,你可能會(huì)遇到競(jìng)態(tài)條件的問題。更可靠的方式是使用一個(gè)特殊的結(jié)束信號(hào)或定期檢查某個(gè)關(guān)閉標(biāo)志來退出循環(huán)。
六、總結(jié)
ArrayBlockingQueue
是Java并發(fā)編程中一個(gè)非常有用的數(shù)據(jù)結(jié)構(gòu)。它提供了一個(gè)高效、線程安全的有界阻塞隊(duì)列實(shí)現(xiàn),適用于多種場(chǎng)景如生產(chǎn)者-消費(fèi)者模式、限流和任務(wù)調(diào)度等。在使用過程中,我們應(yīng)注意合理設(shè)置隊(duì)列大小、避免存儲(chǔ)大量數(shù)據(jù)、注意線程安全、優(yōu)雅地處理中斷以及使用try-with-resources語句等最佳實(shí)踐。通過深入了解ArrayBlockingQueue
的內(nèi)部機(jī)制和最佳實(shí)踐,我們可以更好地利用它來解決并發(fā)編程中的挑戰(zhàn)。
到此這篇關(guān)于Java并發(fā)編程ArrayBlockingQueue的使用的文章就介紹到這了,更多相關(guān)Java ArrayBlockingQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java ArrayBlockingQueue阻塞隊(duì)列的實(shí)現(xiàn)示例
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實(shí)現(xiàn)
- java ArrayBlockingQueue的方法及缺點(diǎn)分析
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡(jiǎn)介
- 詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細(xì)介紹
相關(guān)文章
Spring Boot 2.0多數(shù)據(jù)源配置方法實(shí)例詳解
這篇文章主要介紹了Spring Boot 2.0多數(shù)據(jù)源配置方法實(shí)例詳解,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-09-09Springboot上傳文件時(shí)提示405問題及排坑過程
這篇文章主要介紹了Springboot上傳文件時(shí)提示405問題及排坑過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07一篇文中細(xì)看Java多線程的創(chuàng)建方式
隨著計(jì)算機(jī)的配置越來越高,我們需要將進(jìn)程進(jìn)一步優(yōu)化,細(xì)分為線程,充分提高圖形化界面的多線程的開發(fā),這篇文章主要給大家介紹了如何通過一篇文中細(xì)看Java多線程的創(chuàng)建方式,需要的朋友可以參考下2021-07-07Java連接合并2個(gè)數(shù)組(Array)的5種方法例子
最近在寫代碼時(shí)遇到了需要合并兩個(gè)數(shù)組的需求,突然發(fā)現(xiàn)以前沒用過,于是研究了一下合并數(shù)組的方式,這篇文章主要給大家介紹了關(guān)于Java連接合并2個(gè)數(shù)組(Array)的5種方法,需要的朋友可以參考下2023-12-12詳解Java如何使用集合來實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java 集合實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng),大家可以在過程中查缺補(bǔ)漏,提升水平2021-11-11SpringBoot設(shè)置默認(rèn)主頁的方法步驟
這篇文章主要介紹了SpringBoot設(shè)置默認(rèn)主頁的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12