JAVA多線程之JDK中的各種鎖詳解(看這一篇就夠了)
1.概論
1.1.實(shí)現(xiàn)鎖的要素
JAVA中的鎖都是可重入的鎖,因為不可重入的試用的時候很容易造成死鎖。這個道理很好想明白:
當(dāng)一個線程已經(jīng)持有一個鎖,并在持有該鎖的過程中再次嘗試獲取同一把鎖時,如果沒有重入機(jī)制,第二次請求會被阻塞,因為鎖已經(jīng)被自己持有。這會導(dǎo)致線程自我死鎖,因為它在等待自己釋放的鎖。
可重入是指獲取鎖的線程可以繼續(xù)重復(fù)的獲得此鎖。其實(shí)我們想都能想到要實(shí)現(xiàn)一把鎖需要些什么,首先肯定是:
標(biāo)志位,也叫信號量,標(biāo)記鎖的狀態(tài)和重入次數(shù),這樣才能完成持有鎖和釋放鎖。
接下來要考慮的是拒接策略,當(dāng)前鎖被持有期間,后續(xù)的請求線程該怎么處理,當(dāng)然可以直接拒絕,JAVA的選擇委婉點(diǎn),選擇了允許這些線程躺在鎖上阻塞等待鎖被釋放。要實(shí)現(xiàn)讓線程躺在鎖上等待,我們想想無非要:
需要支持對一個線程的阻塞、喚醒
需要記錄當(dāng)前哪個線程持有鎖
需要一個隊列維護(hù)所有阻塞在當(dāng)前鎖上的線程
OK,以上四點(diǎn)就是JAVA鎖的核心,總結(jié)起來就是信號量+隊列,分別用來記錄持有者和等待者。
1.2.阻塞、喚醒操作
首先我們來看看阻塞和喚醒的操作,在JDK中提供了一個Unsafe類,該類中提供了阻塞或喚醒線程的一對操作 原語——park/unpark:
public native void unpark(Object var1); public native void park(boolean var1, long var2);
這對原語最終會調(diào)用操作系統(tǒng)的程序接口執(zhí)行線程操作。
1.2.阻塞隊列
拿來維護(hù)所有阻塞在當(dāng)前鎖上的線程的隊列能是個普通隊列嗎?很顯然不是,它的操作必須是線程安全的是吧,所以這個隊列用阻塞隊列實(shí)現(xiàn)才合適。什么是阻塞隊列:
阻塞隊列提供了線程安全的元素插入和移除操作,并且在特定條件下會阻塞線程,直到滿足操作條件。
說到JDK中的阻塞隊列,其核心就是AbstractQueuedSynchronizer,簡稱AQS,由雙向鏈表實(shí)現(xiàn)的一個元素操作絕對安全的隊列,用來在鎖的實(shí)現(xiàn)中維護(hù)阻塞在鎖上的線程上的隊列的這個角色。
來看看AQS的源碼:
它有指向前后節(jié)點(diǎn)的指針、有一個標(biāo)志位state、還有一個提供線程操作原原語(阻塞、喚醒)的unsafe類。
所以其實(shí)AQS就長這樣:
點(diǎn)進(jìn)源碼可以看到其隨便一個方法都是線程安全的:
由于本文不是專門聊AQS這里就不擴(kuò)展了,反正知道AQS是一個線程安全的阻塞隊列就對了。
1.3.Lock接口和Sync類
JAVA中所有鎖的頂級父接口,用來規(guī)范定義一把鎖應(yīng)該有那些行為職責(zé):
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
JAVA中所有鎖的實(shí)現(xiàn)都是依托AQS去作為阻塞隊列,每個鎖內(nèi)部都會實(shí)現(xiàn)一個Sync內(nèi)部類,在自身Sync內(nèi)部以不同的策略去操作AQS實(shí)現(xiàn)不同種類的鎖。
abstract static class Sync extends AbstractQueuedSynchronizer {......}
2.各種鎖
2.1.互斥鎖
2.1.1.概論
ReentrantLock,互斥鎖,ReentrantLock本身沒有任何代碼邏輯,依靠內(nèi)部類Sync干活兒:
public class ReentrantLock implements Lock, Serializable { private final ReentrantLock.Sync sync; public void lock() { this.sync.lock(); } public void unlock() { this.sync.release(1); } ...... }
ReentrantLock的Sync繼承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer {......}
Sync是抽象類,有兩個實(shí)現(xiàn):
NonfairSync,公平鎖
FairSync,非公平鎖
實(shí)例化ReentrantLock的實(shí)例時,根據(jù)傳入的標(biāo)志位可以創(chuàng)建公平和公平的實(shí)現(xiàn)
public class ReentrantLock implements Lock, java.io.Serializable{ public ReentrantLock() { sync = new NonfairSync(); } ? public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ...... } }
2.1.2.源碼
1.lock()
公平鎖的lock():
static final class FairSync extends Sync { final void lock() { acquire(1);//進(jìn)來直接排隊 }
非公平鎖的lock():
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1))//進(jìn)來直接搶鎖 setExclusiveOwnerThread(Thread.currentThread());//將鎖的持有者設(shè)置為當(dāng)前線程 else acquire(1);//沒搶過再去排隊 } }
acquire()是AQS的模板方法:
tryAcquire,嘗試再去獲取一次鎖,公平鎖依然是排隊搶,去看看阻塞隊列是否為空;非公平鎖依然是直接搶。
acquireQueued,將線程放入阻塞隊列。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquireQueued(..)是lock()最關(guān)鍵的一部分,addWaiter(..)把Thread對象加入阻塞隊列,acquireQueued(..)完成對線程的阻塞。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//如果發(fā)現(xiàn)自己在隊頭就去拿鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//調(diào)用原語,阻塞自己 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued(..)函數(shù)有一個返回值,表示什么意思 呢?雖然該函數(shù)不會中斷響應(yīng),但它會記錄被阻塞期間有沒有其他線 程向它發(fā)送過中斷信號。如果有,則該函數(shù)會返回true;否則,返回false。所以才有了以下邏輯:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
當(dāng) acquireQueued(..) 返回 true 時,會調(diào)用 selfInterrupt (),自己給自己發(fā)送中斷信號,也就是自己把自己的中斷標(biāo)志位設(shè) 為true。之所以要這么做,是因為自己在阻塞期間,收到其他線程中 斷信號沒有及時響應(yīng),現(xiàn)在要進(jìn)行補(bǔ)償。這樣一來,如果該線程在loc k代碼塊內(nèi)部有調(diào)用sleep()之類的阻塞方法,就可以拋出異常,響 應(yīng)該中斷信號。
2.unlock()
unlock的邏輯很簡單,每次unlock,state-1,直到state=0時,將鎖的擁有者置null,釋放鎖。由于只有鎖的持有線程才能操作lock,所以unlock()不需要用CAS,操作時直接判斷一下是不是鎖的持有線程在操作即可。
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) {//釋放鎖 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒阻塞隊列中的后繼者 return true; } return false; }
釋放鎖:
protected final boolean tryRelease(int releases) { int c = getState() - releases;//每次unlock,state減1 if (Thread.currentThread() != getExclusiveOwnerThread())//判斷是不是鎖的持有線程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//state為0表示該鎖沒有被持有 free = true; setExclusiveOwnerThread(null);//將鎖的持有者置null } setState(c); return free; }
喚醒后繼者:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
2.2.讀寫鎖
讀寫鎖是一個實(shí)現(xiàn)讀寫互斥的鎖,讀寫鎖包含一個讀鎖、一個寫鎖:
public interface ReadWriteLock{ Lock readLock(); Lock writeLock(); }
讀寫鎖的使用就是直接調(diào)用對應(yīng)鎖進(jìn)行鎖定和解鎖:
ReadWriteLock rwLock=new ReetrantReadWriteLock(); Lock rLock=rwLock.readLock(); rLock.lock(); rLock.unLock(); Lock wLock=rwLock.writeLock(); wLock.lock(); wLock.unLock();
讀寫鎖的Sync內(nèi)部類對讀鎖和寫鎖采用同一個int型的信號量的高16位和低16位分別表示讀寫鎖的狀態(tài)和重入次數(shù),這樣一次CAS就能統(tǒng)一處理進(jìn)行讀寫互斥操作:
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
2.3.Condition
2.3.1.概論
condition用于更加細(xì)粒度的控制鎖上面的線程阻塞、喚醒。
以下以一個經(jīng)典的生產(chǎn)、消費(fèi)者問題為例:
隊列空的時候進(jìn)來的消費(fèi)者線程阻塞,有數(shù)據(jù)放進(jìn)來后喚醒阻塞的消費(fèi)者線程。
隊列滿的時候進(jìn)來的生產(chǎn)者線程阻塞,有空位后喚醒阻塞的生產(chǎn)者線程。
鎖粒度的實(shí)現(xiàn):
public void enqueue(){ synchronized(queue){ while(queue.full()){ queue.wait(); } //入隊列 ...... //通知消費(fèi)者,隊列中有數(shù)據(jù)了 queue.notify(); } } ? public void dequeue(){ synchronized(queue){ while(queue.empty()){ queue.wait(); } //出隊列 ...... //通知生產(chǎn)者,隊列中有空位了,可以繼續(xù)放數(shù)據(jù) queue.notify(); } }
可以發(fā)現(xiàn),喚醒的時候把阻塞的生產(chǎn)消費(fèi)線程一起喚醒了。
條件粒度的實(shí)現(xiàn):
private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); // 用于等待隊列不滿 private final Condition notEmpty = lock.newCondition(); // 用于等待隊列非空 public void enqueue(Object item) { try { while (queue.isFull()) { notFull.await(); // 等待隊列不滿 } // 入隊列操作 // ... // 入隊后,通知等待的消費(fèi)者 notEmpty.signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 保持中斷狀態(tài) // 處理中斷邏輯 } finally { queue.unlock(); } } public void dequeue() { try { while (queue.isEmpty()) { notEmpty.await(); // 等待隊列非空 } // 出隊列操作 // ... // 出隊后,通知等待的生產(chǎn)者 notFull.signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 保持中斷狀態(tài) // 處理中斷邏輯 } finally { queue.unlock(); } }
2.3.2.底層實(shí)現(xiàn)
Condition由Lock產(chǎn)生,因此Lock中持有Condition:
public interface Lock { ...... Condition newCondition(); }
承擔(dān)功能的其實(shí)就是Syn中的ConditionObject,也就是AQS中的ConditionObject:
final ConditionObject newCondition() { return new ConditionObject(this); }
一個Condition上面阻塞著多個線程,所以每個Condition內(nèi)部都有一個隊列,用來記錄阻塞在這個condition上面的線程,這個隊列其實(shí)也是AQS實(shí)現(xiàn)的,AQS中除了實(shí)現(xiàn)一個以Node為節(jié)點(diǎn)的隊列,還實(shí)現(xiàn)了一個以ConditionObject為節(jié)點(diǎn)的隊列:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; ...... } }
Condition是個接口,定義了一系列條件操作:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long var1) throws InterruptedException; boolean await(long var1, TimeUnit var3) throws InterruptedException; boolean awaitUntil(Date var1) throws InterruptedException; void signal(); void signalAll(); }
總結(jié)
到此這篇關(guān)于JAVA多線程之JDK中各種鎖的文章就介紹到這了,更多相關(guān)JAVA多線程JDK各種鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
AbstractProcessor擴(kuò)展MapStruct自動生成實(shí)體映射工具類
這篇文章主要為大家介紹了AbstractProcessor擴(kuò)展MapStruct自動生成實(shí)體映射工具類實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01基于自定義校驗注解(controller、method、(groups)分組的使用)
這篇文章主要介紹了基于自定義校驗注解(controller、method、(groups)分組的使用),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10mybatisplus實(shí)現(xiàn)自動填充時間的項目實(shí)踐
在數(shù)據(jù)庫操作中,頻繁設(shè)置創(chuàng)建時間和更新時間字段非常繁瑣,通過使用MyBatis-Plus的自動填充功能,可以簡化操作,本文就來詳細(xì)的介紹一下,感興趣的可以了解一下2024-10-10Spring MessageSource獲取消息不符合預(yù)期的問題解決方案
最近我參與的產(chǎn)品要做國際化支持,選擇了用Spring MessageSource來實(shí)現(xiàn),這個Spring 框架提供的工具使用很簡單,網(wǎng)上有各種教程文章,這里不做贅述,只說一個實(shí)際遇到的問題及解決方案,需要的朋友可以參考下2024-01-01