亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

go中channel通信的底層實現(xiàn)

 更新時間:2025年10月01日 10:41:25   作者:QZQ54188  
Go語言channel通過同步/異步機制實現(xiàn)并發(fā)通信,避免鎖競爭,底層使用鎖、等待隊列、CAS和環(huán)形緩沖區(qū),確保高效、安全,支持select的非阻塞和喚醒處理,提升大規(guī)模并發(fā)下的穩(wěn)定性

channel通信底層實現(xiàn)

學(xué)習(xí)go語言的goroutine時,了解到goroutine的設(shè)計思想是線程之間通信不依賴共享內(nèi)存,避免使用鎖導(dǎo)致的死鎖問題,starvation問題。鎖在無競爭下確實輕量,但隨著 goroutine/線程數(shù)增加,很容易出現(xiàn)競爭。channel 提供了一種更直觀、更安全的通信方式,避免程序員手動管理鎖。使用channel可以減少人為鎖競爭和并發(fā)問題。

在學(xué)習(xí)過程中想深究一下channel的底層原理,所以寫下這篇文章記錄學(xué)習(xí)過程。

channel是go中主要的通信和同步核心原語,所以必須要做到快和可拓展,否則整個并發(fā)模型都會受影響??赏卣故钦f當(dāng) goroutine 數(shù)量增加時,channel 的性能不會迅速劣化,可以在大規(guī)模并發(fā)下保持穩(wěn)定。所以go在設(shè)計channel時主要側(cè)重于:

  1. 讓單線程(無競爭)的 channel 操作更快,比如只有一個 goroutine 在寫,一個在讀,沒有其他競爭,最好可以降低到函數(shù)調(diào)用的開銷。
  2. 讓有競爭的緩沖 channel(生產(chǎn)者/消費者模型)更快,比如多個 goroutine 同時寫 / 讀一個帶緩沖的 channel。
  3. 讓非阻塞失敗操作更快,當(dāng) goroutine 想做一個嘗試讀/寫而發(fā)現(xiàn)條件不滿足時,應(yīng)該能 立刻返回,只做最少的檢查,而不是浪費 CPU 在鎖競爭或調(diào)度上。

goruntime內(nèi)部把channel分為了三類

同步通道(Sync channels)

同步通道就是我們平常寫的無緩沖channel,即make(chan int)。這類通道沒有緩沖區(qū),發(fā)送必須等接收方準(zhǔn)備好,接收也必須等發(fā)送方準(zhǔn)備好。發(fā)送方把數(shù)據(jù)直接交給接收方,不需要額外存儲。這就相當(dāng)于兩個goroutine當(dāng)場握手,適用在需要嚴(yán)格同步的場景。

異步通道(Async channels)

異步通道就是帶緩沖區(qū)的channel:make(chan int, N)。內(nèi)部實現(xiàn)是一個環(huán)形隊列。在異步通道中發(fā)送和接收可以解耦,發(fā)送方可以先把數(shù)據(jù)放進緩沖區(qū),不用等接收方。接收方從緩沖區(qū)取數(shù)據(jù),如果沒有就阻塞。但是如果緩沖區(qū)滿了發(fā)送方同樣會阻塞,緩沖區(qū)空了接收方也會阻塞。在內(nèi)部實現(xiàn)中即使接收方已經(jīng)在等待,數(shù)據(jù)也不一定直接傳給它,而是先進入緩沖區(qū)。等接收方競爭獲取數(shù)據(jù)時,如果沒搶到,就還得繼續(xù)阻塞。

零大小元素的異步通道(Async channels with zero-sized elements)

這個是channel的特殊情況:chan struct{},這里的元素大小是0 字節(jié),所以本質(zhì)上就是一個計數(shù)信號量。不需要額外緩沖存儲數(shù)據(jù),常常用作并發(fā)控制、限流、任務(wù)完成通知。

sem := make(chan struct{}, 3) // 信號量,最多允許3個并發(fā)
// goroutine 獲取令牌
sem <- struct{}{}
// goroutine 釋放令牌
<-sem

同步通道內(nèi)部

同步channel的結(jié)構(gòu)體包含以下數(shù)據(jù):

struct Hchan {
    Lock;
    bool closed;
    SudoG* sendq;  // 等待發(fā)送的 goroutine 隊列
    SudoG* recvq;  // 等待接收的 goroutine 隊列
};

其中Lock保證并發(fā)安全,closed標(biāo)記通道是否已經(jīng)關(guān)閉,sendq / recvq是鏈表隊列,保存阻塞的發(fā)送方/接收方。

使用同步通道的發(fā)送操作時,會先給互斥量加鎖,保證后續(xù)只有自己處于臨界區(qū)。如果當(dāng)前沒有接收方,就要把自己掛起,進入等待隊列。如果有接收方在等待,那么就直接配對,把數(shù)據(jù)交給接收方,這樣發(fā)送方不需要阻塞。

以下是syncchansend代碼解讀:

