亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

ReentrantLock從源碼解析Java多線程同步學(xué)習(xí)

 更新時(shí)間:2023年04月19日 15:26:25   作者:碼下客  
這篇文章主要為大家介紹了ReentrantLock從源碼解析Java多線程同步學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

如今多線程編程已成為了現(xiàn)代軟件開發(fā)中的重要部分,而并發(fā)編程中的線程同步問題更是一道難以逾越的坎。在Java語言中,synchronized是最基本的同步機(jī)制,但它也存在著許多問題,比如可重入性不足、死鎖等等。為了解決這些問題,Java提供了更加高級的同步機(jī)制——ReentrantLock。

管程

管程(Monitor)是一種用于實(shí)現(xiàn)多線程同步的抽象數(shù)據(jù)類型,它可以用來協(xié)調(diào)不同線程之間的互斥和同步訪問共享資源。通俗地說,管程就像一個(gè)門衛(wèi),控制著進(jìn)入某個(gè)共享資源區(qū)域的線程數(shù)量和時(shí)間,以避免多個(gè)線程同時(shí)訪問導(dǎo)致的數(shù)據(jù)競爭和混亂。

管程模型

  • Mesa管程模型:由美國計(jì)算機(jī)科學(xué)家Dijkstra提出,是最流行的管程模型之一。在Mesa管程模型中,每個(gè)管程也有一個(gè)條件變量和等待隊(duì)列,但與Hoare管程不同的是,當(dāng)一個(gè)線程請求進(jìn)入管程時(shí),如果條件不滿足,該線程并不會立即被阻塞,而是繼續(xù)執(zhí)行后續(xù)操作,直到該線程主動放棄鎖資源或者其他線程喚醒它。
  • Hoare管程模型:由英國計(jì)算機(jī)科學(xué)家C.A.R. Hoare提出,是最早的管程模型之一。在Hoare管程模型中,每個(gè)管程都有一個(gè)條件變量和一個(gè)等待隊(duì)列,當(dāng)一個(gè)線程請求進(jìn)入管程時(shí),如果條件不滿足,該線程就會被阻塞并加入等待隊(duì)列,直到條件滿足后才被喚醒。
  • Brinch Hansen管程模型:由丹麥計(jì)算機(jī)科學(xué)家Per Brinch Hansen提出,是一種改進(jìn)的管程模型。在Brinch Hansen管程模型中,每個(gè)管程也有一個(gè)條件變量和等待隊(duì)列,但與其他管程模型不同的是,它允許多個(gè)線程同時(shí)在管程中等待,并且不需要像Hoare管程那樣每次只喚醒一個(gè)等待線程。

在Java中,采用的是基于Mesa管程模型實(shí)現(xiàn)的管程機(jī)制。具體地,Java中的synchronized關(guān)鍵字就是基于Mesa管程模型實(shí)現(xiàn)的,包括Java中的AbstractQueuedSynchronizer(AQS)可以被看作是一種基于管程模型實(shí)現(xiàn)的同步框架。

MESA模型

主要特點(diǎn)

  • 互斥訪問

MESA模型采用了互斥訪問的機(jī)制,即同一時(shí)刻只能有一個(gè)線程進(jìn)入管程執(zhí)行代碼。

  • 條件變量

MESA模型還引入了條件變量的概念,用于實(shí)現(xiàn)線程間的等待和喚醒操作。條件變量提供了一種機(jī)制,使得線程可以在等待某個(gè)條件成立時(shí)掛起,并在條件成立時(shí)被喚醒。

  • 等待隊(duì)列

MESA模型使用等待隊(duì)列來維護(hù)處于等待狀態(tài)的線程,這些線程都在等待條件變量成立。等待隊(duì)列由一個(gè)或多個(gè)條件變量組成,每個(gè)條件變量都有自己的等待隊(duì)列。

  • 原子操作

