亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java并發(fā)編程之JUC并發(fā)核心AQS同步隊(duì)列原理剖析

 更新時(shí)間:2021年09月23日 15:54:02   作者:沒頭腦遇到不高興  
AbstractQueuedSynchronizer 簡(jiǎn)稱 AQS,可能我們幾乎不會(huì)直接去使用它,但它卻是 JUC 的核心基礎(chǔ)組件,支撐著 java 鎖和同步器的實(shí)現(xiàn),大神 Doug Lea 在設(shè)計(jì) JUC 包時(shí)希望能夠抽象一個(gè)基礎(chǔ)且通用的組件以支撐上層模塊的實(shí)現(xiàn),AQS 應(yīng)運(yùn)而生

一、AQS介紹

隊(duì)列同步器AbstractQueuedSynchronizer(簡(jiǎn)稱AQS),AQS定義了一套多線程訪問共享資源的同步器框架,是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,是一個(gè)依賴狀態(tài)(state)的同步器。Java并發(fā)編程的核心在java.util.concurrent(簡(jiǎn)稱juc)包,而juc包的大部分工具都是以AQS為基礎(chǔ)進(jìn)行構(gòu)建的,例如Semaphore、ReentranLock、CountDownLatch、CyclicBarrier等,它的作者是鼎鼎大名的Doug Lea。

AQS具備特性

  • 阻塞等待隊(duì)列
  • 共享/獨(dú)占
  • 公平/非公平
  • 可重入
  • 允許中斷

它維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭(zhēng)用資源被阻塞時(shí)會(huì)進(jìn)入此隊(duì)列)。state的訪問方式有三種:

  • getState() 獲取state
  • setState() 設(shè)置state
  • compareAndSetState() 通過CAS的方式設(shè)置state值

AQS有兩種資源共享方式:Exclusive(獨(dú)占式) 和 Share(共享式)。所謂獨(dú)占式是指依據(jù)AQS中的state控制狀態(tài),只有一個(gè)線程能夠進(jìn)行工作(其它參與調(diào)度的線程會(huì)進(jìn)入阻塞狀態(tài),如ReentrantLock);共享式是指,依據(jù)AQS中的state控制狀態(tài),可以有多個(gè)滿足條件的線程同時(shí)執(zhí)行(如Semaphore/CountDownLatch)?!?/p>

AQS定義兩種隊(duì)列

  • 同步等待隊(duì)列(基于雙向鏈表實(shí)現(xiàn))
  • 條件等待隊(duì)列(基于單向鏈表實(shí)現(xiàn))

不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。一般通過定義內(nèi)部類Sync繼承AQS將同步器所有調(diào)用都映射到Sync對(duì)應(yīng)的方法。

自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:

  • isHeldExclusively():該線程是否正在獨(dú)占資源,如果返回true,則表示當(dāng)前線程正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

實(shí)現(xiàn)自定義同步組件時(shí),將會(huì)調(diào)用同步器提供的模板方法,這些(部分)模板方法與描述如下。同步器提供的模板方法基本上分為3類:獨(dú)占式獲取與釋放同步狀態(tài)、共享式獲取與釋放同步狀態(tài)和查詢同步隊(duì)列中的等待線程情況。自定義同步組件將使用同步器提供的模板方法來實(shí)現(xiàn)自己的同步語義。

  • acquire(int arg):獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功,則由該方法返回,否則,將會(huì)進(jìn)入同步隊(duì)列等待,該方法將會(huì)調(diào)用可重寫的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應(yīng)中斷,當(dāng)前線程為獲取到同步狀態(tài)而進(jìn)入到同步隊(duì)列中,如果當(dāng)前線程被中斷,則該方法會(huì)拋出InterruptedException異常并返回;
  • tryAcquireNanos(int arg,long nanos):超時(shí)獲取同步狀態(tài),如果當(dāng)前線程在nanos時(shí)間內(nèi)沒有獲取到同步狀態(tài),那么將會(huì)返回false,已經(jīng)獲取則返回true;
  • acquireShared(int arg):共享式獲取同步狀態(tài),如果當(dāng)前線程未獲取到同步狀態(tài),將會(huì)進(jìn)入同步隊(duì)列等待,與獨(dú)占式的主要區(qū)別是在同一時(shí)刻可以有多個(gè)線程獲取到同步狀態(tài);
  • acquireSharedInterruptibly(int arg):共享式獲取同步狀態(tài),響應(yīng)中斷;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態(tài),增加超時(shí)限制;
  • release(int arg):獨(dú)占式釋放同步狀態(tài),該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個(gè)節(jié)點(diǎn)包含的線程喚醒;
  • releaseShared(int arg):共享式釋放同步狀態(tài);

