Java中的Lock與ReentrantLock深入解析
Lock
Lock位于java.util.concurrent.locks包下,是一種線程同步機制,就像synchronized塊一樣。但是,Lock比synchronized塊更靈活、更復雜。
1. Lock繼承關系
2. 官方文檔解讀
3. Lock接口方法解析
public interface Lock { // 獲取鎖。如果鎖不可用,則當前線程將出于線程調(diào)度目的而禁用,并處于休眠狀態(tài),直到獲得鎖為止。 void lock(); // 如果當前線程未被中斷,則獲取鎖。如果鎖可用,則獲取鎖并立即返回。 // 如果鎖不可用,出于線程調(diào)度目的,將禁用當前線程,該線程將一直處于休眠狀態(tài)。 // 下面兩種情形會讓當前線程停止休眠狀態(tài): // 1.鎖由當前線程獲取。 // 2.其他一些線程中斷當前線程,并且支持對鎖獲取的中斷。 // 當前線程出現(xiàn)下面兩種情況時,將拋出InterruptedException,并清除當前線程的中斷狀態(tài)。 // 1.當前線程在進入此方法時,已經(jīng)設置為中斷狀態(tài)。 // 2. 當前線程在獲取鎖時被中斷,并且支持對鎖獲取中斷。 void lockInterruptibly() throws InterruptedException; // 嘗試獲取鎖,如果鎖處于空閑狀態(tài),則獲取鎖,并立即返回true。如果鎖不可用,則立即返回false。 // 典型用法: // 確保解鎖前一定獲取到鎖 // if (lock.tryLock()) { // try { // // manipulate protected state // } finally { // lock.unlock(); // } // } else { // // perform alternative actions // } boolean tryLock(); // 該方法為tryLock()的重載方法,兩個參數(shù)分別表示為: // time:等待鎖的最長時間 // unit:時間單位 // 如果在給定的等待時間內(nèi)是空閑的并且當前線程沒有被中斷,則獲取鎖。如果鎖可用,則此方法立即獲取鎖并返回true,如果鎖不可用,出于線程調(diào)度目的,將禁用當前線程,該線程將一直處于休眠狀態(tài)。 // 如果指定的等待時間超時,則返回false值。如果時間小于或等于0,則該方法永遠不會等待。 // 下面三種情形會讓當前線程停止休眠狀態(tài): // 1.鎖由當前線程獲取。 // 2.其他一些線程中斷當前線程,并且支持對鎖獲取的中斷。 // 3.到了指定的等待時間。 // 當前線程出現(xiàn)下面兩種情況時,將拋出InterruptedException,并清除當前線程的中斷狀態(tài)。 // 1.當前線程在進入此方法時,已經(jīng)設置為中斷狀態(tài)。 // 2.當前線程在獲取鎖時被中斷,并且支持對鎖獲取中斷。 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 釋放鎖,與lock()、tryLock()、tryLock(long , TimeUnit)、lockInterruptibly()相對應。 void unlock(); // 返回綁定到此鎖實例的Condition實例。當前線程只有獲得了鎖,才能調(diào)用Condition實例的方法。 Condition newCondition(); }
ReentrantLock
ReentrantLock位于java.util.concurrent(J.U.C)包下,是Lock接口的實現(xiàn)類,屬于獨占鎖。可重入特性與synchronized相似,但擁有擴展的功能。
ReentrantLock表現(xiàn)為API層面的互斥鎖,通過lock()和unlock()方法完成,是顯式的,而synchronized表現(xiàn)為原生語法層面的互斥鎖,是隱式的。
在JDK 1.6之后,虛擬機對于synchronized關鍵字進行整體優(yōu)化后,在性能上synchronized與ReentrantLock已沒有明顯差距,因此在使用選擇上,需要根據(jù)場景而定,大部分情況下我們依然建議是synchronized關鍵字,原因之一是使用方便語義清晰,二是性能上虛擬機已為我們自動優(yōu)化。而ReentrantLock提供了多樣化的同步特性,如超時獲取鎖、可以被中斷獲取鎖(synchronized的同步是不能中斷的)、等待喚醒機制的多個條件變量(Condition)等,因此當我們確實需要使用到這些功能是,可以選擇ReentrantLock
ReentrantLock都是把具體實現(xiàn)委托給內(nèi)部類(Sync、NonfairSync、FairSync)
1. 可重入性
可重入性:任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖阻塞,synchronized和Reentrant都是可重入的,隱式顯式之分。
實現(xiàn)條件:
鎖需要去識別獲取鎖的線程是否是當前占據(jù)鎖的線程,如果是的話,就成功獲取。鎖獲取一次,內(nèi)部鎖計數(shù)器需要加一,釋放一次減一,計數(shù)為零表示為成功釋放鎖。
ReentrantLock的重入計數(shù)是使用AbstractQueuedSynchronizer的state屬性的,state大于0表示鎖被占用,等于0表示空閑,小于0則是重入次數(shù)太多導致溢出了。
2. 公平鎖模式和非公平鎖模式
ReentrantLock的構造函數(shù)接受可選的公平參數(shù),參數(shù)為true則表示獲取一個公平鎖,不帶參數(shù)或者false表示一個非公平鎖。
構造方法:
// 無參構造方法 // 默認是非公平鎖模式 public ReentrantLock() { sync = new NonfairSync(); } // 有參構造方法 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平性鎖和非公平性鎖父類:Sync
sync繼承于AQS
static abstract class Sync extends AbstractQueuedSynchronizer { abstract void lock(); // 非公平獲取, 非公平鎖都需要這個方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // state == 0表示無鎖 // 通過CAS的方式競爭鎖,體現(xiàn)非公平性,任何線程來了不用排隊都可以搶鎖 if (compareAndSetState(0, acquires)) { // 加鎖成功,state狀態(tài)改為1 // 當前哪一個線程獲取到鎖,將線程信息記錄到AQS里面 // 設置當前持有鎖的線程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 當前線程正是鎖持有者,此段邏輯體現(xiàn)可重入性 // 同一個線程可以在不獲取鎖的情況再次進入 // nextc表示被加鎖次數(shù),即重入次數(shù) int nextc = c + acquires; if (nextc < 0) // 被鎖次數(shù)上溢(很少出現(xiàn)) throw new Error("Maximum lock count exceeded"); // 設置加鎖次數(shù),lock幾次就要unlock幾次,否則無法釋放鎖 setState(nextc); return true; } return false; } // 釋放 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有鎖的持有者才能釋放鎖 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 鎖被釋放 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 當前線程是否持有鎖 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // 鎖的持有者 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 加鎖次數(shù) final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // 是否上鎖,根據(jù)state字段可以判斷 final boolean isLocked() { return getState() != 0; } }
公平鎖模式:FairSync
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } // 方法邏輯與Snyc中的nonfairTryAcquire方法一致,只是加了線程排隊的邏輯 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // !hasQueuedPredecessors() // 判斷隊列中是否有其他線程,沒有才進行鎖的獲取,否則繼續(xù)排隊 // 體現(xiàn)公平性 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非公平鎖模式:NonfairSync
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { // 不管鎖是否已經(jīng)被占用,采用CAS的方式進行鎖競爭 // 搶鎖成功,則把當前線程設置為活躍的線程 // 搶鎖失敗則走acquire(1)邏輯 // 體現(xiàn)非公平性,線程不必排隊 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 搶鎖失敗,調(diào)用tryAcquire方法,最終調(diào)用nonfairTryAcquire方法 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
3. condition
在并發(fā)編程中,每個Java對象都存在一組監(jiān)視器方法,如wait()、notify()以及notifyAll()方法,通過這些方法,我們可以實現(xiàn)線程間通信與協(xié)作(也稱為等待喚醒機制),如生產(chǎn)者-消費者模式、等待-通知模式,而且這些方法必須配合著synchronized關鍵字使用。
與synchronized的等待喚醒機制相比Condition具有更多的靈活性以及精確性,這是因為notify()在喚醒線程時是隨機(同一個鎖),而Condition則可通過多個Condition實例對象建立更加精細的線程控制,也就帶來了更多靈活性了,我們可以簡單理解為以下兩點:
- 通過Condition能夠精細的控制多線程的休眠與喚醒。
- 對于一個鎖,我們可以為多個線程間建立不同的Condition。
Condition相關方法
// 返回綁定到此鎖實例的Condition實例。當前線程只有獲得了鎖,才能調(diào)用Condition實例的方法。 Condition newCondition();
public interface Condition { // 線程程進入等待狀態(tài)直到被通知(signal)或中斷 // 當其他線程調(diào)用singal()或singalAll()方法時,該線程將被喚醒 // 當其他線程調(diào)用interrupt()方法中斷當前線程 // await()相當于synchronized等待喚醒機制中的wait()方法 void await() throws InterruptedException; // 與wait()方法相同,唯一的不同點是,該方法不會再等待的過程中響應中斷 void awaitUninterruptibly(); // 當前線程進入等待狀態(tài),直到被喚醒或被中斷或超時 // 其中nanosTimeout指的等待超時時間,單位納秒 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同awaitNanos,但可以指明時間單位 boolean await(long time, TimeUnit unit) throws InterruptedException; // 線程進入等待狀態(tài),直到被喚醒、中斷或到達某個時 // 間期限(deadline),如果沒到指定時間就被喚醒,返回true,其他情況返回false boolean awaitUntil(Date deadline) throws InterruptedException; // 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須 // 獲取與Condition相關聯(lián)的鎖,功能與notify()相同 void signal(); // 喚醒所有等待在Condition上的線程,該線程從等待方法返回前必須 // 獲取與Condition相關聯(lián)的鎖,功能與notifyAll()相同 void signalAll(); }
使用await之前必須加鎖,使用signal、signalAll之后記得釋放鎖。
Condition實現(xiàn)原理
Condition的具體實現(xiàn)類是AQS的內(nèi)部類ConditionObject,前面我們分析過AQS中存在兩種隊列,一種是同步隊列,一種是等待隊列,而等待隊列就相對于Condition而言的。注意在使用Condition前必須獲得鎖,同時在Condition的等待隊列上的結點與前面同步隊列的結點是同一個類即Node,其結點的waitStatus的值為CONDITION。在實現(xiàn)類ConditionObject中有兩個結點分別是firstWaiter和lastWaiter,firstWaiter代表等待隊列第一個等待結點,lastWaiter代表等待隊列最后一個等待結點,代碼如下:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ // 等待隊列第一個等待結點 private transient Node firstWaiter; /** Last node of condition queue. */ // 等待隊列最后一個等待結點 private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } }
每個Condition都對應著一個等待隊列,也就是說如果一個鎖上創(chuàng)建了多個Condition對象,那么也就存在多個等待隊列。等待隊列是一個FIFO的隊列,在隊列中每一個節(jié)點都包含了一個線程的引用,而該線程就是Condition對象上等待的線程。當一個線程調(diào)用了await()相關的方法,那么該線程將會釋放鎖,并構建一個Node節(jié)點封裝當前線程的相關信息加入到等待隊列中進行等待,直到被喚醒、中斷、超時才從隊列中移出。Condition中的等待隊列模型如下:
Node節(jié)點的數(shù)據(jù)結構,在等待隊列中使用的變量與同步隊列是不同的,Condtion中等待隊列的結點只有直接指向的后繼結點并沒有指明前驅(qū)結點,而且使用的變量是nextWaiter而不是next。
firstWaiter指向等待隊列的頭結點,lastWaiter指向等待隊列的尾結點,等待隊列中結點的狀態(tài)只有兩種即CANCELLED和CONDITION,前者表示線程已結束需要從等待隊列中移除,后者表示條件結點等待被喚醒。再次強調(diào)每個Codition對象對于一個等待隊列,也就是說AQS中只能存在一個同步隊列,但可擁有多個等待隊列。
實現(xiàn)代碼:
await方法:
public final void await() throws InterruptedException { // 判斷線程是否被中斷 if (Thread.interrupted()) throw new InterruptedException(); // 創(chuàng)建新結點加入等待隊列并返回 Node node = addConditionWaiter(); // 釋放當前線程鎖即釋放同步狀態(tài) int savedState = fullyRelease(node); int interruptMode = 0; // 判斷結點是否同步隊列(SyncQueue)中,即是否被喚醒 while (!isOnSyncQueue(node)) { // 掛起線程 LockSupport.park(this); // 判斷是否被中斷喚醒,如果是退出循環(huán)。 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒后執(zhí)行自旋操作爭取獲得鎖,同時判斷線程是否被中斷 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // clean up if cancelled if (node.nextWaiter != null) // 清理等待隊列中不為CONDITION狀態(tài)的結點 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // 判斷是否為結束狀態(tài)的結點并移除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 創(chuàng)建新結點狀態(tài)為CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // 加入等待隊列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
signal()方法:
public final void signal() { // 判斷是否持有獨占鎖,如果不是拋出異常 // 從這點也可以看出只有獨占模式先采用等待隊列,而共享模式下是沒有等待隊列的,也就沒法使用Condition if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 喚醒等待隊列第一個結點的線程 if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 移除條件等待隊列中的第一個結點, // 如果后繼結點為null,那么說沒有其他結點將尾結點也設置為null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 如果被通知節(jié)點沒有進入到同步隊列并且條件等待隊列還有不為空的節(jié)點,則繼續(xù)循環(huán)通知后續(xù)結點 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // transferForSignal方法 final boolean transferForSignal(Node node) { // 嘗試設置喚醒結點的waitStatus為0,即初始化狀態(tài) // 如果設置失敗,說明當期結點node的waitStatus已不為 // CONDITION狀態(tài),那么只能是結束狀態(tài)了,因此返回false // 返回doSignal()方法中繼續(xù)喚醒其他結點的線程,注意這里并 // 不涉及并發(fā)問題,所以CAS操作失敗只可能是預期值不為CONDITION, // 而不是多線程設置導致預期值變化,畢竟操作該方法的線程是持有鎖的。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 加入同步隊列并返回前驅(qū)結點p Node p = enq(node); int ws = p.waitStatus; // 判斷前驅(qū)結點是否為結束結點(CANCELLED=1)或者在設置 // 前驅(qū)節(jié)點狀態(tài)為Node.SIGNAL狀態(tài)失敗時,喚醒被通知節(jié)點代表的線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 喚醒node結點的線程 LockSupport.unpark(node.thread); return true; }
流程:signal()被調(diào)用后,先判斷當前線程是否持有獨占鎖,如果有,那么喚醒當前Condition對象中等待隊列的第一個結點的線程,并從等待隊列中移除該結點,移動到同步隊列中,如果加入同步隊列失敗,那么繼續(xù)循環(huán)喚醒等待隊列中的其他結點的線程,如果成功加入同步隊列,那么如果其前驅(qū)結點是否已結束或者設置前驅(qū)節(jié)點狀態(tài)為Node.SIGNAL狀態(tài)失敗,則通過LockSupport.unpark()喚醒被通知節(jié)點代表的線程,到此signal()任務完成。
注意被喚醒后的線程,將從前面的await()方法中的while循環(huán)中退出,因為此時該線程的結點已在同步隊列中,那么while (!isOnSyncQueue(node))將不在符合循環(huán)條件,進而調(diào)用AQS的acquireQueued()方法加入獲取同步狀態(tài)的競爭中,這就是等待喚醒機制的整個流程實現(xiàn)原理,流程如下圖所示(注意無論是同步隊列還是等待隊列使用的Node數(shù)據(jù)結構都是同一個,不過是使用的內(nèi)部變量不同罷了)
流程圖:
Condition它更強大的地方在于:能夠更加精細的控制多線程的休眠與喚醒。對于同一個鎖,我們可以創(chuàng)建多個Condition,在不同的情況下使用不同的Condition。例如,假如多線程讀/寫同一個緩沖區(qū):當向緩沖區(qū)中寫入數(shù)據(jù)之后,喚醒"讀線程";當從緩沖區(qū)讀出數(shù)據(jù)之后,喚醒"寫線程";并且當緩沖區(qū)滿的時候,"寫線程"需要等待;當緩沖區(qū)為空時,"讀線程"需要等待。如果采用Object類中的wait(),notify(),notifyAll()實現(xiàn)該緩沖區(qū),當向緩沖區(qū)寫入數(shù)據(jù)之后需要喚醒"讀線程"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀線程",而只能通過notifyAll喚醒所有線程(但是notifyAll無法區(qū)分喚醒的線程是讀線程,還是寫線程)。 但是,通過Condition,就能明確的指定喚醒讀線程。
利用Condition實現(xiàn)等待-通知模式:
public class ABCThread implements Runnable { private ReentrantLock lock; private Condition sCondition; private Condition aCondition; public ABCThread(ReentrantLock lock, Condition sCondition, Condition aCondition) { this.lock = lock; this.sCondition = sCondition; // 喚醒下一個線程 this.aCondition = aCondition; // 阻塞當前線程 } @Override public void run() { int i = 0; while (i < 10) { // 加鎖 lock.lock(); try { // 接收到線程通知則繼續(xù)執(zhí)行,否則就阻塞 aCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print(Thread.currentThread().getName()+" "); // 喚醒其他線程線程 sCondition.signal(); i++; // 釋放鎖 lock.unlock(); } } } ReentrantLock reentrantLock = new ReentrantLock(); Condition ab = reentrantLock.newCondition(); Condition bc = reentrantLock.newCondition(); Condition ca = reentrantLock.newCondition(); new Thread(new ABCThread(reentrantLock,ab,ca),"A").start(); new Thread(new ABCThread(reentrantLock,bc,ab),"B").start(); new Thread(new ABCThread(reentrantLock,ca,bc),"C").start(); reentrantLock.lock(); ca.signal(); reentrantLock.unlock();
到此這篇關于Java中的Lock與ReentrantLock深入解析的文章就介紹到這了,更多相關Java的Lock與ReentrantLock內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!