MESA模型要求管程中的所有操作都是原子操作,即一旦進(jìn)入管程,就不能被中斷,直到操作執(zhí)行完畢。

AQS

在講ReentrantLock之前先說一下AQS,AQS(AbstractQueuedSynchronizer)是Java中的一個(gè)同步器,它是許多同步類(如ReentrantLock、Semaphore、CountDownLatch等)的基礎(chǔ)。AQS提供了一種實(shí)現(xiàn)同步操作的框架,其中包括獨(dú)占模式和共享模式,以及一個(gè)等待隊(duì)列來管理線程的等待和喚醒。AQS也借鑒了Mesa模型的思想。

共享變量

AQS內(nèi)部維護(hù)了屬性volatile int state表示資源的可用狀態(tài)
state三種訪問方式:

  • getState()
  • setState()
  • compareAndSetState()

資源訪問方式

Exclusive-獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock

Share-共享,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphore/CountDownLatch

主要方法

  • isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false

隊(duì)列

  • 同步等待隊(duì)列: 主要用于維護(hù)獲取鎖失敗時(shí)入隊(duì)的線程。
  • 條件等待隊(duì)列: 調(diào)用await()的時(shí)候會釋放鎖,然后線程會加入到條件隊(duì)列,調(diào)用signal()喚醒的時(shí)候會把條件隊(duì)列中的線程節(jié)點(diǎn)移動到同步隊(duì)列中,等待再次獲得鎖。

node節(jié)點(diǎn)等待狀態(tài)

  • 值為0,初始化狀態(tài),表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。
  • CANCELLED,值為1,表示當(dāng)前的線程被取消;
  • SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark;
  • CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;
  • PROPAGATE,值為-3,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行;

ReentrantLock源碼分析

在ReentrantLock中有一個(gè)內(nèi)部類Sync會繼承 AQS然后將同步器所有調(diào)用都映射到Sync對應(yīng)的方法。

實(shí)例化ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock還提供了一個(gè)傳布爾值的實(shí)例化方式,這個(gè)傳true用來創(chuàng)建一個(gè)公平鎖的,默認(rèn)是創(chuàng)建非公平鎖。非公平鎖的話 sync是用NonfairSync來進(jìn)行實(shí)例化,公平鎖sync是用FairSync來進(jìn)行實(shí)例化。

加鎖

現(xiàn)在假設(shè)有AB兩個(gè)線程來競爭鎖

A線程加鎖成功

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //CAS修改state狀態(tài)
        if (compareAndSetState(0, 1))
           //修改成功設(shè)置exclusiveOwnerThread
            setExclusiveOwnerThread(Thread.currentThread());
        else
           //嘗試獲取資源
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

假定A線程先CAS修改成功,他會設(shè)置exclusiveOwnerThread為A線程