二、AQS中的隊(duì)列

1、同步等待隊(duì)列

AQS當(dāng)中的同步等待隊(duì)列也稱CLH隊(duì)列,CLH隊(duì)列是Craig、Landin、Hagersten三人發(fā)明的一種基于雙向鏈表數(shù)據(jù)結(jié)構(gòu)的隊(duì)列,是FIFO先入先出線程等待隊(duì)列,Java中的CLH隊(duì)列是原CLH隊(duì)列的一個(gè)變種,線程由原自旋機(jī)制改為阻塞機(jī)制。

這種結(jié)構(gòu)的特點(diǎn)是每個(gè)數(shù)據(jù)結(jié)構(gòu)都有兩個(gè)指針,分別指向直接的后繼節(jié)點(diǎn)和直接前驅(qū)節(jié)點(diǎn)。所以雙向鏈表可以從任意一個(gè)節(jié)點(diǎn)開始很方便的訪問前驅(qū)和后繼。每個(gè) Node 其實(shí)是由線程封裝,當(dāng)線程爭(zhēng)搶鎖失敗后會(huì)封裝成 Node 加入到 ASQ 隊(duì)列中去;當(dāng)獲取鎖的線程釋放鎖以后,會(huì)從隊(duì)列中喚醒一個(gè)阻塞的節(jié)點(diǎn)線程 。

2、條件等待隊(duì)列

條件等待隊(duì)列是單向鏈表實(shí)現(xiàn)的,此時(shí)Node(下面會(huì)介紹)中pre和next都為null。Condition是一個(gè)多線程間協(xié)調(diào)通信的工具類,使得某個(gè)、或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備時(shí),這些等待線程才會(huì)被喚醒,從而重新爭(zhēng)奪鎖。

3、AQS隊(duì)列節(jié)點(diǎn)Node

同步隊(duì)列中的節(jié)點(diǎn)(Node)用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點(diǎn),節(jié)點(diǎn)的屬性類型與名稱等,Node類基本屬性定義如下所示,它是在AbstractQueuedSynchronizer中的一個(gè)內(nèi)部類。

注意:如果Node在條件隊(duì)列當(dāng)中,Node必須是獨(dú)占模式,不能是共享模式。

static final class Node {
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;
	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;
	volatile int waitStatus;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;
	Node nextWaiter;
}

Node pre:前驅(qū)節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)加入到同步隊(duì)列中被設(shè)置(尾部添加)

Node next:后繼節(jié)點(diǎn)

Thread thread:節(jié)點(diǎn)同步狀態(tài)的線程

Node nextWaiter:等待隊(duì)列中的后繼節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是共享的,那么這個(gè)字段是一個(gè)SHARED常量,也就是說節(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后繼節(jié)點(diǎn)共用同一個(gè)字段

int waitStatus:等待狀態(tài),標(biāo)記當(dāng)前節(jié)點(diǎn)的信號(hào)量狀態(tài) (1,0,-1,-2,-3)5種狀態(tài),使用CAS更改狀態(tài),volatile保證線程可見性,高并發(fā)場(chǎng)景下,即被一個(gè)線程修改后,狀態(tài)會(huì)立馬讓其他線程可見,五種狀態(tài)分別為:

  • CANCELLED,值為1,在同步隊(duì)列中等待的線程等待超時(shí)或者被中斷,需要從同步隊(duì)列中取消等待,節(jié)點(diǎn)進(jìn)入該狀態(tài)后將不會(huì)變化
  • SIGNAL,值為-1,后繼節(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前的節(jié)點(diǎn)如果釋放了同步狀態(tài)或者被取消,將會(huì)通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)的線程得以運(yùn)行。
  • CONDITION,值為-2,節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)的線程等待在Condition上,當(dāng)其他線程對(duì)Condition調(diào)用了signal()方法后,該節(jié)點(diǎn)會(huì)從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,加入到同步狀態(tài)的獲取中
  • PROPAGATE ,值為-3,表示下一次共享式同步狀態(tài)獲取將會(huì)被無條件地傳播下去
  • INITIAL,值為0,初始狀態(tài)

三、同步隊(duì)列源碼分析

