java ArrayBlockingQueue阻塞隊列的實現示例
在Java并發(fā)編程中,ArrayBlockingQueue是一個非常常用的工具類。它是一個由數組支持的有界阻塞隊列,提供了線程安全的隊列操作。
1.ArrayBlockingQueue概述
ArrayBlockingQueue是一個基于數組實現的阻塞隊列,它繼承自AbstractQueue并實現了BlockingQueue接口。這個隊列在創(chuàng)建時需要指定一個固定的大小,之后這個大小就不能再改變了。當隊列滿時,如果再有新的元素試圖加入隊列,那么這個操作會被阻塞;同樣地,如果隊列為空,那么從隊列中取元素的操作也會被阻塞。這種特性使得ArrayBlockingQueue非常適合作為生產者-消費者模式中的緩沖區(qū)。
2.ArrayBlockingQueue的核心特性
2.1.線程安全性
ArrayBlockingQueue是線程安全的,它通過內部鎖機制保證了在多線程環(huán)境下的安全性。因此,在多線程環(huán)境中,你可以放心地使用它而不需要擔心數據的一致性問題。
2.2.阻塞控制
ArrayBlockingQueue提供了阻塞控制機制。當隊列滿時,嘗試向隊列中添加元素的線程會被阻塞,直到隊列中有空間可用;同樣,當隊列為空時,嘗試從隊列中取出元素的線程也會被阻塞,直到隊列中有元素可供消費。這種機制可以有效地控制生產者和消費者的速度,避免資源的浪費。
2.3.有界性
ArrayBlockingQueue的有界性可以防止隊列無限制地增長,從而避免內存溢出。在實際應用中,這種有界性可以作為系統(tǒng)的一個流量控制閥,當系統(tǒng)過載時,通過阻塞或拒絕請求來保護系統(tǒng)。
3.ArrayBlockingQueue的使用
3.1.創(chuàng)建ArrayBlockingQueue
創(chuàng)建一個ArrayBlockingQueue非常簡單,只需要指定隊列的大小即可:
int queueSize = 10; BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
3.2.生產者-消費者模式
ArrayBlockingQueue常用于生產者-消費者模式。生產者負責生成數據并添加到隊列中,而消費者則從隊列中取出數據并處理。下面是一個簡單的生產者-消費者示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println("生產者生產了數據:" + i);
queue.put(i);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer data = queue.take();
System.out.println("消費者消費了數據:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
運行結果:
生產者生產了數據:0
消費者消費了數據:0
生產者生產了數據:1
消費者消費了數據:1
生產者生產了數據:2
消費者消費了數據:2
生產者生產了數據:3
消費者消費了數據:3
生產者生產了數據:4
消費者消費了數據:4
生產者生產了數據:5
消費者消費了數據:5
生產者生產了數據:6
消費者消費了數據:6
生產者生產了數據:7
消費者消費了數據:7
生產者生產了數據:8
消費者消費了數據:8
生產者生產了數據:9
消費者消費了數據:9
在這個示例中,我們創(chuàng)建了一個大小為5的ArrayBlockingQueue,然后啟動了一個生產者線程和一個消費者線程。生產者線程會生成10個數據,并嘗試將它們添加到隊列中;消費者線程則會不斷地從隊列中取出數據并處理。由于隊列的大小只有5,因此當生產者生產了5個數據后,它會被阻塞,直到消費者消費了一些數據釋放出空間。同樣地,當隊列為空時,消費者線程也會被阻塞,直到生產者生產了新的數據。
4.ArrayBlockingQueue的最佳實踐
4.1.選擇合適的隊列大小
隊列的大小應根據具體的應用場景來設置。如果設置得太小,可能會導致頻繁的阻塞和上下文切換,影響性能;如果設置得太大,可能會浪費內存資源。因此,在選擇隊列大小時,需要綜合考慮系統(tǒng)的負載、內存資源和性能要求等因素。
4.2.合理使用阻塞方法
ArrayBlockingQueue提供了多種阻塞方法,如put、take、offer和poll等。在使用這些方法時,需要根據具體的需求來選擇合適的方法。例如,如果你希望當隊列滿時生產者線程能夠阻塞等待空間可用,那么可以使用put方法;如果你希望生產者線程在隊列滿時能夠立即返回并做其他處理,那么可以使用offer方法。
4.3.避免死鎖
在使用ArrayBlockingQueue時,需要注意避免死鎖的發(fā)生。例如,不要在持有其他鎖的情況下調用ArrayBlockingQueue的阻塞方法,否則可能會導致死鎖。此外,還需要注意避免循環(huán)等待和饑餓等問題。
4.4.考慮使用公平策略
ArrayBlockingQueue的構造函數允許指定一個公平性參數。如果設置為true,等待時間最長的線程將優(yōu)先獲得訪問隊列的機會。但需要注意的是,公平性可能會降低性能。因此,在決定是否使用公平策略時,需要綜合考慮系統(tǒng)的性能和公平性要求。
5.源碼詳解
5.1.主要屬性
// 用于存儲隊列元素的數組 final Object[] items; // 隊列的容量 int count; // 控制并發(fā)訪問的鎖 final ReentrantLock lock; // 隊列不滿時的等待條件 private final Condition notFull; // 隊列不為空時的等待條件 private final Condition notEmpty; // 隊列中等待取數據的線程數 final AtomicInteger waitingConsumers = new AtomicInteger(); // 隊列中等待插入數據的線程數 final AtomicInteger waitingProducers = new AtomicInteger();
5.2.構造函數
ArrayBlockingQueue 提供了幾種構造函數,其中最基本的兩個是接受隊列容量和指定是否公平的構造函數。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
5.3.入隊操作
put(E e) 和 offer(E e) 是兩種入隊操作,其中 put 方法在隊列滿時會阻塞,而 offer 方法在隊列滿時會立即返回失敗或者根據提供的超時時間等待。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// 隊列尾部插入元素
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 通知可能在等待的消費者線程
notEmpty.signal();
}
5.4.出隊操作
take() 和 poll() 是兩種出隊操作,其中 take 方法在隊列空時會阻塞,而 poll 方法在隊列空時會立即返回 null 或者根據提供的超時時間等待。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// 隊列頭部取出元素
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 通知可能在等待的生產者線程
notFull.signal();
return x;
}
6.總結
ArrayBlockingQueue是Java并發(fā)編程中一個非常實用的工具類。它提供了線程安全的阻塞隊列實現,支持生產者-消費者模式,并允許通過隊列的大小來控制系統(tǒng)的流量。在使用ArrayBlockingQueue時,需要注意選擇合適的隊列大小、合理使用阻塞方法、避免死鎖和考慮使用公平策略等問題。通過合理地使用ArrayBlockingQueue,可以有效地提高系統(tǒng)的并發(fā)性能和穩(wěn)定性。
到此這篇關于java ArrayBlockingQueue阻塞隊列的實現示例的文章就介紹到這了,更多相關java ArrayBlockingQueue阻塞隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實現
- java ArrayBlockingQueue的方法及缺點分析
- Java源碼解析阻塞隊列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊列ArrayBlockingQueue功能簡介
- 詳細分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細介紹
- Java并發(fā)編程ArrayBlockingQueue的使用

