Java AQS的實(shí)現(xiàn)原理詳解
使用
我們這里借助ReentrantLock
來搞清楚AQS的實(shí)現(xiàn)原理。
lock
這個(gè)方法就是開始獲取鎖運(yùn)行的入口,在這個(gè)方法的實(shí)現(xiàn)中,交給了sync
對(duì)象來獲取鎖。
public void lock() { sync.acquire(1); } private final Sync sync; // Sync對(duì)象是一個(gè)ReentrantLock實(shí)現(xiàn)的內(nèi)部抽象類,具體的實(shí)現(xiàn)又分為了公平版本與非公平兩種 abstract static class Sync extends AbstractQueuedSynchronizer {} // 在ReentrantLock的無參構(gòu)造器中,默認(rèn)使用的實(shí)現(xiàn)就是非公平鎖的實(shí)現(xiàn) public ReentrantLock() { sync = new NonfairSync(); } // 也可以通過帶參數(shù)的構(gòu)造器來使用公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
Sync
由于公平鎖FairSync
和NonfairSync
的差別主要在tryAcquire
方法上,別的邏輯都是相同的,因此我們就直接看Sync
和AQS中的實(shí)現(xiàn)。
acquire
方法實(shí)現(xiàn)如下,來自AQS的實(shí)現(xiàn):
// 首先會(huì)調(diào)用 tryAcquire 和 acquireQueued 方法,如果2個(gè)方法都返回true的話, // 那么才會(huì)調(diào)用自行中斷的邏輯 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
tryAcquire
方法就會(huì)因?yàn)楣芥i和非公平鎖的差異,有2種不同的實(shí)現(xiàn),首先來看看非公平鎖的實(shí)現(xiàn),也就是ReentrantLock
的默認(rèn)策略。
NonfairSync.tryAcquire
這個(gè)方法會(huì)直接調(diào)用并返回 Sync
實(shí)現(xiàn)的 nonfairTryAcquire(acquires)
方法。
Sync類中的實(shí)現(xiàn)
// 這里的參數(shù) acquires = 1 final boolean nonfairTryAcquire(int acquires) { // 獲取當(dāng)前調(diào)用者的線程對(duì)象 final Thread current = Thread.currentThread(); // 獲取AQS中定義的state值,這個(gè)state值是AQS的核心之一 int c = getState(); // 在ReentrantLock的實(shí)現(xiàn)中,state就表示當(dāng)前是否有線程持有鎖,0代表沒有線程持有鎖, // 當(dāng)前訪問的線程就可以繼續(xù)執(zhí)行代碼,如果大于0則表示當(dāng)前持有鎖的線程的數(shù)量。 // 由于ReentrantLock屬于可重入鎖,因此,這個(gè)值會(huì)>=1 if (c == 0) { // 能進(jìn)來就表示當(dāng)前沒有線程持有鎖,那么嘗試用CAS獲取鎖 if (compareAndSetState(0, acquires)) { // 獲取鎖成功,那么將當(dāng)前線程設(shè)置到AQS中的當(dāng)前線程中 setExclusiveOwnerThread(current); return true; } } // 如果當(dāng)前持有鎖的就是自己,那么就代表是鎖的重入 else if (current == getExclusiveOwnerThread()) { // 累計(jì)持有鎖的次數(shù) int nextc = c + acquires; // 這里就說明了,state能夠設(shè)置的最大值就是Int.MAX_VALUE, // 當(dāng)處于MAX_VALUE的時(shí)候再加1,那么Int數(shù)字的最高位就會(huì)變成1,符號(hào)位為負(fù) if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新新的state值 setState(nextc); return true; } return false; }
AQS中的實(shí)現(xiàn)
private volatile int state; // 當(dāng)前持有鎖的線程對(duì)象 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
小結(jié)一下,非公平鎖tryAcquire
方法就是先看看有沒有線程持有鎖,沒有的話自己就通過CAS的方式嘗試獲取一下鎖,如果獲取鎖成功或者是自己重入,那么tryAcquire
方法就會(huì)返回true,acquire
方法中的條件判斷就會(huì)直接返回false,lock方法結(jié)束,線程繼續(xù)支持下面的代碼。
FairSync.tryAcquire
下面來看看公平鎖的實(shí)現(xiàn),大體的邏輯跟非公平的是相同的。
FairSync中的實(shí)現(xiàn)
// 這里的參數(shù) acquire = 1 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判斷當(dāng)前是不是有線程持有鎖 if (c == 0) { // 當(dāng)前沒有線程持有鎖就進(jìn)來 // 由于是公平鎖,那么就要保證只有在當(dāng)前等待隊(duì)列為空或者隊(duì)列中等待的線程 // 都沒有到運(yùn)行的條件的時(shí)候,才嘗試通過CAS來獲取鎖。否則就去乖乖排隊(duì) if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 同非公平鎖,鎖重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AQS中的實(shí)現(xiàn)
// 檢查當(dāng)前AQS等待隊(duì)列中是否有正在等待的有效線程節(jié)點(diǎn) public final boolean hasQueuedPredecessors() { Node h, s; // 首先讓參數(shù)h指向當(dāng)前隊(duì)列的頭部 if ((h = head) != null) { // 隊(duì)列不為空 // 將臨時(shí)變量賦值為當(dāng)前第二個(gè)節(jié)點(diǎn) // 這里需要簡(jiǎn)單說明一下AQS的等待隊(duì)列的構(gòu)成,第一個(gè)節(jié)點(diǎn)是沒有業(yè)務(wù)含義的, // 只是用作喚醒下一個(gè)待執(zhí)行的線程節(jié)點(diǎn) if ((s = h.next) == null || s.waitStatus > 0) { // 能進(jìn)來就表示當(dāng)前隊(duì)列只有一個(gè)頭結(jié)點(diǎn),或者第二個(gè)節(jié)點(diǎn)的狀態(tài)是已取消 // - > 參考說明1 // 如果第二個(gè)節(jié)點(diǎn)不為空,那么就釋放這個(gè)引用 s = null; // traverse in case of concurrent cancellation // 從后往前遍歷,找到距離隊(duì)列頭最近的有效節(jié)點(diǎn) for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } // 如果找到了正在隊(duì)列中的排隊(duì)的有效節(jié)點(diǎn)并且不是當(dāng)前訪問的線程,那么就返回true if (s != null && s.thread != Thread.currentThread()) return true; } // 頭結(jié)點(diǎn)指向NULL,那么說明隊(duì)列是空的,直接返回false return false; }
說明1:這里就引入了隊(duì)列節(jié)點(diǎn)中的等待狀態(tài)這個(gè)重要的概念,在ReentrantLock
中,我們只需要關(guān)注CANCELLED
和SIGNAL
即可。只有CANCELLED
是大于0的,新節(jié)點(diǎn)的默認(rèn)值為0。因此只要等待狀態(tài)大于0就代表該節(jié)點(diǎn)被取消了。
// 等待隊(duì)列中節(jié)點(diǎn)的等待狀態(tài) volatile int waitStatus; // 當(dāng)前節(jié)點(diǎn)因?yàn)榈却瑫r(shí)或者被中斷了被取消 static final int CANCELLED = 1; // 接下來有資格被喚醒獲得鎖的標(biāo)記,只有獲得了這個(gè)標(biāo)記的節(jié)點(diǎn)才能被執(zhí)行完的線程喚醒 static final int SIGNAL = -1;
小結(jié)一下,公平鎖對(duì)比非公平鎖,在最開始有機(jī)會(huì)獲取鎖的時(shí)候,會(huì)先檢查一下當(dāng)前隊(duì)列中是否已經(jīng)有線程在排隊(duì)等待執(zhí)行了,如果等待隊(duì)列中是空的或者沒有有效的排隊(duì)節(jié)點(diǎn),才會(huì)獲取鎖。如果獲取鎖成功,或者鎖重入成功,那么同樣會(huì)結(jié)束AQS的邏輯,繼續(xù)執(zhí)行業(yè)務(wù)代碼。
acquireQueued
上面分析完在tryAcquire
方法中如果成功獲得鎖的情況,就會(huì)結(jié)束AQS的邏輯,接下來就來分析未能成功獲得鎖的邏輯,即:acquire
方法中條件判斷的第二個(gè)條件判斷。
在AQS中實(shí)現(xiàn)
// 這個(gè)方法就會(huì)將當(dāng)前線程添加到等待隊(duì)列中,并且返回操作是否成功,arg就是傳入的acquire值,為1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
首先通過addWaiter
方法構(gòu)建一個(gè)包含線程對(duì)象的節(jié)點(diǎn)并且添加到隊(duì)列中
// 這里的參數(shù)為 Node.EXCLUSIVE,表示這是一個(gè)排它鎖的實(shí)現(xiàn),這里的值為NULL private Node addWaiter(Node mode) { // 構(gòu)建一個(gè)線程節(jié)點(diǎn) Node node = new Node(mode); // 這里就是AQS的核心理念了,通過不斷的自旋,將線程節(jié)點(diǎn)插入到隊(duì)列中 for (;;) { // 獲取原來的隊(duì)列的隊(duì)尾,因?yàn)锳QS才去的尾插方式 Node oldTail = tail; if (oldTail != null) { // 將新插入的節(jié)點(diǎn)指向原來的尾結(jié)點(diǎn) node.setPrevRelaxed(oldTail); // 通過CAS的方式將當(dāng)前節(jié)點(diǎn)設(shè)置到線程共享隊(duì)列的尾部去,這里要注意, // 凡是涉及到多線程操作的屬性,都需要通過CAS保證操作的原子性 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; // 設(shè)置成功,就返回插入的節(jié)點(diǎn)對(duì)象;如若不成功, // 就表示有別的線程也修改了尾結(jié)點(diǎn),那么就要等下一次循環(huán)重試 return node; } } else { // 沒有尾結(jié)點(diǎn)說明隊(duì)列不存在,那么就進(jìn)行初始化 initializeSyncQueue(); } } } // 初始化等待隊(duì)列 private final void initializeSyncQueue() { Node h; // 還是通過CAS的方式,給隊(duì)列初始化一個(gè)默認(rèn)的Node節(jié)點(diǎn),幾個(gè)重要的屬性的初始值如下 // waitStatus = 0; thread = null if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }
小結(jié)一下,addWaiter
方法通過尾插的方式將沒有搶到鎖的線程封裝為Node
節(jié)點(diǎn),插入到AQS的等待隊(duì)列中。如果隊(duì)列還未初始化,那么就先初始化隊(duì)列,隊(duì)列自帶一個(gè)無實(shí)際含義的頭結(jié)點(diǎn)。
acquireQueued
在AQS中實(shí)現(xiàn)
// arg就是傳入的acquire參數(shù),為1;該方法返回值的含義為,線程在等待過程中是否被中斷 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { // 還是不斷的自旋,等待機(jī)會(huì)進(jìn)行操作 for (;;) { // 獲取新插入節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn) final Node p = node.predecessor(); // 如果上一個(gè)節(jié)點(diǎn)已經(jīng)是頭結(jié)點(diǎn),那么說明當(dāng)前就已經(jīng)輪到當(dāng)前線程獲取鎖執(zhí)行業(yè)務(wù)了 // 那么就再次嘗試搶一次鎖 if (p == head && tryAcquire(arg)) { // 能進(jìn)來就說明獲取鎖成功了 // 那么就將當(dāng)前線程的節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn) // 在這個(gè)方法中,會(huì)去除節(jié)點(diǎn)中的信息,做一個(gè)純粹的頭結(jié)點(diǎn) setHead(node); // 將已經(jīng)沒有指向的原頭結(jié)點(diǎn)的next指為空,等待回收 p.next = null; // help GC // 這里返回的值為false,因?yàn)楫?dāng)前線程并未被阻塞就獲得了鎖 return interrupted; } // 走到這里說明當(dāng)前線程并沒有獲取到鎖,那么就要考慮是否要將線程阻塞了 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // 線程獲取鎖失敗之后,是否需要將線程阻塞,這里2個(gè)參數(shù), // pred是新插入節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),node是新插入的節(jié)點(diǎn) private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取上一個(gè)節(jié)點(diǎn)的等待狀態(tài) int ws = pred.waitStatus; // 判斷上一個(gè)節(jié)點(diǎn)的狀態(tài)是不是SIGNAL,只有狀態(tài)為SIGNAL的才是有效的可指向節(jié)點(diǎn) if (ws == Node.SIGNAL) // 如果上一個(gè)節(jié)點(diǎn)是SIGNAL狀態(tài),那就說明當(dāng)前線程可以連接上該節(jié)點(diǎn),然后被掛起了 return true; // 上面我們提到過,只有節(jié)點(diǎn)被取消了,等待狀態(tài)才會(huì)>0 if (ws > 0) { // 一直往前找,直到找到等待狀態(tài)<=0的,數(shù)據(jù)規(guī)范的話即找到最近的一個(gè)等待狀態(tài) // 為SIGNAL的節(jié)點(diǎn),跳過全部被取消的節(jié)點(diǎn) do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 此時(shí)pred指向的即第一個(gè)合法的線程節(jié)點(diǎn),指向當(dāng)前新插入的節(jié)點(diǎn) pred.next = node; } else { // 這里的意思就是上一個(gè)節(jié)點(diǎn)就是有效節(jié)點(diǎn),那么就將上一個(gè)節(jié)點(diǎn)的等待狀態(tài)強(qiáng)制 // 更新為SIGNAL,即-1。毫無意義的那個(gè)頭結(jié)點(diǎn)也會(huì)被設(shè)置為SIGNAL狀態(tài) pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; }
小結(jié)一下,這個(gè)方法的含義就是,由于當(dāng)前線程沒有獲取到鎖資源,因此就需要被阻塞。同時(shí)在這個(gè)方法中,也會(huì)整理等待隊(duì)列,將那些已經(jīng)被取消的節(jié)點(diǎn)從隊(duì)列中移除。接著就是調(diào)用條件判斷中的執(zhí)行方法,將線程掛起阻塞起來。
private final boolean parkAndCheckInterrupt() { // 調(diào)用了park方法,底層是調(diào)用UnSafe類的park方法來實(shí)現(xiàn) LockSupport.park(this); return Thread.interrupted(); }
至此,lock
方法的全部情況都清楚了,如果線程能拿到鎖,那就直接結(jié)束lock階段,要是搶不到鎖,那么就進(jìn)入等待隊(duì)列中,在進(jìn)入隊(duì)列之前,如果發(fā)現(xiàn)還有機(jī)會(huì)獲取鎖,那么會(huì)再次嘗試獲取一次,如若還是獲取不到,那么就以尾插的方式進(jìn)入等待隊(duì)列,通過調(diào)用LockSupport.park方法,將線程阻塞,等待被喚醒。
unlock
主動(dòng)調(diào)用這個(gè)方法以后,就代表對(duì)于鎖的占用結(jié)束。
在ReentrantLock中實(shí)現(xiàn)
public void unlock() { sync.release(1); }
在AQS中實(shí)現(xiàn)
public final boolean release(int arg) { // 調(diào)用tryRelease方法真正實(shí)現(xiàn)解除鎖 // 只有state加的所有鎖被解除了,那么才會(huì)喚醒下一個(gè)線程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在Sync中實(shí)現(xiàn)
// 這里的releases就是傳入的參數(shù)1,即如果是重入鎖,那么這里需要解鎖多次 protected final boolean tryRelease(int releases) { // 計(jì)算state的值,這里的含義就是state-1 int c = getState() - releases; // 如果當(dāng)前線程不是持有鎖的線程,那么就報(bào)錯(cuò) if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 鎖是否完全釋放完的標(biāo)記 boolean free = false; // state減到0說明鎖已經(jīng)完全釋放完了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state的值 setState(c); // 只有鎖被完全釋放完,才返回true return free; }
在AQS中實(shí)現(xiàn)
// 喚醒下一個(gè)需要鎖的線程,這里的node是頭結(jié)點(diǎn) private void unparkSuccessor(Node node) { // 獲得頭結(jié)點(diǎn)的等待狀態(tài) int ws = node.waitStatus; // 如果頭結(jié)點(diǎn)是SIGNAL,那么重置為0,因?yàn)檫@個(gè)節(jié)點(diǎn)已經(jīng)沒有意義了,會(huì)被移除 if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 獲得頭結(jié)點(diǎn)后面的待喚醒的節(jié)點(diǎn) Node s = node.next; // 如果這個(gè)待喚醒的節(jié)點(diǎn)為空或者等待狀態(tài)不正確,在這里就是不等于SIGNAL if (s == null || s.waitStatus > 0) { s = null; // 從尾部開始查詢,找到合法的待喚醒節(jié)點(diǎn) for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) // 喚醒線程 LockSupport.unpark(s.thread); }
cancelAcquire
這個(gè)方法提供了取消線程等待獲取鎖的功能
在AQS中被實(shí)現(xiàn)
private void cancelAcquire(Node node) { // 健壯性判斷,節(jié)點(diǎn)非空才可以被取消 if (node == null) return; // 將節(jié)點(diǎn)中的線程指向去掉 node.thread = null; // 獲取到當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn) Node pred = node.prev; // 通過循環(huán),跳過所有當(dāng)前被取消節(jié)點(diǎn)之前的也已經(jīng)被標(biāo)記取消的節(jié)點(diǎn) while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 通過上面的循環(huán)以后pred的值就為等待隊(duì)列中隊(duì)尾的一個(gè)合法的未被取消的節(jié)點(diǎn) // 獲取到該合法節(jié)點(diǎn)下一個(gè)指向的節(jié)點(diǎn) Node predNext = pred.next; // 標(biāo)記當(dāng)前被取消的節(jié)點(diǎn)的等待狀態(tài)為被取消 node.waitStatus = Node.CANCELLED; // 如果被取消的節(jié)點(diǎn)就是隊(duì)尾的節(jié)點(diǎn),那么就通過CAS將pred設(shè)置為尾結(jié)點(diǎn) // 就可以拋棄中間那些同樣被標(biāo)記為被取消的節(jié)點(diǎn),如果有的是 if (node == tail && compareAndSetTail(node, pred)) { // 將pred的下一個(gè)節(jié)點(diǎn)指向NULL,因?yàn)閜red現(xiàn)在就是隊(duì)尾 pred.compareAndSetNext(predNext, null); } else { // 這說明取消的節(jié)點(diǎn)不是尾結(jié)點(diǎn),而是中間的節(jié)點(diǎn) // 這個(gè)值會(huì)在下面的條件判斷被賦值為上一個(gè)節(jié)點(diǎn)的等待狀態(tài) int ws; // 如果找到的上一個(gè)合法節(jié)點(diǎn)不是頭結(jié)點(diǎn) // 并且上一個(gè)節(jié)點(diǎn)的等待狀態(tài)是SIGNAL,并且將不是SIGNAL狀態(tài)的負(fù)數(shù)狀態(tài)轉(zhuǎn)換為SIGNAL // 并且線程不為空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { // 能進(jìn)來就說明被取消的節(jié)點(diǎn)處于中間,那么就要將這個(gè)node從隊(duì)列中跳過 Node next = node.next; // 如果被取消的節(jié)點(diǎn)是一個(gè)有效的節(jié)點(diǎn),不為空并且狀態(tài)也是對(duì)的 if (next != null && next.waitStatus <= 0) // 那么就將上一個(gè)節(jié)點(diǎn)指向被取消節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn) pred.compareAndSetNext(predNext, next); } else { // 能進(jìn)入這里說明上一個(gè)合法節(jié)點(diǎn)已經(jīng)是頭結(jié)點(diǎn)了, // 那么就說明被取消的這個(gè)節(jié)點(diǎn)已經(jīng)是原本除了頭結(jié)點(diǎn)以外的最靠前面的節(jié)點(diǎn), // 那么被取消的這個(gè)節(jié)點(diǎn)其實(shí)就等價(jià)于頭結(jié)點(diǎn)了,應(yīng)該喚醒后面還在等待的線程節(jié)點(diǎn) // 喚醒下一個(gè)被掛起的線程,具體已經(jīng)分析過了,這里就省略了 unparkSuccessor(node); } node.next = node; } }
以上就是Java AQS的實(shí)現(xiàn)原理詳解的詳細(xì)內(nèi)容,更多關(guān)于Java AQS的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java自帶命令行工具jmap、jhat與jinfo的使用實(shí)例代碼詳解
本篇文章主要通過代碼實(shí)例對(duì)java自帶命令行工具jmap、jhat與jinfo的使用做出了詳解,需要的朋友可以參考下2017-04-04Java實(shí)現(xiàn)簡(jiǎn)單猜數(shù)字小游戲
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)猜數(shù)字游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-12-12Java基于socket編程相關(guān)知識(shí)解析
這篇文章主要為大家詳細(xì)解析了Java基于socket編程的相關(guān)知識(shí),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-09-09Java圖形化界面設(shè)計(jì)之布局管理器之BorderLayout案例詳解
這篇文章主要介紹了Java圖形化界面設(shè)計(jì)之布局管理器之BorderLayout案例詳解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08