1、同步隊(duì)列分析

同步器擁有首節(jié)點(diǎn)(head)和尾節(jié)點(diǎn)(tail),沒有成功獲取同步狀態(tài)的線程將會(huì)成為節(jié)點(diǎn)加入該隊(duì)列的尾部,同步隊(duì)列的基本結(jié)構(gòu)如圖所示。

同步器包含了兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭節(jié)點(diǎn),而另一個(gè)指向尾節(jié)點(diǎn)。同步器提供了一個(gè)基于CAS的設(shè)置尾節(jié)點(diǎn)的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當(dāng)前線程“認(rèn)為”的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn),只有設(shè)置成功后,當(dāng)前節(jié)點(diǎn)才正式與之前的尾節(jié)點(diǎn)建立關(guān)聯(lián)。 涉及兩個(gè)變化:

  • 1. 新的線程封裝成 Node 節(jié)點(diǎn)追加到同步隊(duì)列中,設(shè)置 prev 節(jié)點(diǎn)以及修改當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)的 next 節(jié)點(diǎn)指向自己
  • 2. 通過 CAS 講 tail 重新指向新的尾部節(jié)點(diǎn)

head節(jié)點(diǎn)表示獲取鎖成功的節(jié)點(diǎn),當(dāng)頭結(jié)點(diǎn)在釋放同步狀態(tài)時(shí),會(huì)喚醒后繼節(jié)點(diǎn),如果后繼節(jié)點(diǎn)獲得鎖成功,會(huì)把自己設(shè)置為頭結(jié)點(diǎn),節(jié)點(diǎn)的變化過程如下

同步隊(duì)列遵循FIFO,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn),該過程如下圖所示。涉及兩個(gè)變化:

  • 1. 修改 head 節(jié)點(diǎn)指向下一個(gè)獲得鎖的節(jié)點(diǎn)
  • 2. 新的獲得鎖的節(jié)點(diǎn),將 prev 的指針指向 null

設(shè)置首節(jié)點(diǎn)是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個(gè)線程能夠成功獲取到同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要使用CAS來保證,它只需要將首節(jié)
點(diǎn)設(shè)置成為原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的next引用即可。

2、同步隊(duì)列——獨(dú)占模式源碼分析

acquire方法(獨(dú)占獲?。┰创a分析

通過調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感,也就是由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊(duì)列中,后續(xù)對(duì)線程進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列中移出。

public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上面方法中首先調(diào)用自定義同步器實(shí)現(xiàn)的tryAcquire(int arg)方法(重寫該方法),該方法保證線程安全的獲取同步狀態(tài),如果同步狀態(tài)獲取失敗,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式Node.EXCLUSIVE,同一時(shí)刻只能有一個(gè)線程成功獲取同步狀態(tài))并通過addWaiter(Node node)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部,最后調(diào)用acquireQueued(Node node,int arg)方法,使得該節(jié)點(diǎn)以“死循環(huán)”的方式獲取同步狀態(tài)。如果獲取不到則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來實(shí)現(xiàn)。

下面看下addWaiter方法的實(shí)現(xiàn):把當(dāng)前線程構(gòu)建為Node節(jié)點(diǎn);判斷尾結(jié)點(diǎn)是否為空,通過CAS的方式將當(dāng)前節(jié)點(diǎn)放到隊(duì)列尾部;如果尾結(jié)點(diǎn)不為空或者前面CAS插入尾結(jié)點(diǎn)失敗,調(diào)用enq方法,通過自旋的方式插入尾結(jié)點(diǎn)。