B線程嘗試加鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我們先看tryAcquire()方法,這里體現(xiàn)出了他的可重入性。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //獲取當(dāng)前資源標(biāo)識
    int c = getState();
    if (c == 0) {
        //如果資源沒被占有CAS嘗試加鎖
        if (compareAndSetState(0, acquires)) {
            //修改成功設(shè)置exclusiveOwnerThread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //資源被占有要判斷占有資源的線程是不是當(dāng)前線程,加鎖成功設(shè)置的exclusiveOwnerThread值在這里就派上了用處
    else if (current == getExclusiveOwnerThread()) {
        //這下面就是將重入次數(shù)設(shè)置到資源標(biāo)識里
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

根據(jù)上面源碼我們可以看出B線程嘗試加鎖是失敗的,接下來看嘗試加鎖失敗后的方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),該方法實(shí)現(xiàn)分為兩個(gè)部分:

  • addWaiter(Node.EXCLUSIVE):該方法會將當(dāng)前線程加入到等待隊(duì)列(即 Sync 中的 queue)的尾部,并返回該節(jié)點(diǎn)。這個(gè)節(jié)點(diǎn)的模式是獨(dú)占模式(Node.EXCLUSIVE),表示當(dāng)前線程想要獲取獨(dú)占鎖。
  • acquireQueued(Node node, int arg):該方法是一個(gè)循環(huán)方法,用于等待和獲取鎖。

我們先解析addWaiter

private Node addWaiter(Node mode) {
    //構(gòu)建一個(gè)當(dāng)前線程的node節(jié)點(diǎn) 這里prev 和 next 都為null
    Node node = new Node(Thread.currentThread(), mode);
    // 指向雙向鏈表的尾節(jié)點(diǎn)的引用
    Node pred = tail;
    //B線程進(jìn)來目前還未構(gòu)建任何隊(duì)列這里肯定是空的
    if (pred != null) {
        //如果已經(jīng)構(gòu)建過隊(duì)列會把當(dāng)前線程的node節(jié)點(diǎn)的上一個(gè)node節(jié)點(diǎn)指向tail尾節(jié)點(diǎn)
        node.prev = pred;
        //CAS操作把當(dāng)前線程的node節(jié)點(diǎn)設(shè)置為新的tail尾節(jié)點(diǎn)
        if (compareAndSetTail(pred, node)) {
            //把舊的tail尾節(jié)點(diǎn)的下一個(gè)node節(jié)點(diǎn)指向當(dāng)前線程的node節(jié)點(diǎn)
            pred.next = node;
            return node;
        }
    }
    //尾節(jié)點(diǎn)為空執(zhí)行
    enq(node);
    return node;
}
private Node enq(final Node node) {
    //死循環(huán)當(dāng)tail 尾節(jié)點(diǎn)不為空才會跳出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //用CAS構(gòu)建出一個(gè)head空的node節(jié)點(diǎn)
            if (compareAndSetHead(new Node()))
            //將當(dāng)前空的node節(jié)點(diǎn)交給尾節(jié)點(diǎn)(下一次循環(huán)就會走else分支)
                tail = head;
        } else {
            //把我們addWaiter中創(chuàng)建的node節(jié)點(diǎn)的prev指向了當(dāng)前線程node節(jié)點(diǎn)
            node.prev = t;
            //將tail尾節(jié)點(diǎn)更改為當(dāng)前線程的node節(jié)點(diǎn)
            if (compareAndSetTail(t, node)) {
                //將t的下一個(gè)節(jié)點(diǎn)指向當(dāng)前線程創(chuàng)建的node節(jié)點(diǎn)
                t.next = node;
                return t;
            }
        }
    }
}

執(zhí)行完這里初步的一個(gè)等待隊(duì)列就構(gòu)建好了

解析完addWaiter我們再來解析acquireQueued,addWaiter執(zhí)行完后的結(jié)果會返回一個(gè)雙向列表的node節(jié)點(diǎn)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //中斷標(biāo)志位
        boolean interrupted = false;
        for (;;) {
            //獲取當(dāng)前線程node節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
            final Node p = node.predecessor();
            //如果上一個(gè)節(jié)點(diǎn)就是head節(jié)點(diǎn)說明當(dāng)前線程其實(shí)是處在隊(duì)列第一位然后就會再次嘗試加鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //這里是重點(diǎn) 這個(gè)方法來判斷當(dāng)前線程是否應(yīng)該進(jìn)入等待狀態(tài)
            if (shouldParkAfterFailedAcquire(p, node) &&
                //調(diào)用LockSupport.park(this)阻塞了當(dāng)前線程等待被喚醒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取上一個(gè)節(jié)點(diǎn)的等待狀態(tài)
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)//表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {//當(dāng)前線程被取消
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設(shè)置等待狀態(tài)為-1
    }
    return false;
}

運(yùn)行到parkAndCheckInterrupt()B線程就會被阻塞了,后續(xù)的邏輯我們在解鎖操作unlock之后再繼續(xù)說

