Java中的Semaphore源碼分析
(一)概念簡(jiǎn)介
Semaphore是一個(gè)訪問公共資源的線程數(shù)量如限流、停車等,它是一個(gè)基于AQS實(shí)現(xiàn)的共享鎖,主要是通過控制state變量來實(shí)現(xiàn)。
其內(nèi)部結(jié)構(gòu)關(guān)系為:Semaphore內(nèi)部是通過一個(gè)內(nèi)部核心成員變量sync去調(diào)用AQS中父類的方法,NoneFairSync/FairSync繼承于內(nèi)部類Sync,Sync繼承于AQS,在使用Semaphore構(gòu)造方法進(jìn)行實(shí)例化時(shí)可指定公平或非公平,其內(nèi)部主要是靠acquire和release方法進(jìn)行阻塞或釋放。
(二)使用場(chǎng)景
當(dāng)主線程進(jìn)行執(zhí)行時(shí),利用構(gòu)造方法初始化一個(gè)公平或非公平的線程訪問總數(shù),子線程調(diào)用acquire嘗試獲取訪問資源即訪問令牌,待到線程訪問總數(shù)不夠分配即分配出現(xiàn)負(fù)數(shù)時(shí)則進(jìn)行阻塞,當(dāng)其他占用資源被釋放時(shí),會(huì)調(diào)用release方法進(jìn)行喚醒阻塞中的線程。
(1)經(jīng)典停車位或餐廳,因位置有限無法容納更多。
(2)數(shù)據(jù)庫(kù)連接數(shù)限制或控制系統(tǒng)并發(fā)如限流;
(三)特點(diǎn)
(1)子線程調(diào)用acquire方法去資源總數(shù)中分配,如果分配成功則不會(huì)阻塞,否則會(huì)被阻塞,等待被喚醒;
(2)當(dāng)一個(gè)子線程任務(wù)執(zhí)行結(jié)束,會(huì)通過release方法去喚醒阻塞中的線程。
Semaphore簡(jiǎn)單使用
public static void main(String[] args) { Semaphore semaphore = new Semaphore(5); for (int i = 0; i < 10; i++){ int num = i; new Thread(()->{ System.out.println("線程"+num+"初始化!"); try { semaphore.acquire(); System.out.println("線程"+num+"拿到了鎖執(zhí)行權(quán)!"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------線程"+num+"被喚醒執(zhí)行!-----------------"); semaphore.release(); }).start(); } System.out.println("main 線程執(zhí)行!"); }
(四)Semaphore源碼分析
(1)構(gòu)造函數(shù)
/** * Semaphore一個(gè)參數(shù)值容量,使用非公平NonfairSync類去實(shí)例化Sync繼承的AQS中的原子變量state值 */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Semaphore采用兩個(gè)參數(shù),一個(gè)容量和是否使用公平或非公平標(biāo)記來初始化Sync繼承AQS中的原子變量state值 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
(2)acquire方法(核心)
/** * 暴露給外部用于是否阻塞調(diào)用的API * 其內(nèi)部是Semaphore內(nèi)部類(Sync)變量sync去調(diào)用acquireSharedInterruptibly */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * AQS類中確定獲取共享鎖和是否阻塞 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//判斷執(zhí)行線程是否有中斷標(biāo)記 throw new InterruptedException(); /** * AQS定義了共享鎖方法并強(qiáng)制子類去重寫該方法(模板模式) * 由其子類NonfairSync繼承內(nèi)部類Sync,Sync繼承于AQS類,最終由NoneFairSync去重寫 * NoneFairSync又調(diào)用了父類Sync中的nonfairTryAcquireShared方法確定是否需要阻塞 */ if (tryAcquireShared(arg) < 0)//嘗試獲取共享鎖 doAcquireSharedInterruptibly(arg);//在未標(biāo)記中斷前提下,是否需要真正阻塞 } NoneFairSync類:繼承于Semaphore中的內(nèi)部類Sync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } Sync類: //自旋的模式對(duì)state進(jìn)行-1并返回state-1后的值 final int nonfairTryAcquireShared(int acquires) { /** * (1)自旋保證本次對(duì)state進(jìn)行-1 * (2)state-1后的值小于0的情況下對(duì)state進(jìn)行CAS設(shè)置新的值 */ for (;;) { int available = getState();//獲取state值 int remaining = available - acquires;//當(dāng)前線程對(duì)其-1 //如果state-1后的值小于0代表無容量可用 //如果state-1后的值大于等于0,則代表共享鎖還有位置可用,并設(shè)置新的state值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining;//返回state-1后的值 } }
(3)doAcquireSharedInterruptibly方法(核心)
/** * AQS中定義的是否阻塞方法 * 如果在自旋過程中獲取到了共享鎖,則不進(jìn)行阻塞,這也是非公平的原因之一 * 如果在自旋中未獲取到共享鎖則shouldParkAfterFailedAcquire進(jìn)行更改等待狀態(tài) * 在成功改變等待線程信號(hào)量后再調(diào)用parkAndCheckInterrupt是否阻塞 * 上述的parkAndCheckInterrupt在被喚醒之前的一段時(shí)間內(nèi),如果存在中斷標(biāo)記則會(huì)拋出異常,否則正常執(zhí)行 */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//以共享鎖模式創(chuàng)建隊(duì)列 boolean failed = true; try { for (;;) {//自旋獲取共享鎖或阻塞 final Node p = node.predecessor();//獲取當(dāng)前線程的前繼節(jié)點(diǎn) if (p == head) {//前繼節(jié)點(diǎn)是否為頭節(jié)點(diǎn) int r = tryAcquireShared(arg);//嘗試再次獲取共享鎖 if (r >= 0) {//獲取到了共享鎖 setHeadAndPropagate(node, r);//設(shè)置新的頭節(jié)點(diǎn)和喚醒后繼節(jié)點(diǎn) p.next = null; // help GC failed = false;//不需要執(zhí)行婁底的cancelAcquire方法 return; } } /** * shouldParkAfterFailedAcquire改變前繼節(jié)點(diǎn)的等待狀態(tài)信號(hào)量 * parkAndCheckInterrupt真正阻塞該線程,使用LockSupport的park方法 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//改變等前繼節(jié)點(diǎn)的等待狀態(tài)信號(hào)量和阻塞該線程 throw new InterruptedException(); } } finally { if (failed)//是否需要執(zhí)行保底方法 //當(dāng)該線程有中斷標(biāo)記或不需要阻塞時(shí)發(fā)生異常取消獲取鎖和過濾超時(shí)節(jié)點(diǎn) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /** * propagate > 0表示可以嘗試喚醒node結(jié)點(diǎn)的后繼結(jié)點(diǎn),可能是-3那個(gè)線程進(jìn)行導(dǎo)致其大于0 * (h = head) == null || h.waitStatus < 0(此時(shí)的h被重新賦值)可能會(huì)引起沒必要的喚醒操作 * 比如線程A任務(wù)結(jié)束后釋放許可,但是線程B任務(wù)還沒結(jié)束,此時(shí)線程C獲取到許可走到這里 * 執(zhí)行完上面的setHead后,然后 h = head 即h執(zhí)行線程C的結(jié)點(diǎn), * 而線程C對(duì)應(yīng)的結(jié)點(diǎn)的 waitStatus = SIGNAL,所以也會(huì)執(zhí)行doReleaseShared喚醒線程D * 線程D喚醒后接著去執(zhí)行 doAcquireSharedInterruptibly 中的for循環(huán), * 執(zhí)行tryAcquireShared去拿許可證的時(shí)候發(fā)現(xiàn)是小于0,后面繼續(xù)走掛起方法去掛起線程D */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();//喚醒等待中的線程 } }
(4)release方法(核心)
/** * 暴露給外部調(diào)用釋放API * Semaphore內(nèi)部核心成員變量sync調(diào)用AQS中的releaseShared方法進(jìn)行釋放喚醒 */ public void release() { sync.releaseShared(1);//調(diào)用AQS的釋放并喚醒方法 } AQS類: public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//強(qiáng)制子類Sync重寫該方法,模板模式 doReleaseShared();//調(diào)用AQS自身釋放共享鎖 return true; } return false; } Sync類:強(qiáng)制重寫父類AQS中的嘗試釋放鎖方法 protected final boolean tryReleaseShared(int releases) { for (;;) {//自旋操作來喚醒 int current = getState();//獲取state值 int next = current + releases;//將state值+1 if (next < current) //Integer.MAX是否達(dá)到最大,防止溢出如最大數(shù)+1變?yōu)樨?fù)數(shù) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//CAS改變state值 return true; } }
(5)doReleaseShared方法(核心)
/** * 嘗試喚醒等待線程 * (1)正常喚醒頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)線程 * (2)可能伴隨著多個(gè)共享鎖被釋放,為了防止空閑許可浪費(fèi),會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn) */ private void doReleaseShared() { for (;;) { Node h = head;//獲取隊(duì)列中的頭結(jié)點(diǎn) if (h != null && h != tail) {//判斷隊(duì)列中是否還有等待線程 int ws = h.waitStatus;//獲取頭結(jié)點(diǎn)的等待狀態(tài) if (ws == Node.SIGNAL) {//可喚醒狀態(tài) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//將頭結(jié)點(diǎn)等待狀態(tài)設(shè)置為0 continue;//改變頭結(jié)點(diǎn)等待狀態(tài)失敗,則跳過本次操作,自旋再次設(shè)置 unparkSuccessor(h);//在可喚醒狀態(tài)下且改變頭節(jié)點(diǎn)等待狀態(tài)成功的前提下進(jìn)行喚醒后繼節(jié)點(diǎn) } /** * ws等于0,然后設(shè)置waitStatus為Node.PROPAGATE,表示在自旋過程同時(shí)有多個(gè)鎖都在被釋放 * 線程A執(zhí)行上面的cas操作將頭結(jié)點(diǎn)等待狀態(tài)設(shè)置為0 * 此時(shí)的線程B剛好在執(zhí)行上面if不滿足時(shí)則執(zhí)行else if邏輯,將頭結(jié)點(diǎn)狀態(tài)設(shè)置為-3, * 主要是方便被喚醒線程自旋執(zhí)行doAcquireSharedInterruptibly中的setHeadAndPropagate方法 * 即表示要喚醒head后繼結(jié)點(diǎn)的后繼結(jié)點(diǎn) * waitStatus=PROPAGATE就表示要喚醒head后繼結(jié)點(diǎn)的后繼結(jié)點(diǎn) * 線程A和B都釋放許可了,如果有多個(gè)等待線程在等待喚醒,在setHeadAndPropagate方法中會(huì)有邏輯判斷 * 防止已經(jīng)有2張?jiān)S可卻只有線程C拿到許可,線程D還在傻乎乎的等線程C釋放許可來喚醒線程D else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;//CAS設(shè)置失敗,自旋重新設(shè)置 } if (h == head)//結(jié)束死循環(huán) break; } }
到此這篇關(guān)于Java中的Semaphore源碼分析的文章就介紹到這了,更多相關(guān)Semaphore源碼分析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用 Redis 緩存實(shí)現(xiàn)點(diǎn)贊和取消點(diǎn)贊的示例代碼
這篇文章主要介紹了使用 Redis 緩存實(shí)現(xiàn)點(diǎn)贊和取消點(diǎn)贊的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03解決JSTL foEach標(biāo)簽 刷新報(bào)錯(cuò)的方法
本篇文章是對(duì)JSTL foEach標(biāo)簽刷新報(bào)錯(cuò)的方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05SpringBoot?Http遠(yuǎn)程調(diào)用的方法
這篇文章主要為大家詳細(xì)介紹了SpringBoot?Http遠(yuǎn)程調(diào)用的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08Spring-boot JMS 發(fā)送消息慢的解決方法
這篇文章主要為大家詳細(xì)介紹了Spring-boot JMS 發(fā)送消息慢的解決方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08基于javassist進(jìn)行動(dòng)態(tài)編程過程解析
這篇文章主要介紹了基于javassist進(jìn)行動(dòng)態(tài)編程過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05Java實(shí)現(xiàn)滑動(dòng)驗(yàn)證碼的示例代碼
這篇文章主要為大家介紹了如何用Java語言實(shí)現(xiàn)滑動(dòng)驗(yàn)證碼的生成,項(xiàng)目采用了springboot,maven等技術(shù),感興趣的小伙伴可以跟隨小編學(xué)習(xí)一下2022-02-02mybatis攔截器無法注入spring bean的問題解決
本文主要介紹了mybatis攔截器無法注入spring bean的問題解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全
在Java中QueryWrapper是MyBatis-Plus框架中的一個(gè)查詢構(gòu)造器,它提供了豐富的查詢方法,其中包括and和or方法,可以用于構(gòu)建復(fù)雜的查詢條件,這篇文章主要給大家介紹了關(guān)于Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧的相關(guān)資料,需要的朋友可以參考下2024-07-07