private Node addWaiter(Node mode) {
	// 1. 將當(dāng)前線程構(gòu)建成Node類型
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	// 2. 1當(dāng)前尾節(jié)點(diǎn)是否為null?
	if (pred != null) {
		// 2.2 將當(dāng)前節(jié)點(diǎn)尾插入的方式
		node.prev = pred;
		// 2.3 CAS將節(jié)點(diǎn)插入同步隊(duì)列的尾部
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

下面看下enq方法的實(shí)現(xiàn):判斷尾結(jié)點(diǎn)是否為空,如果為空,則通過CAS的方式創(chuàng)建一個(gè)空的頭結(jié)點(diǎn)(Thread為空),并將尾結(jié)點(diǎn)也指向頭結(jié)點(diǎn);如果尾結(jié)點(diǎn)不為空或者上面CAS創(chuàng)建頭結(jié)點(diǎn)失敗,將當(dāng)前隊(duì)列的前驅(qū)指針指向原來的尾結(jié)點(diǎn),通過CAS的方式將當(dāng)前節(jié)點(diǎn)放到隊(duì)列尾部,將原來尾結(jié)點(diǎn)的后繼指針指向當(dāng)前節(jié)點(diǎn);如果前面都失敗了,進(jìn)行下一次循環(huán)。當(dāng)前線程構(gòu)造的node節(jié)點(diǎn)通過addWaiter方法執(zhí)行入隊(duì)之后,其waitStatus為0,頭結(jié)點(diǎn)的waitStatus也是0,此時(shí)是下面這種結(jié)構(gòu)

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
			//隊(duì)列為空需要初始化,創(chuàng)建空的頭節(jié)點(diǎn)
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			//set尾部節(jié)點(diǎn)
			if (compareAndSetTail(t, node)) {//當(dāng)前節(jié)點(diǎn)置為尾部
				t.next = node; //前驅(qū)節(jié)點(diǎn)的next指針指向當(dāng)前節(jié)點(diǎn)
				return t;
			}
		}
	}
}

通過addWaiter 方法把線程添加到鏈表后, 會(huì)接著把 Node 作為參數(shù)傳遞給acquireQueued 方法,去競(jìng)爭(zhēng)鎖

  • 1. 獲取當(dāng)前節(jié)點(diǎn)的 prev 節(jié)點(diǎn)
  • 2. 如果 prev 節(jié)點(diǎn)為 head 節(jié)點(diǎn),那么它就有資格去爭(zhēng)搶鎖,調(diào)用 tryAcquire 搶占鎖
  • 3. 搶占鎖成功以后,把獲得鎖的節(jié)點(diǎn)設(shè)置為 head ,并且移除原來的初始head節(jié)點(diǎn),通過setHead方法和p.next=null來將新的head(獲取鎖的線程節(jié)點(diǎn))與原h(huán)ead斷開連接,并將新的head的thread設(shè)為null。由此可見head節(jié)點(diǎn)的waitStatus都為0
  • 4. 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)不是head或者當(dāng)前節(jié)點(diǎn)是head但是獲取鎖失敗,則根據(jù)前驅(qū)節(jié)點(diǎn)的waitStatus(SIGNAL)決定是否需要掛起線程
/**
 * 已經(jīng)在隊(duì)列當(dāng)中的Thread節(jié)點(diǎn),準(zhǔn)備阻塞等待獲取鎖
 */
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {//死循環(huán)
			final Node p = node.predecessor();//找到當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)
			if (p == head && tryAcquire(arg)) {//如果前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn),才tryAcquire,其他結(jié)點(diǎn)是沒有機(jī)會(huì)tryAcquire的。
				setHead(node);//獲取同步狀態(tài)成功,將當(dāng)前結(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)。
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			/**
			 * 如果前驅(qū)節(jié)點(diǎn)不是Head,通過shouldParkAfterFailedAcquire判斷是否應(yīng)該阻塞
			 * 前驅(qū)節(jié)點(diǎn)信號(hào)量為-1,當(dāng)前線程可以安全被parkAndCheckInterrupt用來阻塞線程
			 */
			if (shouldParkAfterFailedAcquire(p, node) &&
					parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

下面是setHead方法的實(shí)現(xiàn)

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

在acquireQueued(final Node node,int arg)方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài),這是為什么?原因有兩個(gè),如下。

  • 第一,頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)之后,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)。
  • 第二,維護(hù)同步隊(duì)列的FIFO原則。該方法中,節(jié)點(diǎn)自旋獲取同步狀態(tài)的行為如圖所示

如果前驅(qū)節(jié)點(diǎn)不是Head,通過shouldParkAfterFailedAcquire判斷是否應(yīng)該阻塞:如果前驅(qū)節(jié)點(diǎn)waitStatus為-1(SIGNAL的狀態(tài)),當(dāng)前線程可以安全的被parkAndCheckInterrupt用來阻塞線程;通過循環(huán)掃描鏈表把 CANCELLED 狀態(tài)的節(jié)點(diǎn)移除;如果前驅(qū)節(jié)點(diǎn)waitStatus不是-1,則通過CAS將前驅(qū)節(jié)點(diǎn)的waitStatus改為-1。

