java ArrayBlockingQueue阻塞隊(duì)列的實(shí)現(xiàn)示例
在Java并發(fā)編程中,ArrayBlockingQueue
是一個非常常用的工具類。它是一個由數(shù)組支持的有界阻塞隊(duì)列,提供了線程安全的隊(duì)列操作。
1.ArrayBlockingQueue概述
ArrayBlockingQueue
是一個基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列,它繼承自AbstractQueue
并實(shí)現(xiàn)了BlockingQueue
接口。這個隊(duì)列在創(chuàng)建時需要指定一個固定的大小,之后這個大小就不能再改變了。當(dāng)隊(duì)列滿時,如果再有新的元素試圖加入隊(duì)列,那么這個操作會被阻塞;同樣地,如果隊(duì)列為空,那么從隊(duì)列中取元素的操作也會被阻塞。這種特性使得ArrayBlockingQueue
非常適合作為生產(chǎn)者-消費(fèi)者模式中的緩沖區(qū)。
2.ArrayBlockingQueue的核心特性
2.1.線程安全性
ArrayBlockingQueue
是線程安全的,它通過內(nèi)部鎖機(jī)制保證了在多線程環(huán)境下的安全性。因此,在多線程環(huán)境中,你可以放心地使用它而不需要擔(dān)心數(shù)據(jù)的一致性問題。
2.2.阻塞控制
ArrayBlockingQueue
提供了阻塞控制機(jī)制。當(dāng)隊(duì)列滿時,嘗試向隊(duì)列中添加元素的線程會被阻塞,直到隊(duì)列中有空間可用;同樣,當(dāng)隊(duì)列為空時,嘗試從隊(duì)列中取出元素的線程也會被阻塞,直到隊(duì)列中有元素可供消費(fèi)。這種機(jī)制可以有效地控制生產(chǎn)者和消費(fèi)者的速度,避免資源的浪費(fèi)。
2.3.有界性
ArrayBlockingQueue
的有界性可以防止隊(duì)列無限制地增長,從而避免內(nèi)存溢出。在實(shí)際應(yīng)用中,這種有界性可以作為系統(tǒng)的一個流量控制閥,當(dāng)系統(tǒng)過載時,通過阻塞或拒絕請求來保護(hù)系統(tǒng)。
3.ArrayBlockingQueue的使用
3.1.創(chuàng)建ArrayBlockingQueue
創(chuàng)建一個ArrayBlockingQueue
非常簡單,只需要指定隊(duì)列的大小即可:
int queueSize = 10; BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
3.2.生產(chǎn)者-消費(fèi)者模式
ArrayBlockingQueue
常用于生產(chǎn)者-消費(fèi)者模式。生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù)并添加到隊(duì)列中,而消費(fèi)者則從隊(duì)列中取出數(shù)據(jù)并處理。下面是一個簡單的生產(chǎn)者-消費(fèi)者示例:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println("生產(chǎn)者生產(chǎn)了數(shù)據(jù):" + i); queue.put(i); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumer = new Thread(() -> { while (true) { try { Integer data = queue.take(); System.out.println("消費(fèi)者消費(fèi)了數(shù)據(jù):" + data); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); consumer.start(); } }
運(yùn)行結(jié)果:
生產(chǎn)者生產(chǎn)了數(shù)據(jù):0
消費(fèi)者消費(fèi)了數(shù)據(jù):0
生產(chǎn)者生產(chǎn)了數(shù)據(jù):1
消費(fèi)者消費(fèi)了數(shù)據(jù):1
生產(chǎn)者生產(chǎn)了數(shù)據(jù):2
消費(fèi)者消費(fèi)了數(shù)據(jù):2
生產(chǎn)者生產(chǎn)了數(shù)據(jù):3
消費(fèi)者消費(fèi)了數(shù)據(jù):3
生產(chǎn)者生產(chǎn)了數(shù)據(jù):4
消費(fèi)者消費(fèi)了數(shù)據(jù):4
生產(chǎn)者生產(chǎn)了數(shù)據(jù):5
消費(fèi)者消費(fèi)了數(shù)據(jù):5
生產(chǎn)者生產(chǎn)了數(shù)據(jù):6
消費(fèi)者消費(fèi)了數(shù)據(jù):6
生產(chǎn)者生產(chǎn)了數(shù)據(jù):7
消費(fèi)者消費(fèi)了數(shù)據(jù):7
生產(chǎn)者生產(chǎn)了數(shù)據(jù):8
消費(fèi)者消費(fèi)了數(shù)據(jù):8
生產(chǎn)者生產(chǎn)了數(shù)據(jù):9
消費(fèi)者消費(fèi)了數(shù)據(jù):9
在這個示例中,我們創(chuàng)建了一個大小為5的ArrayBlockingQueue
,然后啟動了一個生產(chǎn)者線程和一個消費(fèi)者線程。生產(chǎn)者線程會生成10個數(shù)據(jù),并嘗試將它們添加到隊(duì)列中;消費(fèi)者線程則會不斷地從隊(duì)列中取出數(shù)據(jù)并處理。由于隊(duì)列的大小只有5,因此當(dāng)生產(chǎn)者生產(chǎn)了5個數(shù)據(jù)后,它會被阻塞,直到消費(fèi)者消費(fèi)了一些數(shù)據(jù)釋放出空間。同樣地,當(dāng)隊(duì)列為空時,消費(fèi)者線程也會被阻塞,直到生產(chǎn)者生產(chǎn)了新的數(shù)據(jù)。
4.ArrayBlockingQueue的最佳實(shí)踐
4.1.選擇合適的隊(duì)列大小
隊(duì)列的大小應(yīng)根據(jù)具體的應(yīng)用場景來設(shè)置。如果設(shè)置得太小,可能會導(dǎo)致頻繁的阻塞和上下文切換,影響性能;如果設(shè)置得太大,可能會浪費(fèi)內(nèi)存資源。因此,在選擇隊(duì)列大小時,需要綜合考慮系統(tǒng)的負(fù)載、內(nèi)存資源和性能要求等因素。
4.2.合理使用阻塞方法
ArrayBlockingQueue
提供了多種阻塞方法,如put
、take
、offer
和poll
等。在使用這些方法時,需要根據(jù)具體的需求來選擇合適的方法。例如,如果你希望當(dāng)隊(duì)列滿時生產(chǎn)者線程能夠阻塞等待空間可用,那么可以使用put
方法;如果你希望生產(chǎn)者線程在隊(duì)列滿時能夠立即返回并做其他處理,那么可以使用offer
方法。
4.3.避免死鎖
在使用ArrayBlockingQueue
時,需要注意避免死鎖的發(fā)生。例如,不要在持有其他鎖的情況下調(diào)用ArrayBlockingQueue
的阻塞方法,否則可能會導(dǎo)致死鎖。此外,還需要注意避免循環(huán)等待和饑餓等問題。
4.4.考慮使用公平策略
ArrayBlockingQueue
的構(gòu)造函數(shù)允許指定一個公平性參數(shù)。如果設(shè)置為true,等待時間最長的線程將優(yōu)先獲得訪問隊(duì)列的機(jī)會。但需要注意的是,公平性可能會降低性能。因此,在決定是否使用公平策略時,需要綜合考慮系統(tǒng)的性能和公平性要求。
5.源碼詳解
5.1.主要屬性
// 用于存儲隊(duì)列元素的數(shù)組 final Object[] items; // 隊(duì)列的容量 int count; // 控制并發(fā)訪問的鎖 final ReentrantLock lock; // 隊(duì)列不滿時的等待條件 private final Condition notFull; // 隊(duì)列不為空時的等待條件 private final Condition notEmpty; // 隊(duì)列中等待取數(shù)據(jù)的線程數(shù) final AtomicInteger waitingConsumers = new AtomicInteger(); // 隊(duì)列中等待插入數(shù)據(jù)的線程數(shù) final AtomicInteger waitingProducers = new AtomicInteger();
5.2.構(gòu)造函數(shù)
ArrayBlockingQueue
提供了幾種構(gòu)造函數(shù),其中最基本的兩個是接受隊(duì)列容量和指定是否公平的構(gòu)造函數(shù)。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
5.3.入隊(duì)操作
put(E e)
和 offer(E e)
是兩種入隊(duì)操作,其中 put
方法在隊(duì)列滿時會阻塞,而 offer
方法在隊(duì)列滿時會立即返回失敗或者根據(jù)提供的超時時間等待。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // 隊(duì)列尾部插入元素 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 通知可能在等待的消費(fèi)者線程 notEmpty.signal(); }
5.4.出隊(duì)操作
take()
和 poll()
是兩種出隊(duì)操作,其中 take
方法在隊(duì)列空時會阻塞,而 poll
方法在隊(duì)列空時會立即返回 null
或者根據(jù)提供的超時時間等待。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // 隊(duì)列頭部取出元素 final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 通知可能在等待的生產(chǎn)者線程 notFull.signal(); return x; }
6.總結(jié)
ArrayBlockingQueue
是Java并發(fā)編程中一個非常實(shí)用的工具類。它提供了線程安全的阻塞隊(duì)列實(shí)現(xiàn),支持生產(chǎn)者-消費(fèi)者模式,并允許通過隊(duì)列的大小來控制系統(tǒng)的流量。在使用ArrayBlockingQueue
時,需要注意選擇合適的隊(duì)列大小、合理使用阻塞方法、避免死鎖和考慮使用公平策略等問題。通過合理地使用ArrayBlockingQueue
,可以有效地提高系統(tǒng)的并發(fā)性能和穩(wěn)定性。
到此這篇關(guān)于java ArrayBlockingQueue阻塞隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)java ArrayBlockingQueue阻塞隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實(shí)現(xiàn)
- java ArrayBlockingQueue的方法及缺點(diǎn)分析
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡介
- 詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細(xì)介紹
- Java并發(fā)編程ArrayBlockingQueue的使用
相關(guān)文章
JavaWeb中轉(zhuǎn)發(fā)與重定向的區(qū)別小結(jié)
轉(zhuǎn)發(fā)和重定向是JavaWeb中常用的兩種頁面跳轉(zhuǎn)方式,它們在實(shí)現(xiàn)上有一些區(qū)別,本文主要介紹了JavaWeb中轉(zhuǎn)發(fā)與重定向的區(qū)別小結(jié),具有一定的參考價值,感興趣的可以了解一下2023-10-10使用Java進(jìn)行Json數(shù)據(jù)的解析(對象數(shù)組的相互嵌套)
下面小編就為大家?guī)硪黄褂肑ava進(jìn)行Json數(shù)據(jù)的解析(對象數(shù)組的相互嵌套)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08java通過cglib動態(tài)生成實(shí)體bean的操作
這篇文章主要介紹了java通過cglib動態(tài)生成實(shí)體bean的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02