釋放鎖

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
   //嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //head節(jié)點(diǎn)不為空并且等待狀態(tài)不是0就去進(jìn)行unpark操作
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //head節(jié)點(diǎn)等待狀態(tài)
    int ws = node.waitStatus;
    //
    if (ws < 0)
       //將頭節(jié)點(diǎn)的等待狀態(tài)修改為0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //獲取頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(也就是B節(jié)點(diǎn))
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//節(jié)點(diǎn)為空或者線程處于取消狀態(tài)
        s = null;
        //從尾節(jié)點(diǎn)往上找符合條件的節(jié)點(diǎn)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //對該線程進(jìn)行unpark喚醒(B節(jié)點(diǎn))
        LockSupport.unpark(s.thread);
}

喚醒之后我們的B線程就能繼續(xù)往下走了,我們繼續(xù)看剛剛的acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //這里嘗試獲取鎖由于A線程釋放了鎖這里是肯定獲取成功的
            if (p == head && tryAcquire(arg)) {
                //把head設(shè)置為當(dāng)前節(jié)點(diǎn)(也就是往前移一位,并且把上一個(gè)節(jié)點(diǎn)指向指為null)
                setHead(node);
                //把剛剛的上一個(gè)節(jié)點(diǎn)也指向?yàn)閚ull (這里他就沒引用了會被GC回收掉)
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //剛剛在這里阻塞現(xiàn)在被喚醒
                parkAndCheckInterrupt())
                //設(shè)置標(biāo)志中斷位為true 然后開始下一次循環(huán)
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

總結(jié)

  • 鎖的實(shí)現(xiàn)方式:ReentrantLock內(nèi)部通過維護(hù)一個(gè)state變量來表示鎖的狀態(tài),其中高16位表示持有鎖的線程ID,低16位表示重入次數(shù)。使用CAS操作來獲取鎖,如果當(dāng)前鎖未被持有,則將state的高16位設(shè)置為當(dāng)前線程ID,并將低16位設(shè)置為1,表示重入次數(shù)為1;如果當(dāng)前鎖已經(jīng)被持有,則判斷持有鎖的線程是否為當(dāng)前線程,如果是,則將state的低16位加1表示重入,如果不是,則進(jìn)入等待隊(duì)列。
  • 等待隊(duì)列:ReentrantLock中的等待隊(duì)列采用了CLH隊(duì)列的實(shí)現(xiàn)方式,每個(gè)等待線程會被封裝成一個(gè)Node節(jié)點(diǎn),節(jié)點(diǎn)中維護(hù)了前繼節(jié)點(diǎn)、后繼節(jié)點(diǎn)和等待狀態(tài)等信息。當(dāng)一個(gè)線程需要等待鎖時(shí),會將自己封裝成一個(gè)Node節(jié)點(diǎn)插入到等待隊(duì)列的尾部,并在自旋等待時(shí)自動阻塞。
  • 公平鎖與非公平鎖:ReentrantLock可以通過構(gòu)造函數(shù)中的fair參數(shù)來指定鎖的公平性,當(dāng)fair為true時(shí)表示該鎖是公平鎖,即等待隊(duì)列中的線程會按照先進(jìn)先出的順序獲取鎖;當(dāng)fair為false時(shí)表示該鎖是非公平鎖,即等待隊(duì)列中的線程會按照隨機(jī)順序獲取鎖。
  • 鎖的釋放:ReentrantLock中的鎖釋放采用了state變量的遞減來實(shí)現(xiàn),當(dāng)一個(gè)線程釋放鎖時(shí),會將state的低16位減1,如果減1后低16位變?yōu)?,則表示當(dāng)前線程已經(jīng)完全釋放了鎖,此時(shí)會將高16位清零,表示鎖變?yōu)榱丝色@取狀態(tài),等待隊(duì)列中的線程可以繼續(xù)競爭鎖。

以上就是ReentrantLock從源碼解析Java多線程同步學(xué)習(xí)的詳細(xì)內(nèi)容,更多關(guān)于Java多線程ReentrantLock的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論