第一次循環(huán)進(jìn)入shouldParkAfterFailedAcquire方法時(shí)head節(jié)點(diǎn)為0,會(huì)將其改為SIGNAL,此時(shí)會(huì)返回false,那么外層的方法acquireQueued方法會(huì)執(zhí)行第二次循環(huán)進(jìn)入shouldParkAfterFailedAcquire方法,此時(shí)會(huì)返回true,當(dāng)前線程可以被阻塞,則調(diào)用parkAndCheckInterrupt()方法阻塞當(dāng)前線程,其底層調(diào)用的是UnSafe類里面的park方法。

shouldParkAfterFailedAcquire方法會(huì)將前驅(qū)節(jié)點(diǎn)的waitStatus改為SIGNAL,因?yàn)橹挥星膀?qū)節(jié)點(diǎn)的狀態(tài)是SIGNAL后繼節(jié)點(diǎn)才可以被阻塞,次數(shù)除了tail節(jié)點(diǎn)的狀態(tài)是0,其他的都是-1。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		/*
		 * 若前驅(qū)結(jié)點(diǎn)的狀態(tài)是SIGNAL,意味著當(dāng)前結(jié)點(diǎn)可以被安全地park
		 */
		return true;
	if (ws > 0) {
		/*
		 * 前驅(qū)節(jié)點(diǎn)狀態(tài)如果被取消狀態(tài),將被移除出隊(duì)列
		 */
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		/*
		 * 當(dāng)前驅(qū)節(jié)點(diǎn)waitStatus為 0 or PROPAGATE狀態(tài)時(shí)
		 * 將其設(shè)置為SIGNAL狀態(tài),然后當(dāng)前結(jié)點(diǎn)才可以可以被安全地park
		 */
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

通過分析acquireQueued方法可以得出結(jié)論:頭結(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),頭結(jié)點(diǎn)的所有有效后繼節(jié)點(diǎn)線程都會(huì)被阻塞,釋放鎖后需要挨個(gè)喚醒頭結(jié)點(diǎn)的后續(xù)線程節(jié)點(diǎn)。

獨(dú)占式同步狀態(tài)獲取流程,也就是acquire(int arg)方法調(diào)用流程,如圖所示。

release方法(獨(dú)占釋放)源碼分析

當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯之后,就需要釋放同步狀態(tài),使得后續(xù)節(jié)點(diǎn)能夠繼續(xù)獲取同步狀態(tài)。通過調(diào)用同步器的release(int arg)方法可以釋放同步狀態(tài),該方法在tryRelease方法釋放了同步狀態(tài)之后,會(huì)喚醒其后繼節(jié)點(diǎn)(進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取同步狀態(tài))。

public final boolean release(int arg) {
	if (tryRelease(arg)) {//釋放一次鎖
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);//喚醒后繼結(jié)點(diǎn)
		return true;
	}
	return false;
}

該方法執(zhí)行時(shí),會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccessor(Node node)方法底層使用UnSafe的unpark方法來喚醒處于等待狀態(tài)的線程。

private void unparkSuccessor(Node node) {
	//獲取wait狀態(tài)
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);// 將等待狀態(tài)waitStatus設(shè)置為初始值0
 
	/**
	 * 若后繼結(jié)點(diǎn)為空,或狀態(tài)為CANCEL(已失效),則從后尾部往前遍歷找到最前的一個(gè)處于正常阻塞狀態(tài)的結(jié)點(diǎn)
	 * 進(jìn)行喚醒
	 */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);//喚醒線程
}

上面方法中判斷node的后繼節(jié)點(diǎn)是空或者waitStatus是撤銷狀態(tài),會(huì)從tail往前遍歷找到一個(gè)離node節(jié)點(diǎn)最近的節(jié)點(diǎn),這是為什么呢? 原因在于上面acquire時(shí)調(diào)用的enq入隊(duì)方法:先compareAndSetTail(t, node)設(shè)置尾結(jié)點(diǎn),然后t.next=node將前驅(qū)節(jié)點(diǎn)的next指針指向當(dāng)前節(jié)點(diǎn),如果t.next=node還沒有執(zhí)行的話,鏈表還沒有建立完整,從前向后遍歷時(shí)會(huì)出現(xiàn)遍歷到t時(shí)找不到t的后繼節(jié)點(diǎn),從后往前遍歷則不會(huì)出現(xiàn)這種情況。

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
			//隊(duì)列為空需要初始化,創(chuàng)建空的頭節(jié)點(diǎn)
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			//set尾部節(jié)點(diǎn)
			if (compareAndSetTail(t, node)) {//當(dāng)前節(jié)點(diǎn)置為尾部
				t.next = node; //前驅(qū)節(jié)點(diǎn)的next指針指向當(dāng)前節(jié)點(diǎn)
				return t;
			}
		}
	}
}