bool syncchansend(Hchan *c, T val, bool block) {
    if(c->closed)  // fast-path
    	panic(“closed”);
    if(!block && c->recvq == nil)  // fast-path
    	return false;
    lock(c);
    if(c->closed) {
        unlock(c);
        panic(“closed”);
    }
    if(sg = removewaiter(&c->recvq)) {
        // Have a blocked receiver, communicate with it.
        unlock(c);
        sg->val = val;
        sg->completed = true;
        unblock(sg->g);
        return true;
    }
    if(!block) {
        unlock(c);
        return false;
    }
    // Block and wait for a pair.
    sg->g = g;
    sg->val = val;
    addwaiter(&c->sendq, sg);
    unlock(c);
    block();
    if(!sg->completed)
    	panic(“closed”);  // unblocked by close
    // Unblocked by a recv.
    return true;
}

函數(shù)參數(shù)為c:目標(biāo)通道,val:要發(fā)送的數(shù)據(jù),block:是否允許阻塞(即普通 send vs 非阻塞 send)。第一步是檢查是否可以fast failure,為什么要有``fast failure`呢?因為我們在編寫如下代碼時:

select {
case x := <-ch:    // 如果能取就取
    fmt.Println(x)
default:           // 如果取不到就直接失敗
    fmt.Println("no data")
}

這里的<-ch不會像普通的同步channel一樣一直阻塞在這里,而是如果通道里面有數(shù)據(jù),就立即返回,如果通道里面沒數(shù)據(jù),不會阻塞,直接走到default分支。如果沒有fast failure的話,此時goruntime還會傻傻地去加鎖 -> 檢查隊列 -> 發(fā)現(xiàn)沒數(shù)據(jù) -> 解鎖 -> 返回失敗,其實完全沒有必要,因為我們只需要檢查隊列是否為空。上面源碼的邏輯就是:1. 如果通道已經(jīng)關(guān)閉,立即報錯,不需要加鎖。2. 非阻塞發(fā)送而且當(dāng)前沒有接收方在等著,就直接返回失敗,不阻塞,也不用加鎖。

lock(c);
if (c->closed) {
    unlock(c);
    panic("closed");
}

之后的邏輯就是可以發(fā)送的情況,所以我們需要加鎖保護臨界區(qū),加鎖之后還需要檢查通道是否關(guān)閉,避免之前在執(zhí)行完判斷之后,獲取鎖之前的某一個時間點通道被關(guān)閉,不檢查的話就可能會對關(guān)閉的channel進行操作,向關(guān)閉的channel執(zhí)行寫操作的話就會直接panic。

if(sg = removewaiter(&c->recvq)) {
    // Have a blocked receiver, communicate with it.
    unlock(c);
    sg->val = val;
    sg->completed = true;
    unblock(sg->g);
    return true;
}

這段邏輯就是找到一個已經(jīng)被阻塞的接收者,如果有 goroutine 已經(jīng)在 recvq 等待,那么直接配對。配對完之后就直接將要發(fā)送的數(shù)據(jù)交給接收方,發(fā)送方不阻塞,接收方也被喚醒,通信完成。

但是如果沒有找到已經(jīng)被阻塞的接收者,就會執(zhí)行下面的邏輯:

if (!block) {
    unlock(c);
    return false;
}

是非阻塞 send的話,并且沒人接收,就立即返回失敗。

// Block and wait for a pair.
sg->g = g;
sg->val = val;
addwaiter(&c->sendq, sg);
unlock(c);
block();

這里就是沒有接收方但是允許阻塞的情況,這里我們解釋一下sg是什么。

這里的 sg 指的是sudog類型的指針。

struct sudog {
    g* g;          // 等待的 goroutine
    element val;   // 要發(fā)送或接收的值
    sudog* next;   // 鏈表指針,用來掛到 sendq/recvq
    sudog* prev;
    ...
    bool completed;
};
  • val → 發(fā)送方要傳的數(shù)據(jù),或者接收方接收的數(shù)據(jù)存放位置。
  • next/prev → 把 sudog 串起來,形成 channel 的 sendqrecvq 隊列。
  • completed → 表示這個 goroutine 的操作是否已經(jīng)完成。
  • g* g → 指向一個正在等待的 goroutine。

在上述channel代碼中sg->g = g;表示把當(dāng)前 goroutine 記錄到 sudog 里,sg->val = val;把要發(fā)送的值放到 sudog 里,addwaiter(&c->sendq, sg);把 sudog 加入 channel 的發(fā)送等待隊列。block();把發(fā)送方掛起,等到有接收方時就可以被喚醒。這里的 sg 是一個 sudog 實例,它代表 “當(dāng)前這個 goroutine 正在等待往 channel 里發(fā)送 val”。

當(dāng)有接收方出現(xiàn)時,runtime 會從 sendq 拿一個等待的接收者 sudog,然后把這個 val 交給它,并喚醒 sg->g(這個 goroutine)。

if(!sg->completed)
	panic(“closed”);  // unblocked by close
// Unblocked by a recv.
return true;

被喚醒時要區(qū)分,如果是recv 喚醒→ 通信成功。如果是close 喚醒→ 拋 panic。

上面就是同步channel發(fā)送數(shù)據(jù)的細(xì)節(jié),接收操作也是類似,都是先嘗試快速路徑,沒法完成就進入慢路徑阻塞。核心點都是sudog隊列,goroutine 阻塞喚醒和數(shù)據(jù)hand-off,區(qū)別在于send 隊列掛 send,recv 隊列掛 recv。這里給出gpt生成的類似代碼作為參考:

// 阻塞/非阻塞接收
bool synchchanrecv(Hchan *c, T* out, bool block) {
    // fast-path: channel 已關(guān)閉且隊列為空
    if (c->closed && c->sendq == nullptr) {
        *out = zero_value();  // 返回零值
        return false;         // ok = false
    }
    // fast-path: 非阻塞且沒有發(fā)送方
    if (!block && c->sendq == nullptr) {
        return false;
    }
    lock(c);  // 進入慢路徑,保護 sendq/recvq
    if (c->closed && c->sendq == nullptr) {
        unlock(c);
        *out = zero_value();
        return false;
    }
    // 檢查是否有等待的發(fā)送方
    sudog* sg;
    if ((sg = removewaiter(&c->sendq)) != nullptr) {
        unlock(c);
        // 直接從發(fā)送方取數(shù)據(jù)
        *out = sg->val;
        sg->completed = true;
        unblock(sg->g);  // 喚醒發(fā)送方
        return true;
    }
    // 沒有發(fā)送方
    if (!block) {
        unlock(c);
        return false;  // 非阻塞失敗
    }
    // 阻塞自己,加入 recvq
    sg = new_sudog();
    sg->g = g;        // 當(dāng)前 goroutine
    sg->val_ptr = out; // 接收的數(shù)據(jù)存儲地址
    addwaiter(&c->recvq, sg);
    unlock(c);
    block();  // 掛起當(dāng)前 goroutine,等待發(fā)送方喚醒

    // 被喚醒后判斷原因
    if (!sg->completed) {
        // 被 channel close 喚醒
        *out = zero_value();
        return false;
    }
    // 被發(fā)送方喚醒,接收成功
    return true;
}

異步通道內(nèi)部

異步通道就是帶緩沖區(qū)的channel,在進行發(fā)送或者接收操作時,如果只是簡單地在環(huán)形緩沖區(qū)中放入或者取出數(shù)據(jù),那么goruntime就不需要操作等待隊列,對于環(huán)形緩沖區(qū)goruntime采用原子指令讀寫數(shù)據(jù),此時是無鎖的,效率很高。但是一旦緩沖區(qū)滿了時發(fā)送方需要阻塞,或者緩沖區(qū)空了時接收方需要阻塞,這時需要操作等待隊列,就必須加鎖,因為等待隊列需要受互斥鎖保護,避免并發(fā)修改出錯。

這里同樣提供了快速失敗路徑,對于非阻塞失敗操作(例如嘗試 send 到一個已滿的通道,或者嘗試 recv 從一個空通道),直接檢測失敗并返回,不進入復(fù)雜的加鎖邏輯。

下面是異步通道中的核心數(shù)據(jù)結(jié)構(gòu):

struct Hchan {
    uint32 cap;   // channel capacity
    Elem*  buf;   // ring buffer of size cap

    // send and receive positions,
    // low 32 bits represent position in the buffer,
    // high 32 bits represent the current “l(fā)ap” over the ring buffer
    uint64 sendx;
    uint64 recvx;
};
  • cap表示通道的容量,也就是緩沖區(qū)最多可以存放多少元素。make(chan int, N)中的N就是cap。
  • buf指向一個環(huán)形緩沖區(qū),大小就是cap,用來存放用戶真正寫入的數(shù)據(jù)。
  • sendx/recvx分別表示發(fā)送位置和接收位置。兩個都是64位數(shù),使用時需要分為高32位和低32位看。低32位表示在緩沖區(qū)中的下標(biāo)位置,高32位表示當(dāng)前是第幾圈(lap,繞緩沖區(qū)的次數(shù))。這樣設(shè)計可以區(qū)分同一個下標(biāo)位置上的數(shù)據(jù),屬于第幾次寫/讀。避免了“ABA問題”(同一個位置被覆蓋后,無法判斷是不是舊數(shù)據(jù))。
struct Elem {
    // current lap,
    // the element is ready for writing on laps 0, 2, 4, ...
    // for reading -- on laps 1, 3, 5, ...
    uint32 lap;
    T      val;  // user data
};
  • lap是每個緩沖區(qū)元素內(nèi)額外保存的一個圈數(shù)標(biāo)記。如果是偶數(shù)圈的話說明位置是空的,可以寫;奇數(shù)圈說明這個位置有數(shù)據(jù)可以讀。這就保證了:生產(chǎn)者(send)和消費者(recv)在競爭時,能明確知道這個槽位當(dāng)前是可寫還是 可讀,不會混淆。
  • val就是用戶寫入的數(shù)據(jù)。

如果是非阻塞的異步通道的話,多個發(fā)送者 goroutine可能同時往 channel 里寫數(shù)據(jù)。它們需要爭搶一個“寫入位置”,這是通過對sendx做CAS操作來完成的,誰 CAS 成功,誰就獲得了在 sendx 指向的位置寫入數(shù)據(jù)的權(quán)利。失敗的發(fā)送者會重試,直到搶到一個空槽位。

發(fā)送方寫入數(shù)據(jù)以后,要讓接收方知道這個槽位已經(jīng)寫滿。這不是通過鎖,而是通過每個元素的 lap 變量來完成:如果 lap 是偶數(shù) → 表示該槽位目前是 空的,只能寫。如果 lap 是奇數(shù) → 表示該槽位目前是 滿的,可以讀。發(fā)送者把數(shù)據(jù)寫進去時,把 lap 改成奇數(shù),通知接收者“數(shù)據(jù)可讀”。接收者讀完數(shù)據(jù)后,把 lap 改成偶數(shù),通知發(fā)送者“槽位可寫”。

接下來我們看看異步通道非阻塞發(fā)送的核心代碼:

bool asyncchansend_nonblock(Hchan* c, T val) {
    uint32 pos, lap, elap;
    uint64 x, newx;
    Elem *e;

    for(;;) {
        x = atomicload64(&c->sendx);
        pos = (uint32)x;
        lap = (uint32)(x >> 32);
        e = &c->buf[pos];
        elap = atomicload32(&e->lap);
        if(lap == elap) {
            // The element is ready for writing on this lap.
            // Try to claim the right to write to this element.
            if(pos + 1 < c->cap)
            	newx = x + 1;  // just increase the pos
            else
            	newx = (uint64)(lap + 2) << 32;
            if(!cas64(&c->sendx, x, newx))
            	continue;  // lose the race, retry
            // We own the element, do non-atomic write.
            e->val = val;
            // Make the element available for reading.
            atomicstore32(&e->lap, elap + 1);
            return true;
        } else if((int32)(lap - elap) > 0) {
            // The element is not yet read on the previous lap,
            // the chan is full.
            return false;
        } else {
            // The element has already been written on this lap,
            // this means that c->sendx has been changed as well,
            // retry.
        }
    }
}
x = atomicload64(&c->sendx);
pos = (uint32)x;
lap = (uint32)(x >> 32);
e = &c->buf[pos];
elap = atomicload32(&e->lap);

首先原子讀取當(dāng)前發(fā)送位置,計算出寫位置在環(huán)形緩沖區(qū)中的下標(biāo)和當(dāng)前圈數(shù),原子讀取 e->lap,因為其他線程(發(fā)送者/接收者)會修改它。

if(lap == elap) {
    // The element is ready for writing on this lap.
    // Try to claim the right to write to this element.
    if(pos + 1 < c->cap)
    	newx = x + 1;  // just increase the pos
    else
    	newx = (uint64)(lap + 2) << 32;
    if(!cas64(&c->sendx, x, newx))
    	continue;  // lose the race, retry
    // We own the element, do non-atomic write.
    e->val = val;
    // Make the element available for reading.
    atomicstore32(&e->lap, elap + 1);
    return true;
}

如果當(dāng)前sendx的lap和緩沖區(qū)元素的lap一致的話,說明這個槽位正處于“可寫狀態(tài)”。因為一開始是0,如果當(dāng)前sendx的lap也是0的話表示當(dāng)前槽位為空,可以寫。所以發(fā)送者就會嘗試用CAS搶這個位置寫入數(shù)據(jù)。

if (pos + 1 < c->cap)說明當(dāng)前不是緩沖區(qū)的最后一個槽位,下一個位置仍然在環(huán)形緩沖區(qū)內(nèi)部,所以只需要把 pos 增加 1。else分支就是說明已經(jīng)寫到環(huán)形緩沖區(qū)的尾部,再往前就需要回到開頭。這時候不能僅僅把 pos 設(shè)為 0,還要把 lap 增加 2,來交替讀寫狀態(tài),進入新的寫循環(huán)。

if(!cas64(&c->sendx, x, newx))
	continue;  // lose the race, retry
// We own the element, do non-atomic write.
e->val = val;
// Make the element available for reading.
atomicstore32(&e->lap, elap + 1);
return true;

這段邏輯就是無鎖操作的關(guān)鍵部分,CAS(Compare-And-Swap)嘗試把 sendxx 更新到 newx。如果成功 → 當(dāng)前線程贏得了這個插槽的所有權(quán)。如果失敗 → 說明別的線程同時在競爭同一個位置,這個線程就要 continue 重新嘗試。如果走到了后面的e->val = val;說明該線程已經(jīng)確認(rèn)自己“獨占”了這個隊列位置,可以安全地寫入數(shù)據(jù)。注意這里用的是普通賦值(非原子),因為寫入數(shù)據(jù)的唯一寫入者已經(jīng)通過 CAS 搶占成功,不存在寫沖突。之后atomicstore32(&e->lap, elap + 1);是發(fā)布這個元素,讓消費者能看到它已經(jīng)可用了。

else if((int32)(lap - elap) > 0) {
    // The element is not yet read on the previous lap,
    // the chan is full.
    return false;
}

lap - elap > 0 意味著:這個槽位中的舊數(shù)據(jù)還沒有被讀走,這個槽位在上一輪循環(huán)里寫過數(shù)據(jù),但還沒被讀走 → 隊列滿了,不能再寫。由于是非阻塞的,所以直接返回false。

else {
    // The element has already been written on this lap,
    // this means that c->sendx has been changed as well,
    // retry.
}

如果走到當(dāng)前這個else分支,說明當(dāng)前l(fā)ap < elap,也就是說這個elap被其他的寫者更新了,導(dǎo)致第一次看到的lap和緩沖區(qū)元素的elap不一樣,這就體現(xiàn)了多個發(fā)送者競爭環(huán)形緩沖區(qū)時失敗的情況,此時我們不能再往這個槽寫,不然會覆蓋別人的數(shù)據(jù),所以只可以重新讀取最新的sendx并再試一次。

上述就是異步通道非阻塞讀的流程,接收(Recv)操作與發(fā)送(Send)操作完全對稱,唯一的區(qū)別是接收操作從 lap = 1 開始,并且是讀取元素而不是寫入。

接下來是異步阻塞通道的發(fā)送和接收操作,通道結(jié)構(gòu)會被擴展,增加一個互斥鎖(mutex)以及發(fā)送/接收的等待隊列(send/recv waiter queues)。加等待隊列是為了實現(xiàn)阻塞語義,緩沖區(qū)滿時,send 必須阻塞,直到有接收者取走數(shù)據(jù);緩沖區(qū)空時,recv 必須阻塞,直到有發(fā)送者寫入數(shù)據(jù)。

struct Hchan {
    …
    Lock;
    SudoG* sendq;
    SudoG* recvq;
};

Lock是互斥鎖,用來保護channel內(nèi)部的共享狀態(tài),比如說等待隊列的入隊或者出隊。sendq/recvq就是發(fā)送方和接收方的等待隊列,里面存放等待發(fā)送或者接收數(shù)據(jù)的goroutine。

異步隊列在阻塞發(fā)送時,goroutine會先嘗試進行一次非阻塞發(fā)送。如果成功,他會檢查是否有接收方在等待,如果有,就喚醒其中一個。如果非阻塞發(fā)送失?。ㄍǖ酪褲M),它就會加鎖,將自己加入發(fā)送等待隊列,然后再次檢查緩沖區(qū)是否仍然是滿的。如果緩沖區(qū)仍然滿,goroutine 就會阻塞;如果緩沖區(qū)已經(jīng)不滿,goroutine 會把自己從等待隊列移除,解鎖,然后重試。再次檢測是為了防止就在它準(zhǔn)備阻塞的那一瞬間,另一個 goroutine B 可能正好執(zhí)行了一個 recv,使得緩沖區(qū)變得“不滿”。

阻塞接收的過程完全相同,只不過把“send”換成“recv”,把“recv”換成“send”。

阻塞算法的主要難點是要確保不會發(fā)生死鎖(比如發(fā)送方在一個非滿的通道上無限期阻塞,或者接收方在一個非空的通道上無限期阻塞)。通過這種“檢查 → 入隊 → 再檢查”的方式,我們能夠保證以下情況之一一定會發(fā)生:

  1. 發(fā)送方看到有接收方在等待并喚醒它;
  2. 接收方看到緩沖區(qū)里有元素并消費它;
  3. 1 和 2 同時發(fā)生(這種情況下通過互斥鎖來解決競爭);

但絕不會出現(xiàn)這種情況:發(fā)送方?jīng)]看到接收方在等待,接收方也沒看到緩沖區(qū)有元素,然后兩邊都無限期阻塞。

接下來看核心代碼實現(xiàn):

void asyncchansend(Hchan* c, T val) {
    for(;;) {
        if(asyncchansend_nonblock(c, val)) {
            // Send succeeded, see if we need to unblock a receiver.
            if(c->recvq != nil) {
                lock(c);
                sg = removewaiter(&c->recvq);
                unlock(c);
                if(sg != nil)
                	unblock(sg->g);
            }
            return;
        } else {
            // The channel is full.
            lock(c);
            sg->g = g;
            addwaiter(&c->sendq, sg);
            if(notfull(c)) {
                removewaiter(&c->sendq, sg);
                unlock(c);
                continue;
            }
            unlock(c);
            block();
            // Retry send.
        }
    }
}

首先調(diào)用 asyncchansend_nonblock 嘗試直接寫入數(shù)據(jù),如果成功,說明緩沖區(qū)有空間或者直接匹配到了等待的接收者。如果通道里有接收者在等待 (c->recvq != nil),那么我們應(yīng)該喚醒接收者避免長時間阻塞,所以我們加鎖,安全地從recvq隊列里面取出一個等待接收的 goroutine(removewaiter),再解鎖,如果真的取到一個等待者,就喚醒它。如果沒有在等待的接收者但是非阻塞還是寫入成功了,說明寫入緩沖區(qū)中,這時直接返回就好。

// The channel is full.
lock(c);
sg->g = g;
addwaiter(&c->sendq, sg);
if(notfull(c)) {
    removewaiter(&c->sendq, sg);
    unlock(c);
    continue;
}
unlock(c);
block();
// Retry send.

else分支下是非阻塞發(fā)送失敗的情況,根據(jù)上面非阻塞發(fā)送的代碼可知,緩沖區(qū)已經(jīng)滿了,當(dāng)前goroutine不能繼續(xù)發(fā)送,就需要將自己添加到sendq發(fā)送等待隊列。之后notfull(c)再做了一次檢查,因為在剛剛排隊的過程中,可能已經(jīng)有接收者拿走了數(shù)據(jù),緩沖區(qū)不再滿。如果發(fā)現(xiàn)通道現(xiàn)在不滿了,就把自己從sendq發(fā)送等待隊列中移除,回到循環(huán)開頭 (continue),再試一次發(fā)送。但是如果如果通道依然滿,那就調(diào)用block掛起當(dāng)前 goroutine,等待別人來喚醒。被喚醒之后,繼續(xù)循環(huán),重新嘗試發(fā)送。

零大小元素的異步通道

最后我們介紹零大小元素的異步通道內(nèi)部實現(xiàn),零大小的異步 channel 在整體上和非零大小的異步 channel 類似:

  • 在非阻塞情況下,操作是無鎖的。
  • 等待隊列仍然由互斥鎖保護。
  • 非阻塞失敗的操作會快速返回。

不同之處有:

  • Hchan 里只有一個計數(shù)器,而不是發(fā)送/接收位置和環(huán)形緩沖區(qū);這個計數(shù)器表示 channel 中的元素數(shù)量。
  • 非阻塞的發(fā)送/接收通過 CAS 循環(huán)來更新計數(shù)器。
  • 是否“滿”或“空”的判斷僅僅是檢查計數(shù)器的值。

close操作

對于channel的close操作,我們調(diào)用close時,要先加鎖,然后設(shè)置 closed 標(biāo)志,接著喚醒所有的等待者。異步的 send/recv 操作在進入阻塞之前都會檢查 closed 標(biāo)志。這樣就能保證與異步 send/recv 阻塞時相同的語義。要么1.close看到了有等待著,2.等待者看到了closed標(biāo)志被設(shè)置,3. 1和2同時發(fā)生時通過互斥鎖解決。

select操作

select 操作不會一次性鎖住所有相關(guān)的 channel,而是對每個 channel 分別進行細(xì)粒度的操作。select操作包含以下四個階段:

  1. 打亂所有相關(guān)的 channel 順序,以提供偽隨機的保證(后續(xù)所有階段都基于這個打亂后的列表)。
  2. 逐個檢查所有 channel,看看是否有可以立即通信的(比如緩沖不空可以讀,緩沖不滿可以寫,或者有 goroutine 在對面排隊)。如果有,就執(zhí)行通信并退出。這樣可以讓不會阻塞的 select 更快、更具擴展性,因為它不需要排序和一次性鎖住所有互斥量。而且如果第一個 channel 已經(jīng)就緒了,select 甚至不需要檢查后續(xù)的 channel。
  3. 為在所有 channel 上阻塞做準(zhǔn)備,要先把自己掛到所有相關(guān) channel 的等待隊列里(sendq/recvq),這樣任何一個 channel 一旦 ready,就能喚醒這個 select。
  4. 阻塞。當(dāng)某個 channel ready 時,會把你喚醒。被喚醒之后,并不是直接執(zhí)行,而是回到階段 1 再檢查一次,確認(rèn)到底是哪個 channel ready 了。

對于階段3:為在所有 channel 上阻塞做準(zhǔn)備。它的過程與異步 send/recv 的阻塞方式幾乎相同,首先鎖住 channel 的互斥量,把自己加入 send/recv 的等待隊列,之后再次檢查該 channel 是否已經(jīng)可以通信,如果已經(jīng)可以通信,那么就把自己從所有等待隊列中移除,并回到階段 2;如果仍然不行,就繼續(xù)處理下一個 channel。如果此時 channel 已經(jīng)可以通信,就要把自己從所有等待隊列中移除,并回到階段 2。

但是有一個問題,select 可能同時在多個 channel 的隊列里,必須確保只被喚醒一次。即select監(jiān)聽多個管道,有多個管道幾乎同時來數(shù)據(jù),必須確保只被喚醒一次。我們采取如下解決方式:每個 select 操作都有一個全局狀態(tài)字 sg。當(dāng)別的 goroutine 嘗試喚醒它時,會做:CAS(statep, nil, sg),如果成功:說明這個 goroutine 贏得了“喚醒權(quán)利”,它負(fù)責(zé)完成通信并喚醒 select。如果失?。赫f明另一個 goroutine 已經(jīng)搶先喚醒了,那么當(dāng)前 goroutine 就忽略這個 waiter。這樣可以避免同步 channel 多方同時匹配到一個 select,保證只發(fā)生一次通信。

下面來看select操作的核心代碼:

Scase *select(Select *sel) {
    // Phase 1.
    randomize channel order;
    for(;;) {
        // Phase 2.
        foreach(Scase *cas in sel) {
            if(chansend/recv_nonblock(cas->c, ...))
            	return cas;
        }
        // Phase 3.
        selectstate = nil;
        foreach(Scase *cas in sel) {
            lock(cas->c);
            cas->sg->g = g;
            cas->sg->selectstatep = &selectstate;
            addwaiter(&cas->c->sendq/recvq, cas->sg);
            if(isready(cas->c)) {
                unlock(c);
                goto ready;
            }
            unlock(cas->c);
        }
        // Phase 4.
        block();
        ready:
        CAS(&selectstate, nil, 1);
        foreach(Scase *cas in sel) {
            lock(cas->c);
            removewaiter(&cas->c->sendq/recvq, cas->sg);
            unlock(cas->c);
        }
        // If we were unblocked by a sync chan operation,
        // the communication has completed.
        if(selectstate > 1)
        return selectstate;  // denotes the completed case
    }
}

在進入select時就要將所有的channel排序,避免其受程序編寫時順序的干擾,更加公平,保證當(dāng)多個 case 同時就緒時不會偏向前面的 case,之后的所有循環(huán)都使用這個打亂后的順序。之后就是階段2的代碼,對每個 case 做非阻塞嘗試(對無緩沖 channel 意味著有對端在等;對有緩沖的 channel 意味著緩沖非空或非滿;對關(guān)閉 channel 特殊處理)。若任一 chansend/recv_nonblock 成功,則馬上完成該通信并返回選中的 Scase。這避免了不必要的加鎖和入隊,是常見的低延遲路徑。

在接下來的階段三中,selectstate 是一個共享/可被外部 CAS 的狀態(tài)字,為nil時表示尚未有人取得“勝利權(quán)”。之后遍歷select中所有的channel,先lock(cas->c),這是在對某個 channel 的內(nèi)部隊列操作時要持有該 channel 的 mutex,保證對隊列的并發(fā)修改安全。

cas->sg->g = g;
cas->sg->selectstatep = &selectstate;

這里的cas 表示 select 中的某個 casecas->sg 是這個 case 對應(yīng)的 sudog(synchronous goroutine 的縮寫)結(jié)構(gòu)。這是 Go runtime 內(nèi)部用來把 goroutine 掛到 channel 等待隊列里的一個小對象。這第一句的意思是:把當(dāng)前 goroutine g 存進 sudog 里,當(dāng)別的 goroutine 往這個 channel 里發(fā)送/接收時,就可以通過 sudog->g 找到應(yīng)該被喚醒的 goroutine。后面這句就是保證多個 channel 只會有一個真的成功喚醒 select。因為一個 select 可能同時往多個 channel 的等待隊列里加了 sudog。假設(shè)有兩個 channel 幾乎同時就緒,兩個 goroutine 都想來喚醒你。這里用了一個全局狀態(tài)字 selectstate,所有 case 都共享它。在嘗試喚醒你時,喚醒方會執(zhí)行 CAS(selectstatep, nil, sg):如果成功,說明自己“搶到”了喚醒權(quán),可以繼續(xù)通信。如果失敗,說明別人已經(jīng)先喚醒了你,自己就放棄。這樣就避免了 “被兩個 channel 同時喚醒” 的問題。

addwaiter(&cas->c->sendq/recvq, cas->sg);
if(isready(cas->c)) {
    unlock(c);
    goto ready;
}
unlock(cas->c);

第一句把當(dāng)前 case 對應(yīng)的 sudog (cas->sg) 掛到 channel 的等待隊列里,這樣別的 goroutine 在操作這個 channel 時,就能找到你,完成通信并喚醒你。然后檢查通道是否立即可用,即使你剛剛把自己掛到了隊列里,也要馬上再檢查一次:因為在你掛進去的這個瞬間,可能已經(jīng)有別的 goroutine 進入 channel 操作,把它變成 ready 了。如果不檢查,就可能白白阻塞自己,而其實通信機會已經(jīng)存在。

如果 isready 發(fā)現(xiàn) channel 已經(jīng)可通信:立刻釋放鎖 unlock(c),跳轉(zhuǎn)到 ready 標(biāo)簽進入喚醒清理邏輯。但是如果 isready 沒通過,那說明這個 channel 真的還不可通信。那么就正常解鎖,繼續(xù)去處理下一個 case。

for循環(huán)之外的這個block就是掛起當(dāng)前goroutine,因為當(dāng)前goroutine調(diào)用的select是阻塞的,讓當(dāng)前 goroutine 停止運行并交給調(diào)度器調(diào)度,直到被其他 goroutine 喚醒。保證了 channel 的阻塞語義:當(dāng)條件不滿足時,goroutine 就會真正“停住”,不會白白占用 CPU。

ready:
CAS(&selectstate, nil, 1);
foreach(Scase *cas in sel) {
    lock(cas->c);
    removewaiter(&cas->c->sendq/recvq, cas->sg);
    unlock(cas->c);
}

這段代碼就是講select被喚醒之后的清理和狀態(tài)處理階段,首先嘗試通過 CAS 將 selectstatenil 設(shè)置為 1,如果沒有其他 goroutine 先喚醒并完成通信(即 selectstate == nil),select 自己就“認(rèn)領(lǐng)”通信權(quán)。如果 CAS 失敗(selectstate 已經(jīng)被寫成其他值,例如某個外部同步 channel 的 sg),說明通信已經(jīng)被其他 goroutine 完成,select 自己不再做通信。

之后的for循環(huán)就是清理等待隊列,把 select 自己在其它 channel 的等待節(jié)點從隊列中移除。因為select 可能同時掛在多個 channel 的隊列中,只要某個 channel 已經(jīng)喚醒了 select,其他 channel 隊列里的節(jié)點就不再需要。

到此這篇關(guān)于go中channel通信的底層實現(xiàn)的文章就介紹到這了,更多相關(guān)go channel通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Golang獲取目錄下的文件及目錄信息操作

    Golang獲取目錄下的文件及目錄信息操作

    這篇文章主要介紹了Golang獲取目錄下的文件及目錄信息操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • 詳解go-zero如何實現(xiàn)計數(shù)器限流

    詳解go-zero如何實現(xiàn)計數(shù)器限流

    這篇文章主要來和大家說說限流,主要包括計數(shù)器限流算法以及具體的代碼實現(xiàn),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-08-08
  • GO語言常用的文件讀取方式

    GO語言常用的文件讀取方式

    這篇文章主要介紹了GO語言常用的文件讀取方式,涉及一次性讀取、分塊讀取與逐行讀取等方法,是非常實用的技巧,需要的朋友可以參考下
    2014-12-12
  • Go中JSON解析時tag的使用

    Go中JSON解析時tag的使用

    本文主要介紹了Go中JSON解析時tag的使用,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • Go語言 init函數(shù)的具體使用

    Go語言 init函數(shù)的具體使用

    init()函數(shù)是Go語言中一種特殊的函數(shù),用于在包被導(dǎo)入時執(zhí)行一次性的初始化操作,本文就來介紹一下Go語言 init函數(shù)的具體使用,感興趣的可以了解一下
    2024-09-09
  • golang context接口類型方法介紹

    golang context接口類型方法介紹

    這篇文章主要為大家介紹了golang context接口類型方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-09-09
  • 利用Go實現(xiàn)一個簡易DAG服務(wù)的示例代碼

    利用Go實現(xiàn)一個簡易DAG服務(wù)的示例代碼

    DAG的全稱是Directed Acyclic Graph,即有向無環(huán)圖,DAG廣泛應(yīng)用于表示具有方向性依賴關(guān)系的數(shù)據(jù),如任務(wù)調(diào)度、數(shù)據(jù)處理流程、項目管理以及許多其他領(lǐng)域,下面,我將用Go語言示范如何實現(xiàn)一個簡單的DAG服務(wù),需要的朋友可以參考下
    2024-03-03
  • Golang 語言極簡類型轉(zhuǎn)換庫cast的使用詳解

    Golang 語言極簡類型轉(zhuǎn)換庫cast的使用詳解

    本文我們通過 cast.ToString() 函數(shù)的使用,簡單介紹了cast 的使用方法,除此之外,它還支持很多其他類型,在這沒有多多介紹,對Golang 類型轉(zhuǎn)換庫 cast相關(guān)知識感興趣的朋友一起看看吧
    2021-11-11
  • Go語言實現(xiàn)socket實例

    Go語言實現(xiàn)socket實例

    這篇文章主要介紹了Go語言實現(xiàn)socket的方法,實例分析了socket客戶端與服務(wù)器端的實現(xiàn)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-02-02
  • 淺析Go中序列化與反序列化的基本使用

    淺析Go中序列化與反序列化的基本使用

    序列化是指將對象轉(zhuǎn)換成字節(jié)流,從而存儲對象或?qū)ο髠鬏數(shù)絻?nèi)存、數(shù)據(jù)庫或文件的過程,反向過程稱為“反序列化”。本文主要介紹了Go中序列化與反序列化的基本使用,需要的可以參考一下
    2023-04-04

最新評論