Java高并發(fā)BlockingQueue重要的實現(xiàn)類詳解
ArrayBlockingQueue
有界的阻塞隊列,內部是一個數(shù)組,有邊界的意思是:容量是有限的,必須進行初始化,指定它的容量大小,以先進先出的方式存儲數(shù)據(jù),最新插入的在對尾,最先移除的對象在頭部。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 隊列元素 */ final Object[] items; /** 下一次讀取操作的位置, poll, peek or remove */ int takeIndex; /** 下一次寫入操作的位置, offer, or add */ int putIndex; /** 元素數(shù)量 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. * 它采用一個 ReentrantLock 和相應的兩個 Condition 來實現(xiàn)。 */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** 指定大小 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 指定容量大小與指定訪問策略 * @param fair 指定獨占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖; */ public ArrayBlockingQueue(int capacity, boolean fair) {} /** * 指定容量大小、指定訪問策略與最初包含給定集合中的元素 * @param c 將此集合中的元素在構造方法期間就先添加到隊列中 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {} }
- ArrayBlockingQueue 在生產者放入數(shù)據(jù)和消費者獲取數(shù)據(jù),都是共用一個鎖對象,由此也意味著兩者無法真正并行運行。按照實現(xiàn)原理來分析, ArrayBlockingQueue 完全可以采用分離鎖,從而實現(xiàn)生產者和消費者操作的完全并行運行。然而事實上并沒有如此,因為 ArrayBlockingQueue 的數(shù)據(jù)寫入已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。
- 通過構造函數(shù)得知,參數(shù) fair 控制對象內部是否采用公平鎖,默認采用非公平鎖。
- items、takeIndex、putIndex、count 等屬性并沒有使用 volatile 修飾,這是因為訪問這些變量(通過方法獲取)使用都在鎖內,并不存在可見性問題,如 size() 。
- 另外有個獨占鎖 lock 用來對出入對操作加鎖,這導致同時只有一個線程可以訪問入隊出隊。
Put 源碼分析
/** 進行入隊操作 */ public void put(E e) throws InterruptedException { //e為null,則拋出NullPointerException異常 checkNotNull(e); //獲取獨占鎖 final ReentrantLock lock = this.lock; /** * lockInterruptibly() * 獲取鎖定,除非當前線程為interrupted * 如果鎖沒有被另一個線程占用并且立即返回,則將鎖定計數(shù)設置為1。 * 如果當前線程已經保存此鎖,則保持計數(shù)將遞增1,該方法立即返回。 * 如果鎖被另一個線程保持,則當前線程將被禁用以進行線程調度,并且處于休眠狀態(tài) * */ lock.lockInterruptibly(); try { //空隊列 while (count == items.length) //進行條件等待處理 notFull.await(); //入隊操作 enqueue(e); } finally { //釋放鎖 lock.unlock(); } } /** 真正的入隊 */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //獲取當前元素 final Object[] items = this.items; //按下一個插入索引進行元素添加 items[putIndex] = x; // 計算下一個元素應該存放的下標,可以理解為循環(huán)隊列 if (++putIndex == items.length) putIndex = 0; count++; //喚起消費者 notEmpty.signal(); }
這里由于在操作共享變量前加了鎖,所以不存在內存不可見問題,加鎖后獲取的共享變量都是從主內存中獲取的,而不是在CPU緩存或者寄存器里面的值,釋放鎖后修改的共享變量值會刷新到主內存。
另外這個隊列使用循環(huán)數(shù)組實現(xiàn),所以在計算下一個元素存放下標時候有些特殊。另外 insert 后調用 notEmpty.signal() ;是為了激活調用 notEmpty.await(); 阻塞后放入 notEmpty 條件隊列的線程。
Take 源碼分析
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; //這里有些特殊 if (itrs != null) //保持隊列中的元素和迭代器的元素一致 itrs.elementDequeued(); notFull.signal(); return x; }
Take 操作和 Put 操作很類似
//該類的迭代器,所有的迭代器共享數(shù)據(jù),隊列改變會影響所有的迭代器 transient Itrs itrs = null; //其存放了目前所創(chuàng)建的所有迭代器。 /** * 迭代器和它們的隊列之間的共享數(shù)據(jù),允許隊列元素被刪除時更新迭代器的修改。 */ class Itrs { void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) //隊列中數(shù)量為0的時候,隊列就是空的,會將所有迭代器進行清理并移除 queueIsEmpty(); //takeIndex的下標是0,意味著隊列從尾中取完了,又回到頭部獲取 else if (takeIndex == 0) takeIndexWrapped(); } /** * 當隊列為空的時候做的事情 * 1. 通知所有迭代器隊列已經為空 * 2. 清空所有的弱引用,并且將迭代器置空 */ void queueIsEmpty() {} /** * 將takeIndex包裝成0 * 并且通知所有的迭代器,并且刪除已經過期的任何對象(個人理解是置空對象) * 也直接的說就是在Blocking隊列進行出隊的時候,進行迭代器中的數(shù)據(jù)同步,保持隊列中的元素和迭代器的元素是一致的。 */ void takeIndexWrapped() {} }
Itrs迭代器創(chuàng)建的時機
//從這里知道,在ArrayBlockingQueue對象中調用此方法,才會生成這個對象 //那么就可以理解為,只要并未調用此方法,則ArrayBlockingQueue對象中的Itrs對象則為空 public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { Itr() { //這里就是生產它的地方 //count等于0的時候,創(chuàng)建的這個迭代器是個無用的迭代器,可以直接移除,進入detach模式。 //否則就把當前隊列的讀取位置給迭代器當做下一個元素,cursor存儲下個元素的位置。 if (count == 0) { // assert itrs == null; cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); cursor = incCursor(takeIndex); if (itrs == null) { itrs = new Itrs(this); } else { itrs.register(this); // in this order itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; // assert takeIndex >= 0; // assert prevTakeIndex == takeIndex; // assert nextIndex >= 0; // assert nextItem != null; } } }
代碼演示
package com.rumenz.task; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: BlockingQuqueExample * @description: TODO 類描述 * @author: mac * @date: 2021/1/20 **/ public class BlockingQueueExample { private static volatile Boolean flag=false; public static void main(String[] args) { BlockingQueue blockingQueue=new ArrayBlockingQueue(1024); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try{ blockingQueue.put(1); Thread.sleep(2000); blockingQueue.put(3); flag=true; }catch (Exception e){ e.printStackTrace(); } }); executorService.execute(()->{ try { while (!flag){ Integer i = (Integer) blockingQueue.take(); System.out.println(i); } }catch (Exception e){ e.printStackTrace(); } }); executorService.shutdown(); } }
LinkedBlockingQueue
基于鏈表的阻塞隊列,通 ArrayBlockingQueue 類似,其內部也維護這一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列放入一個數(shù)據(jù)時,隊列會從生產者手上獲取數(shù)據(jù),并緩存在隊列的內部,而生產者立即返回,只有當隊列緩沖區(qū)到達最大值容量時(LinkedBlockingQueue可以通過構造函數(shù)指定該值),才會阻塞隊列,直到消費者從隊列中消費掉一份數(shù)據(jù),生產者會被喚醒,反之對于消費者這端的處理也基于同樣的原理。
LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因為其對于生產者和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產者和消費者可以并行的操作隊列中的數(shù)據(jù),以調高整個隊列的并發(fā)能力。
如果構造一個 LinkedBlockingQueue 對象,而沒有指定容量大小, LinkedBlockingQueue 會默認一個類似無限大小的容量 Integer.MAX_VALUE ,這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統(tǒng)內存就有可能已經被消耗殆盡了。
LinkedBlockingQueue 是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //隊列的容量,指定大小或為默認值Integer.MAX_VALUE private final int capacity; //元素的數(shù)量 private final AtomicInteger count = new AtomicInteger(); //隊列頭節(jié)點,始終滿足head.item==null transient Node<E> head; //隊列的尾節(jié)點,始終滿足last.next==null private transient Node<E> last; /** Lock held by take, poll, etc */ //出隊的鎖:take, poll, peek 等讀操作的方法需要獲取到這個鎖 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //當隊列為空時,保存執(zhí)行出隊的線程:如果讀操作的時候隊列是空的,那么等待 notEmpty 條件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //入隊的鎖:put, offer 等寫操作的方法需要獲取到這個鎖 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //當隊列滿時,保存執(zhí)行入隊的線程:如果寫操作的時候隊列是滿的,那么等待 notFull 條件 private final Condition notFull = putLock.newCondition(); //傳說中的無界隊列 public LinkedBlockingQueue() {} //傳說中的有界隊列 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } //傳說中的無界隊列 public LinkedBlockingQueue(Collection<? extends E> c){} /** * 鏈表節(jié)點類 */ static class Node<E> { E item; /** * One of: * - 真正的繼任者節(jié)點 * - 這個節(jié)點,意味著繼任者是head.next * - 空,意味著沒有后繼者(這是最后一個節(jié)點) */ Node<E> next; Node(E x) { item = x; } } }
通過其構造函數(shù),得知其可以當做無界隊列也可以當做有界隊列來使用。
這里用了兩把鎖分別是 takeLock 和 putLock ,而 Condition 分別是 notEmpty 和 notFull ,它們是這樣搭配的。
takeLock
putLock
從上面的構造函數(shù)中可以看到,這里會初始化一個空的頭結點,那么第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也是獲取頭結點后面的一個元素。count的計數(shù)值不包含這個頭結點。
Put源碼分析
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 將指定元素插入到此隊列的尾部,如有必要,則等待空間變得可用。 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 如果你糾結這里為什么是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標志而已。 int c = -1; //包裝成node節(jié)點 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //獲取鎖定 putLock.lockInterruptibly(); try { /** 如果隊列滿,等待 notFull 的條件滿足。 */ while (count.get() == capacity) { notFull.await(); } //入隊 enqueue(node); //原子性自增 c = count.getAndIncrement(); // 如果這個元素入隊后,還有至少一個槽可以使用,調用 notFull.signal() 喚醒等待線程。 // 哪些線程會等待在 notFull 這個 Condition 上呢? if (c + 1 < capacity) notFull.signal(); } finally { //解鎖 putLock.unlock(); } // 如果 c == 0,那么代表隊列在這個元素入隊前是空的(不包括head空節(jié)點), // 那么所有的讀線程都在等待 notEmpty 這個條件,等待喚醒,這里做一次喚醒操作 if (c == 0) signalNotEmpty(); } /** 鏈接節(jié)點在隊列末尾 */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // 入隊的代碼非常簡單,就是將 last 屬性指向這個新元素,并且讓原隊尾的 next 指向這個元素 //last.next = node; //last = node; // 這里入隊沒有并發(fā)問題,因為只有獲取到 putLock 獨占鎖以后,才可以進行此操作 last = last.next = node; } /** * 等待PUT信號 * 僅在 take/poll 中調用 * 也就是說:元素入隊后,如果需要,則會調用這個方法喚醒讀線程來讀 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal();//喚醒 } finally { putLock.unlock(); } } }
Take源碼分析
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //首先,需要獲取到 takeLock 才能進行出隊操作 takeLock.lockInterruptibly(); try { // 如果隊列為空,等待 notEmpty 這個條件滿足再繼續(xù)執(zhí)行 while (count.get() == 0) { notEmpty.await(); } //// 出隊 x = dequeue(); //count 進行原子減 1 c = count.getAndDecrement(); // 如果這次出隊后,隊列中至少還有一個元素,那么調用 notEmpty.signal() 喚醒其他的讀線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } /** * 出隊 */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } }
與 ArrayBlockingQueue 對比
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于GC的影響還是存在一定的區(qū)別。
LinkedBlockingQueue 實現(xiàn)一個線程添加文件對象,四個線程讀取文件對象
package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容納100個文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 線程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("F:\\JavaLib"); // 完成標志 final File exitFile = new File(""); // 讀個數(shù) final AtomicInteger rc = new AtomicInteger(); // 寫個數(shù) final AtomicInteger wc = new AtomicInteger(); // 讀線程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四個寫線程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = "Write" + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 隊列已經無對象 if (file == exitFile) { // 再次添加"標志",以讓其他線程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } }
總結
到此這篇關于Java高并發(fā)BlockingQueue重要實現(xiàn)類的文章就介紹到這了,更多相關Java高并發(fā)BlockingQueue實現(xiàn)類內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JAVA Spring Boot 自動配置實現(xiàn)原理詳解
這篇文章主要介紹了詳解SpringBoot自動配置原理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2021-09-09為什么Spring和IDEA都不推薦使用 @Autowired 注解
本文主要介紹了為什么Spring和IDEA都不推薦使用 @Autowired 注解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04詳解JAVA中的Collection接口和其主要實現(xiàn)的類
這篇文章主要介紹了JAVA中的Collection接口和其主要實現(xiàn)的類,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-03-03Mybatis+Druid+MybatisPlus多數(shù)據(jù)源配置方法
在項目開發(fā)中,經常需要連接多個數(shù)據(jù)庫,使用Mybatis、Druid和MybatisPlus可以實現(xiàn)多數(shù)據(jù)源配置,通過定義配置類和修改配置文件,如properties或yaml,可以設置多個數(shù)據(jù)源,本文介紹了配置項包括Druid基本配置、數(shù)據(jù)源一、數(shù)據(jù)源二,感興趣的朋友一起看看吧2024-09-09Java微信公眾平臺開發(fā)(10) 微信自定義菜單的創(chuàng)建實現(xiàn)
這篇文章主要為大家詳細介紹了Java微信公眾平臺開發(fā)第十步,微信自定義菜單的創(chuàng)建實現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法
有時候一個SpringBoot項目需要同時連接兩個數(shù)據(jù)庫,本文就來介紹一下springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法,具有一定的參考價值,感興趣的可以了解一下2024-08-08