分析了獨(dú)占式同步狀態(tài)獲取和釋放過程后,適當(dāng)做個(gè)總結(jié):在獲取同步狀態(tài)時(shí),同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中(都是頭結(jié)點(diǎn)的后繼節(jié)點(diǎn))并在隊(duì)列中進(jìn)行自旋;移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。

3、同步隊(duì)列——共享模式源碼分析

同步隊(duì)列共享模式與獨(dú)占模式異同點(diǎn):

  • 共享模式跟獨(dú)占模式的相同點(diǎn):獲取同步狀態(tài)失敗的線程會(huì)被包裝成node節(jié)點(diǎn)添加到隊(duì)列尾部
  • 共享模式跟獨(dú)占模式的不同點(diǎn):獨(dú)占模式中同步狀態(tài)是獨(dú)占的,只有一個(gè)線程可以獲取到同步資源,因此釋放同步資源時(shí)會(huì)喚醒head節(jié)點(diǎn)后面的一個(gè)節(jié)點(diǎn);而共享模式因?yàn)槎鄠€(gè)線程可以共享同步資源,所以喚醒線程時(shí)會(huì)喚醒head節(jié)點(diǎn)后面的所有有效節(jié)點(diǎn)。

acquireShared方法(共享獲取)源碼分析

acquireShared方法會(huì)先調(diào)用tryAcquireShared獲取同步狀態(tài),如果返回值小于0表示獲取失敗,需要進(jìn)行排隊(duì);如果獲取成功,則可以向下執(zhí)行。

public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)//返回值小于0,獲取同步狀態(tài)失敗,排隊(duì)去;獲取同步狀態(tài)成功,直接返回去干自己的事兒。
		doAcquireShared(arg);
}

