Java?AQS?原理與?ReentrantLock?實(shí)現(xiàn)方法
一、AQS 簡(jiǎn)介
AbstractQueuedSynchronizer(簡(jiǎn)稱 AQS)是 Java 并發(fā)包(java.util.concurrent)中最核心的基礎(chǔ)組件之一,它為 Java 中的大多數(shù)同步類(如 ReentrantLock、Semaphore、CountDownLatch 等)提供了一個(gè)通用的框架。理解 AQS 的工作原理對(duì)于深入掌握 Java 并發(fā)編程至關(guān)重要。
AQS 的作用是解決同步器的實(shí)現(xiàn)問(wèn)題,它將復(fù)雜的同步器實(shí)現(xiàn)分解為簡(jiǎn)單的框架方法,開(kāi)發(fā)者只需要實(shí)現(xiàn)少量特定的方法就能快速構(gòu)建出可靠的同步器。
二、AQS 核心設(shè)計(jì)
2.1 核心組成部分
AQS 主要由以下部分組成:
- 同步狀態(tài)(state):使用 volatile int 類型的變量表示資源的可用狀態(tài)
- FIFO 等待隊(duì)列:使用雙向鏈表實(shí)現(xiàn)的隊(duì)列,用于管理等待獲取資源的線程
- 獨(dú)占/共享模式:支持獨(dú)占鎖(如 ReentrantLock)和共享鎖(如 CountDownLatch)兩種模式
- 條件變量:通過(guò) ConditionObject 類提供條件等待/通知機(jī)制,類似于 Object.wait()/notify()
2.2 AQS 的工作原理
AQS 通過(guò)模板方法模式,將一些通用的同步操作封裝在框架內(nèi)部,而將特定同步器的特性(如資源是否可獲取的判斷)交給子類去實(shí)現(xiàn)。AQS 提供以下基本操作:
- 資源獲取:線程嘗試獲取資源,如果獲取不到,將被包裝成 Node 加入等待隊(duì)列并被阻塞
- 資源釋放:持有資源的線程釋放資源后,會(huì)喚醒等待隊(duì)列中的下一個(gè)線程
- 線程阻塞與喚醒:通過(guò) LockSupport 的 park/unpark 機(jī)制實(shí)現(xiàn)
2.3 AQS 的關(guān)鍵方法
AQS 定義了一組需要子類實(shí)現(xiàn)的方法:
- tryAcquire(int):嘗試以獨(dú)占模式獲取資源
- tryRelease(int):嘗試以獨(dú)占模式釋放資源
- tryAcquireShared(int):嘗試以共享模式獲取資源
- tryReleaseShared(int):嘗試以共享模式釋放資源
- isHeldExclusively():判斷資源是否被當(dāng)前線程獨(dú)占
三、ReentrantLock 與 AQS 的關(guān)系
ReentrantLock 是基于 AQS 實(shí)現(xiàn)的可重入鎖,它通過(guò)內(nèi)部類 Sync(繼承自 AQS)來(lái)實(shí)現(xiàn)鎖的基本功能,并通過(guò) FairSync 和 NonfairSync 兩個(gè)子類分別實(shí)現(xiàn)公平鎖和非公平鎖。
3.1 ReentrantLock 的結(jié)構(gòu)
public class ReentrantLock implements Lock { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { // 實(shí)現(xiàn)鎖的基本操作 } // 公平鎖實(shí)現(xiàn) static final class FairSync extends Sync { ... } // 非公平鎖實(shí)現(xiàn) static final class NonfairSync extends Sync { ... } }
3.2 ReentrantLock 如何使用 AQS 的 state
ReentrantLock 使用 AQS 的 state 字段來(lái)表示鎖的持有次數(shù):
- state = 0:表示鎖未被持有
- state > 0:表示鎖被持有,值表示重入次數(shù)
四、AQS 關(guān)鍵流程分析
4.1 獨(dú)占鎖的獲取流程
當(dāng)線程調(diào)用 ReentrantLock.lock()方法時(shí),實(shí)際上會(huì)執(zhí)行以下流程:
- 首先調(diào)用 AQS 的 acquire(1)方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire 嘗試獲取鎖,這是由 ReentrantLock 的 Sync 子類實(shí)現(xiàn)的:
- 如果 state=0,嘗試使用 CAS 將 state 設(shè)為 1,并設(shè)置當(dāng)前線程為持有鎖的線程
- 如果當(dāng)前線程已經(jīng)持有鎖,則增加 state 值,實(shí)現(xiàn)可重入
- 其他情況下返回 false
如果 tryAcquire 失敗,則調(diào)用 addWaiter 將當(dāng)前線程封裝成 Node 添加到等待隊(duì)列末尾:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 嘗試快速添加到隊(duì)列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速添加失敗,進(jìn)入完整的入隊(duì)方法 enq(node); return node; }
- 然后執(zhí)行 acquireQueued 方法,讓該節(jié)點(diǎn)在隊(duì)列中不斷嘗試獲取鎖,直到成功或被中斷:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前驅(qū)節(jié)點(diǎn) final Node p = node.predecessor(); // 如果前驅(qū)是頭節(jié)點(diǎn),說(shuō)明輪到當(dāng)前節(jié)點(diǎn)嘗試獲取鎖 if (p == head && tryAcquire(arg)) { // 獲取成功,把當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn) setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判斷是否應(yīng)該阻塞當(dāng)前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
4.2 獨(dú)占鎖的釋放流程
當(dāng)線程調(diào)用 ReentrantLock.unlock()方法時(shí),會(huì)執(zhí)行以下流程:
- 首先調(diào)用 AQS 的 release(1)方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease 嘗試釋放鎖,這是由 ReentrantLock 的 Sync 類實(shí)現(xiàn)的:
- 檢查當(dāng)前線程是否是持有鎖的線程
- 減少 state 值
- 如果 state 變?yōu)?0,清空持有鎖的線程,并返回 true
如果 tryRelease 返回 true,表示已完全釋放鎖,則調(diào)用 unparkSuccessor 喚醒等待隊(duì)列中的下一個(gè)線程:
private void unparkSuccessor(Node node) { // 獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài) int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 找到下一個(gè)需要喚醒的節(jié)點(diǎn) Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從尾部向前查找需要喚醒的節(jié)點(diǎn) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 喚醒找到的節(jié)點(diǎn) if (s != null) LockSupport.unpark(s.thread); }
五、公平鎖與非公平鎖
ReentrantLock 支持公平鎖和非公平鎖兩種模式:
5.1 非公平鎖(默認(rèn))
非公平鎖的 tryAcquire 實(shí)現(xiàn):
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 非公平鎖直接嘗試CAS獲取鎖,不檢查隊(duì)列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
5.2 公平鎖
公平鎖的 tryAcquire 實(shí)現(xiàn):
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平鎖會(huì)先調(diào)用hasQueuedPredecessors檢查是否有前驅(qū)節(jié)點(diǎn)在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平鎖與非公平鎖的主要區(qū)別在于獲取鎖時(shí)是否考慮等待隊(duì)列。公平鎖會(huì)檢查是否有線程在等待隊(duì)列中排隊(duì),而非公平鎖則直接嘗試獲取,不考慮等待順序。
六、自定義實(shí)現(xiàn):簡(jiǎn)化版 ReentrantLock
為了更深入理解 AQS 原理,我們可以實(shí)現(xiàn)一個(gè)簡(jiǎn)化版的 ReentrantLock:
public class SimpleReentrantLock implements Lock { private final Sync sync; /** * 默認(rèn)創(chuàng)建非公平鎖 */ public SimpleReentrantLock() { sync = new NonfairSync(); } /** * 根據(jù)參數(shù)創(chuàng)建公平鎖或非公平鎖 */ public SimpleReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 繼承AQS的同步器實(shí)現(xiàn) */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 非公平的方式獲取鎖 */ final boolean unfairTryAcquire(int acquires) { // 獲取當(dāng)前線程 final Thread current = Thread.currentThread(); // 獲取當(dāng)前state狀態(tài) int c = getState(); // state為0表示鎖未被持有 if (c == 0) { // 使用CAS嘗試將state從0設(shè)置為1 if (compareAndSetState(0, acquires)) { // 成功獲取鎖,設(shè)置當(dāng)前持有鎖的線程為當(dāng)前線程 setExclusiveOwnerThread(current); return true; } } // 如果當(dāng)前線程就是持有鎖的線程,實(shí)現(xiàn)可重入 else if (current == getExclusiveOwnerThread()) { // 增加state值實(shí)現(xiàn)重入計(jì)數(shù) int nextC = c + acquires; // 檢查溢出 if (nextC < 0) { throw new Error("Maximum lock count exceeded"); } // 設(shè)置新的state值,這里不需要CAS因?yàn)楫?dāng)前線程已經(jīng)持有鎖 setState(nextC); return true; } // 獲取鎖失敗 return false; } /** * 釋放鎖 */ @Override protected final boolean tryRelease(int releases) { // 檢查當(dāng)前線程是否是持有鎖的線程 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } // 減少state值 int c = getState() - releases; // 判斷是否完全釋放鎖 boolean free = (c == 0); if (free) { // 完全釋放鎖,清空持有鎖的線程 setExclusiveOwnerThread(null); } // 更新state值 setState(c); return free; } /** * 判斷當(dāng)前線程是否持有鎖 */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /** * 創(chuàng)建條件變量 */ Condition newCondition() { return new ConditionObject(); } /** * 獲取鎖的持有計(jì)數(shù) */ public int getHoldCount() { return isHeldExclusively() ? getState() : 0; } } /** * 公平鎖的實(shí)現(xiàn) */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平性體現(xiàn):先檢查隊(duì)列中是否有前驅(qū)節(jié)點(diǎn)在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextC = c + acquires; if (nextC < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextC); return true; } return false; } } /** * 非公平鎖的實(shí)現(xiàn) */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 非公平鎖的獲取實(shí)現(xiàn) */ @Override protected boolean tryAcquire(int acquires) { return unfairTryAcquire(acquires); } } // 實(shí)現(xiàn)Lock接口的方法 @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.unfairTryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } /** * 查詢當(dāng)前鎖是否被某個(gè)線程持有 */ public boolean isLocked() { return sync.isHeldExclusively(); } /** * 查詢當(dāng)前線程是否持有鎖 */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 獲取當(dāng)前鎖的持有計(jì)數(shù) */ public int getHoldCount() { return sync.getHoldCount(); } }
七、Condition 實(shí)現(xiàn)原理
AQS 提供了 ConditionObject 內(nèi)部類,用于實(shí)現(xiàn) Condition 接口,支持類似 wait/notify 的條件等待/通知機(jī)制:
- 條件隊(duì)列:每個(gè) Condition 維護(hù)一個(gè)單獨(dú)的條件隊(duì)列,與 AQS 同步隊(duì)列相互獨(dú)立
- await 操作:將當(dāng)前線程加入條件隊(duì)列,并釋放持有的鎖
- signal 操作:將條件隊(duì)列中的線程轉(zhuǎn)移到同步隊(duì)列,等待重新獲取鎖
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加到條件隊(duì)列 Node node = addConditionWaiter(); // 完全釋放鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 循環(huán)檢查節(jié)點(diǎn)是否已經(jīng)轉(zhuǎn)移到同步隊(duì)列 while (!isOnSyncQueue(node)) { // 阻塞當(dāng)前線程 LockSupport.park(this); // 檢查中斷 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 重新競(jìng)爭(zhēng)鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
八、AQS 的應(yīng)用場(chǎng)景
AQS 廣泛應(yīng)用于 Java 并發(fā)包中的各種同步器:
- ReentrantLock:可重入獨(dú)占鎖
- Semaphore:信號(hào)量,控制同時(shí)訪問(wèn)特定資源的線程數(shù)量
- CountDownLatch:閉鎖,允許一個(gè)或多個(gè)線程等待一組操作完成
- ReentrantReadWriteLock:讀寫(xiě)鎖,允許多個(gè)線程同時(shí)讀,但只允許一個(gè)線程寫(xiě)
- CyclicBarrier:循環(huán)柵欄,允許一組線程相互等待達(dá)到一個(gè)共同點(diǎn)
九、總結(jié)
AQS 是 Java 并發(fā)框架中最核心的基礎(chǔ)組件,它通過(guò)以下機(jī)制實(shí)現(xiàn)了高效的線程同步:
- 狀態(tài)管理:使用 volatile 變量和 CAS 操作保證線程安全
- 隊(duì)列管理:使用 CLH 隊(duì)列高效管理等待線程
- 阻塞原語(yǔ):使用 LockSupport 實(shí)現(xiàn)線程的阻塞和喚醒
- 模板方法模式:將通用邏輯和特定邏輯分離,提高可擴(kuò)展性
理解 AQS 的工作原理,不僅有助于更好地使用 Java 并發(fā)包中的同步器,也能幫助我們?cè)诒匾獣r(shí)實(shí)現(xiàn)自己的高效同步器。AQS 通過(guò)簡(jiǎn)潔的設(shè)計(jì)將復(fù)雜的同步器問(wèn)題分解為少量的基本方法,使得開(kāi)發(fā)者能夠快速實(shí)現(xiàn)各種同步器。ReentrantLock 相比 synchronized 提供了更多的功能,如可中斷、超時(shí)等待、公平性選擇等。
到此這篇關(guān)于深入理解 Java AQS 原理與 ReentrantLock 實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java AQS 原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA正則表達(dá)式及字符串的替換與分解相關(guān)知識(shí)總結(jié)
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí)總結(jié),文章圍繞著JAVA正則表達(dá)式及字符串的替換與分解展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Spring Boot中的那些條件判斷的實(shí)現(xiàn)方法
這篇文章主要介紹了Spring Boot中的那些條件判斷的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04Linux下Java開(kāi)發(fā)環(huán)境搭建以及第一個(gè)HelloWorld
這篇文章主要介紹了Linux下Java開(kāi)發(fā)環(huán)境搭建以及第一個(gè)HelloWorld的實(shí)現(xiàn)過(guò)程,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2015-09-09Java動(dòng)態(tài)代理和AOP應(yīng)用示例
這篇文章主要介紹了Java動(dòng)態(tài)代理和AOP應(yīng)用,結(jié)合實(shí)例形式分析了java動(dòng)態(tài)代理在AOP面向切面編程中的相關(guān)操作技巧與使用注意事項(xiàng),需要的朋友可以參考下2019-07-07使用Java增刪改查數(shù)據(jù)庫(kù)的操作方法
這篇文章主要介紹了使用Java增刪改查數(shù)據(jù)庫(kù)的操作方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12