獲取失敗時(shí)會(huì)調(diào)用doAcquireShared方法進(jìn)行排隊(duì):

  • 先通過addWaiter方法入隊(duì),上面已經(jīng)介紹了,這里就不再重復(fù)分析了,唯一不同的就是添加的是一個(gè)共享模式的node

  • 判斷當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否是head,如果是的話再次調(diào)用tryAcquireShared獲取同步資源,如果獲取成功,則將當(dāng)前node設(shè)置為head并且喚醒等待的線程節(jié)點(diǎn)

  • 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是head或者是head但是獲取同步資源失敗,則跟上面的共享模式一樣調(diào)用shouldParkAfterFailedAcquire方法將node的前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL(-1)狀態(tài),然后阻塞當(dāng)前線程。

    private void doAcquireShared(int arg) {
    	final Node node = addWaiter(Node.SHARED);//入隊(duì)
    	boolean failed = true;
    	try {
    		boolean interrupted = false;
    		for (;;) {
    			final Node p = node.predecessor();//前驅(qū)節(jié)點(diǎn)
    			if (p == head) {
    				int r = tryAcquireShared(arg); //非公平鎖實(shí)現(xiàn),再嘗試獲取鎖
    				//state==0時(shí)tryAcquireShared會(huì)返回>=0(CountDownLatch中返回的是1)。
    				// state為0說明共享次數(shù)已經(jīng)到了,可以獲取鎖了
    				if (r >= 0) {//r>0表示state==0,前繼節(jié)點(diǎn)已經(jīng)釋放鎖,鎖的狀態(tài)為可被獲取
    					//這一步設(shè)置node為head節(jié)點(diǎn)設(shè)置node.waitStatus->Node.PROPAGATE,然后喚醒node.thread
    					setHeadAndPropagate(node, r);
    					p.next = null; // help GC
    					if (interrupted)
    						selfInterrupt();
    					failed = false;
    					return;
    				}
    			}
    			//前繼節(jié)點(diǎn)非head節(jié)點(diǎn),將前繼節(jié)點(diǎn)狀態(tài)設(shè)置為SIGNAL,通過park掛起node節(jié)點(diǎn)的線程
    			if (shouldParkAfterFailedAcquire(p, node) &&
    					parkAndCheckInterrupt())
    				interrupted = true;
    		}
    	} finally {
    		if (failed)
    			cancelAcquire(node);
    	}
    }

    下面看下setHeadAndPropagate方法:

  • 調(diào)用setHead方法將當(dāng)前節(jié)點(diǎn)設(shè)置head

  • 判斷如果需要執(zhí)行喚醒,通過上面的分析這里會(huì)調(diào)用doReleaseShared執(zhí)行喚醒

    /**
     * 把node節(jié)點(diǎn)設(shè)置成head節(jié)點(diǎn),且Node.waitStatus->Node.PROPAGATE
     */
    private void setHeadAndPropagate(Node node, int propagate) {
    	Node h = head; //h用來保存舊的head節(jié)點(diǎn)
    	setHead(node);//head引用指向node節(jié)點(diǎn)
    	/* 這里意思有兩種情況是需要執(zhí)行喚醒操作
    	 * 1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點(diǎn)需要被喚醒
    	 * 2.頭節(jié)點(diǎn)后面的節(jié)點(diǎn)需要被喚醒(waitStatus<0),不論是老的頭結(jié)點(diǎn)還是新的頭結(jié)點(diǎn)
    	 */
    	if (propagate > 0 || h == null || h.waitStatus < 0 ||
    			(h = head) == null || h.waitStatus < 0) {
    		Node s = node.next;
    		if (s == null || s.isShared())//node是最后一個(gè)節(jié)點(diǎn)或者 node的后繼節(jié)點(diǎn)是共享節(jié)點(diǎn)
    			/* 如果head節(jié)點(diǎn)狀態(tài)為SIGNAL,喚醒head節(jié)點(diǎn)線程,重置head.waitStatus->0
    			 * head節(jié)點(diǎn)狀態(tài)為0(第一次添加時(shí)是0),設(shè)置head.waitStatus->Node.PROPAGATE表示狀態(tài)需要向后繼節(jié)點(diǎn)傳播
    			 */
    			doReleaseShared();
    	}
    }

    看下doReleaseShared方法的實(shí)現(xiàn):

    判斷head!=null && head!=tail,然后判斷head的waitStatus如果是SIGNAL則會(huì)使用CAS的方式將其改為0,這里沒有直接改成PROPAGATE,是因?yàn)閡nparkSuccessor(h)中,如果ws < 0會(huì)設(shè)置為0,所以ws先設(shè)置為0,再設(shè)置為PROPAGATE,這里需要控制并發(fā),因?yàn)槿肟谟衧etHeadAndPropagate跟release兩個(gè),避免兩次unpark。head狀態(tài)為SIGNAL,且成功設(shè)置為0之后喚醒head.next節(jié)點(diǎn)線程,此時(shí)head、head.next的線程都喚醒了,head.next會(huì)去競(jìng)爭(zhēng)鎖,成功后head會(huì)指向獲取鎖的節(jié)點(diǎn),也就是head發(fā)生了變化。head發(fā)生變化后h==head會(huì)不成立了,此時(shí)會(huì)重新循環(huán),繼續(xù)喚醒重新獲取的新head的下一個(gè)節(jié)點(diǎn)。

    如果本身頭節(jié)點(diǎn)的waitStatus是0,將其設(shè)置為PROPAGATE狀態(tài)。意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播。

    最后判斷如果h==head,即已經(jīng)沒有要喚醒的節(jié)點(diǎn)了,跳出循環(huán)向下執(zhí)行。如果h!=head,則說明head指針發(fā)生了變化,head已經(jīng)指向了新喚醒的線程node,繼續(xù)執(zhí)行下次循環(huán),獲取新的head,喚醒head的后續(xù)節(jié)點(diǎn)。

     private void doReleaseShared() {
    	for (;;) {
    		Node h = head;
    		if (h != null && h != tail) {
    			int ws = h.waitStatus;
    			if (ws == Node.SIGNAL) {//head是SIGNAL狀態(tài)
    				/* head狀態(tài)是SIGNAL,重置head節(jié)點(diǎn)waitStatus為0,這里不直接設(shè)為Node.PROPAGATE,
    				 * 是因?yàn)閡nparkSuccessor(h)中,如果ws < 0會(huì)設(shè)置為0,所以ws先設(shè)置為0,再設(shè)置為PROPAGATE
    				 * 這里需要控制并發(fā),因?yàn)槿肟谟衧etHeadAndPropagate跟release兩個(gè),避免兩次unpark
    				 */
    				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    					continue; //設(shè)置失敗,重新循環(huán)
    				/* head狀態(tài)為SIGNAL,且成功設(shè)置為0之后,喚醒head.next節(jié)點(diǎn)線程
    				 * 此時(shí)head、head.next的線程都喚醒了,head.next會(huì)去競(jìng)爭(zhēng)鎖,成功后head會(huì)指向獲取鎖的節(jié)點(diǎn),
    				 * 也就是head發(fā)生了變化??醋畹紫乱恍写a可知,head發(fā)生變化后會(huì)重新循環(huán),繼續(xù)喚醒head的下一個(gè)節(jié)點(diǎn)
    				 */
    				unparkSuccessor(h);
    				/*
    				 * 如果本身頭節(jié)點(diǎn)的waitStatus是出于重置狀態(tài)(waitStatus==0)的,將其設(shè)置為“傳播”狀態(tài)。
    				 * 意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播
    				 */
    			}
    			else if (ws == 0 &&
    					!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    				continue;                // loop on failed CAS
    		}
    		if (h == head) //如果head變了,重新循環(huán)
    			break;
    	}
    }

    releaseShared方法(共享釋放)源碼分析

    調(diào)用tryReleaseShared方法釋放資源成功時(shí)會(huì)調(diào)用doReleaseShared方法執(zhí)行喚醒邏輯,上面已經(jīng)分析過了。

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		doReleaseShared();
		return true;
	}
	return false;
}

到此這篇關(guān)于Java并發(fā)編程之JUC并發(fā)核心AQS同步隊(duì)列原理剖析的文章就介紹到這了,更多相關(guān)JUC并發(fā)核心AQS同步隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java Lambda List轉(zhuǎn)Map代碼實(shí)例

    Java Lambda List轉(zhuǎn)Map代碼實(shí)例

    這篇文章主要介紹了Java Lambda List轉(zhuǎn)Map代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • 線上Java程序占用CPU過高解決方案

    線上Java程序占用CPU過高解決方案

    這篇文章主要介紹了線上Java程序占用CPU過高解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作實(shí)例代碼

    Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作實(shí)例代碼

    這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-11-11
  • 在SpringBoot項(xiàng)目中使用JetCache緩存的詳細(xì)教程

    在SpringBoot項(xiàng)目中使用JetCache緩存的詳細(xì)教程

    Spring Boot是一個(gè)非常流行的Java開發(fā)框架,JetCache是一個(gè)基于注解的高性能緩存框架,本文將介紹如何在Spring Boot項(xiàng)目中使用JetCache緩存,并提供一個(gè)詳細(xì)案例來說明如何配置和使用JetCache,需要的朋友可以參考下
    2024-06-06
  • 關(guān)于Java中XML Namespace 命名空間問題

    關(guān)于Java中XML Namespace 命名空間問題

    這篇文章主要介紹了Java中XML Namespace 命名空間,XML命名空間是由國(guó)際化資源標(biāo)識(shí)符 (IRI) 標(biāo)識(shí)的 XML 元素和屬性集合,該集合通常稱作 XML“詞匯”,對(duì)XML Namespace 命名空間相關(guān)知識(shí)感興趣的朋友一起看看吧
    2021-08-08
  • Java操作數(shù)據(jù)庫(行級(jí)鎖,for update)

    Java操作數(shù)據(jù)庫(行級(jí)鎖,for update)

    這篇文章主要介紹了Java操作數(shù)據(jù)庫(行級(jí)鎖,for update),文章圍繞Java操作數(shù)據(jù)庫的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下,希望對(duì)你有所幫助
    2021-12-12
  • tomcat目錄結(jié)構(gòu)簡(jiǎn)介_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    tomcat目錄結(jié)構(gòu)簡(jiǎn)介_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要介紹了tomcat目錄結(jié)構(gòu)簡(jiǎn)介_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理的相關(guān)資料,需要的朋友可以參考下
    2017-07-07
  • Java String的intern用法解析

    Java String的intern用法解析

    這篇文章主要介紹了Java String的intern用法解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • springboot 如何解決static調(diào)用service為null

    springboot 如何解決static調(diào)用service為null

    這篇文章主要介紹了springboot 如何解決static調(diào)用service為null的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • java高并發(fā)的線程中斷的幾種方式詳解

    java高并發(fā)的線程中斷的幾種方式詳解

    這篇文章主要介紹了Java線程中斷機(jī)制幾種方法及示例,向大家分享了這幾種方法的介紹幾代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2021-10-10

最新評(píng)論