go中channel通信的底層實現(xiàn)
channel通信底層實現(xiàn)
學(xué)習(xí)go語言的goroutine時,了解到goroutine的設(shè)計思想是線程之間通信不依賴共享內(nèi)存,避免使用鎖導(dǎo)致的死鎖問題,starvation問題。鎖在無競爭下確實輕量,但隨著 goroutine/線程數(shù)增加,很容易出現(xiàn)競爭。channel 提供了一種更直觀、更安全的通信方式,避免程序員手動管理鎖。使用channel可以減少人為鎖競爭和并發(fā)問題。
在學(xué)習(xí)過程中想深究一下channel的底層原理,所以寫下這篇文章記錄學(xué)習(xí)過程。
channel是go中主要的通信和同步核心原語,所以必須要做到快和可拓展,否則整個并發(fā)模型都會受影響??赏卣故钦f當(dāng) goroutine 數(shù)量增加時,channel 的性能不會迅速劣化,可以在大規(guī)模并發(fā)下保持穩(wěn)定。所以go在設(shè)計channel時主要側(cè)重于:
- 讓單線程(無競爭)的 channel 操作更快,比如只有一個 goroutine 在寫,一個在讀,沒有其他競爭,最好可以降低到函數(shù)調(diào)用的開銷。
- 讓有競爭的緩沖 channel(生產(chǎn)者/消費者模型)更快,比如多個 goroutine 同時寫 / 讀一個帶緩沖的 channel。
- 讓非阻塞失敗操作更快,當(dāng) goroutine 想做一個嘗試讀/寫而發(fā)現(xiàn)條件不滿足時,應(yīng)該能 立刻返回,只做最少的檢查,而不是浪費 CPU 在鎖競爭或調(diào)度上。
goruntime內(nèi)部把channel分為了三類
同步通道(Sync channels)
同步通道就是我們平常寫的無緩沖channel,即make(chan int)。這類通道沒有緩沖區(qū),發(fā)送必須等接收方準(zhǔn)備好,接收也必須等發(fā)送方準(zhǔn)備好。發(fā)送方把數(shù)據(jù)直接交給接收方,不需要額外存儲。這就相當(dāng)于兩個goroutine當(dāng)場握手,適用在需要嚴(yán)格同步的場景。
異步通道(Async channels)
異步通道就是帶緩沖區(qū)的channel:make(chan int, N)。內(nèi)部實現(xiàn)是一個環(huán)形隊列。在異步通道中發(fā)送和接收可以解耦,發(fā)送方可以先把數(shù)據(jù)放進緩沖區(qū),不用等接收方。接收方從緩沖區(qū)取數(shù)據(jù),如果沒有就阻塞。但是如果緩沖區(qū)滿了發(fā)送方同樣會阻塞,緩沖區(qū)空了接收方也會阻塞。在內(nèi)部實現(xiàn)中即使接收方已經(jīng)在等待,數(shù)據(jù)也不一定直接傳給它,而是先進入緩沖區(qū)。等接收方競爭獲取數(shù)據(jù)時,如果沒搶到,就還得繼續(xù)阻塞。
零大小元素的異步通道(Async channels with zero-sized elements)
這個是channel的特殊情況:chan struct{},這里的元素大小是0 字節(jié),所以本質(zhì)上就是一個計數(shù)信號量。不需要額外緩沖存儲數(shù)據(jù),常常用作并發(fā)控制、限流、任務(wù)完成通知。
sem := make(chan struct{}, 3) // 信號量,最多允許3個并發(fā)
// goroutine 獲取令牌
sem <- struct{}{}
// goroutine 釋放令牌
<-sem
同步通道內(nèi)部
同步channel的結(jié)構(gòu)體包含以下數(shù)據(jù):
struct Hchan {
Lock;
bool closed;
SudoG* sendq; // 等待發(fā)送的 goroutine 隊列
SudoG* recvq; // 等待接收的 goroutine 隊列
};
其中Lock保證并發(fā)安全,closed標(biāo)記通道是否已經(jīng)關(guān)閉,sendq / recvq是鏈表隊列,保存阻塞的發(fā)送方/接收方。
使用同步通道的發(fā)送操作時,會先給互斥量加鎖,保證后續(xù)只有自己處于臨界區(qū)。如果當(dāng)前沒有接收方,就要把自己掛起,進入等待隊列。如果有接收方在等待,那么就直接配對,把數(shù)據(jù)交給接收方,這樣發(fā)送方不需要阻塞。
以下是syncchansend代碼解讀:
bool syncchansend(Hchan *c, T val, bool block) {
if(c->closed) // fast-path
panic(“closed”);
if(!block && c->recvq == nil) // fast-path
return false;
lock(c);
if(c->closed) {
unlock(c);
panic(“closed”);
}
if(sg = removewaiter(&c->recvq)) {
// Have a blocked receiver, communicate with it.
unlock(c);
sg->val = val;
sg->completed = true;
unblock(sg->g);
return true;
}
if(!block) {
unlock(c);
return false;
}
// Block and wait for a pair.
sg->g = g;
sg->val = val;
addwaiter(&c->sendq, sg);
unlock(c);
block();
if(!sg->completed)
panic(“closed”); // unblocked by close
// Unblocked by a recv.
return true;
}
函數(shù)參數(shù)為c:目標(biāo)通道,val:要發(fā)送的數(shù)據(jù),block:是否允許阻塞(即普通 send vs 非阻塞 send)。第一步是檢查是否可以fast failure,為什么要有``fast failure`呢?因為我們在編寫如下代碼時:
select {
case x := <-ch: // 如果能取就取
fmt.Println(x)
default: // 如果取不到就直接失敗
fmt.Println("no data")
}
這里的<-ch不會像普通的同步channel一樣一直阻塞在這里,而是如果通道里面有數(shù)據(jù),就立即返回,如果通道里面沒數(shù)據(jù),不會阻塞,直接走到default分支。如果沒有fast failure的話,此時goruntime還會傻傻地去加鎖 -> 檢查隊列 -> 發(fā)現(xiàn)沒數(shù)據(jù) -> 解鎖 -> 返回失敗,其實完全沒有必要,因為我們只需要檢查隊列是否為空。上面源碼的邏輯就是:1. 如果通道已經(jīng)關(guān)閉,立即報錯,不需要加鎖。2. 非阻塞發(fā)送而且當(dāng)前沒有接收方在等著,就直接返回失敗,不阻塞,也不用加鎖。
lock(c);
if (c->closed) {
unlock(c);
panic("closed");
}
之后的邏輯就是可以發(fā)送的情況,所以我們需要加鎖保護臨界區(qū),加鎖之后還需要檢查通道是否關(guān)閉,避免之前在執(zhí)行完判斷之后,獲取鎖之前的某一個時間點通道被關(guān)閉,不檢查的話就可能會對關(guān)閉的channel進行操作,向關(guān)閉的channel執(zhí)行寫操作的話就會直接panic。
if(sg = removewaiter(&c->recvq)) {
// Have a blocked receiver, communicate with it.
unlock(c);
sg->val = val;
sg->completed = true;
unblock(sg->g);
return true;
}
這段邏輯就是找到一個已經(jīng)被阻塞的接收者,如果有 goroutine 已經(jīng)在 recvq 等待,那么直接配對。配對完之后就直接將要發(fā)送的數(shù)據(jù)交給接收方,發(fā)送方不阻塞,接收方也被喚醒,通信完成。
但是如果沒有找到已經(jīng)被阻塞的接收者,就會執(zhí)行下面的邏輯:
if (!block) {
unlock(c);
return false;
}
是非阻塞 send的話,并且沒人接收,就立即返回失敗。
// Block and wait for a pair. sg->g = g; sg->val = val; addwaiter(&c->sendq, sg); unlock(c); block();
這里就是沒有接收方但是允許阻塞的情況,這里我們解釋一下sg是什么。
這里的 sg 指的是sudog類型的指針。
struct sudog {
g* g; // 等待的 goroutine
element val; // 要發(fā)送或接收的值
sudog* next; // 鏈表指針,用來掛到 sendq/recvq
sudog* prev;
...
bool completed;
};
val→ 發(fā)送方要傳的數(shù)據(jù),或者接收方接收的數(shù)據(jù)存放位置。next/prev→ 把 sudog 串起來,形成 channel 的sendq和recvq隊列。completed→ 表示這個 goroutine 的操作是否已經(jīng)完成。g* g→ 指向一個正在等待的 goroutine。
在上述channel代碼中sg->g = g;表示把當(dāng)前 goroutine 記錄到 sudog 里,sg->val = val;把要發(fā)送的值放到 sudog 里,addwaiter(&c->sendq, sg);把 sudog 加入 channel 的發(fā)送等待隊列。block();把發(fā)送方掛起,等到有接收方時就可以被喚醒。這里的 sg 是一個 sudog 實例,它代表 “當(dāng)前這個 goroutine 正在等待往 channel 里發(fā)送 val”。
當(dāng)有接收方出現(xiàn)時,runtime 會從 sendq 拿一個等待的接收者 sudog,然后把這個 val 交給它,并喚醒 sg->g(這個 goroutine)。
if(!sg->completed) panic(“closed”); // unblocked by close // Unblocked by a recv. return true;
被喚醒時要區(qū)分,如果是recv 喚醒→ 通信成功。如果是close 喚醒→ 拋 panic。
上面就是同步channel發(fā)送數(shù)據(jù)的細(xì)節(jié),接收操作也是類似,都是先嘗試快速路徑,沒法完成就進入慢路徑阻塞。核心點都是sudog隊列,goroutine 阻塞喚醒和數(shù)據(jù)hand-off,區(qū)別在于send 隊列掛 send,recv 隊列掛 recv。這里給出gpt生成的類似代碼作為參考:
// 阻塞/非阻塞接收
bool synchchanrecv(Hchan *c, T* out, bool block) {
// fast-path: channel 已關(guān)閉且隊列為空
if (c->closed && c->sendq == nullptr) {
*out = zero_value(); // 返回零值
return false; // ok = false
}
// fast-path: 非阻塞且沒有發(fā)送方
if (!block && c->sendq == nullptr) {
return false;
}
lock(c); // 進入慢路徑,保護 sendq/recvq
if (c->closed && c->sendq == nullptr) {
unlock(c);
*out = zero_value();
return false;
}
// 檢查是否有等待的發(fā)送方
sudog* sg;
if ((sg = removewaiter(&c->sendq)) != nullptr) {
unlock(c);
// 直接從發(fā)送方取數(shù)據(jù)
*out = sg->val;
sg->completed = true;
unblock(sg->g); // 喚醒發(fā)送方
return true;
}
// 沒有發(fā)送方
if (!block) {
unlock(c);
return false; // 非阻塞失敗
}
// 阻塞自己,加入 recvq
sg = new_sudog();
sg->g = g; // 當(dāng)前 goroutine
sg->val_ptr = out; // 接收的數(shù)據(jù)存儲地址
addwaiter(&c->recvq, sg);
unlock(c);
block(); // 掛起當(dāng)前 goroutine,等待發(fā)送方喚醒
// 被喚醒后判斷原因
if (!sg->completed) {
// 被 channel close 喚醒
*out = zero_value();
return false;
}
// 被發(fā)送方喚醒,接收成功
return true;
}
異步通道內(nèi)部
異步通道就是帶緩沖區(qū)的channel,在進行發(fā)送或者接收操作時,如果只是簡單地在環(huán)形緩沖區(qū)中放入或者取出數(shù)據(jù),那么goruntime就不需要操作等待隊列,對于環(huán)形緩沖區(qū)goruntime采用原子指令讀寫數(shù)據(jù),此時是無鎖的,效率很高。但是一旦緩沖區(qū)滿了時發(fā)送方需要阻塞,或者緩沖區(qū)空了時接收方需要阻塞,這時需要操作等待隊列,就必須加鎖,因為等待隊列需要受互斥鎖保護,避免并發(fā)修改出錯。
這里同樣提供了快速失敗路徑,對于非阻塞失敗操作(例如嘗試 send 到一個已滿的通道,或者嘗試 recv 從一個空通道),直接檢測失敗并返回,不進入復(fù)雜的加鎖邏輯。
下面是異步通道中的核心數(shù)據(jù)結(jié)構(gòu):
struct Hchan {
uint32 cap; // channel capacity
Elem* buf; // ring buffer of size cap
// send and receive positions,
// low 32 bits represent position in the buffer,
// high 32 bits represent the current “l(fā)ap” over the ring buffer
uint64 sendx;
uint64 recvx;
};
- cap表示通道的容量,也就是緩沖區(qū)最多可以存放多少元素。
make(chan int, N)中的N就是cap。 - buf指向一個環(huán)形緩沖區(qū),大小就是cap,用來存放用戶真正寫入的數(shù)據(jù)。
- sendx/recvx分別表示發(fā)送位置和接收位置。兩個都是64位數(shù),使用時需要分為高32位和低32位看。低32位表示在緩沖區(qū)中的下標(biāo)位置,高32位表示當(dāng)前是第幾圈(lap,繞緩沖區(qū)的次數(shù))。這樣設(shè)計可以區(qū)分同一個下標(biāo)位置上的數(shù)據(jù),屬于第幾次寫/讀。避免了“ABA問題”(同一個位置被覆蓋后,無法判斷是不是舊數(shù)據(jù))。
struct Elem {
// current lap,
// the element is ready for writing on laps 0, 2, 4, ...
// for reading -- on laps 1, 3, 5, ...
uint32 lap;
T val; // user data
};
- lap是每個緩沖區(qū)元素內(nèi)額外保存的一個圈數(shù)標(biāo)記。如果是偶數(shù)圈的話說明位置是空的,可以寫;奇數(shù)圈說明這個位置有數(shù)據(jù)可以讀。這就保證了:生產(chǎn)者(send)和消費者(recv)在競爭時,能明確知道這個槽位當(dāng)前是可寫還是 可讀,不會混淆。
- val就是用戶寫入的數(shù)據(jù)。
如果是非阻塞的異步通道的話,多個發(fā)送者 goroutine可能同時往 channel 里寫數(shù)據(jù)。它們需要爭搶一個“寫入位置”,這是通過對sendx做CAS操作來完成的,誰 CAS 成功,誰就獲得了在 sendx 指向的位置寫入數(shù)據(jù)的權(quán)利。失敗的發(fā)送者會重試,直到搶到一個空槽位。
發(fā)送方寫入數(shù)據(jù)以后,要讓接收方知道這個槽位已經(jīng)寫滿。這不是通過鎖,而是通過每個元素的 lap 變量來完成:如果 lap 是偶數(shù) → 表示該槽位目前是 空的,只能寫。如果 lap 是奇數(shù) → 表示該槽位目前是 滿的,可以讀。發(fā)送者把數(shù)據(jù)寫進去時,把 lap 改成奇數(shù),通知接收者“數(shù)據(jù)可讀”。接收者讀完數(shù)據(jù)后,把 lap 改成偶數(shù),通知發(fā)送者“槽位可寫”。
接下來我們看看異步通道非阻塞發(fā)送的核心代碼:
bool asyncchansend_nonblock(Hchan* c, T val) {
uint32 pos, lap, elap;
uint64 x, newx;
Elem *e;
for(;;) {
x = atomicload64(&c->sendx);
pos = (uint32)x;
lap = (uint32)(x >> 32);
e = &c->buf[pos];
elap = atomicload32(&e->lap);
if(lap == elap) {
// The element is ready for writing on this lap.
// Try to claim the right to write to this element.
if(pos + 1 < c->cap)
newx = x + 1; // just increase the pos
else
newx = (uint64)(lap + 2) << 32;
if(!cas64(&c->sendx, x, newx))
continue; // lose the race, retry
// We own the element, do non-atomic write.
e->val = val;
// Make the element available for reading.
atomicstore32(&e->lap, elap + 1);
return true;
} else if((int32)(lap - elap) > 0) {
// The element is not yet read on the previous lap,
// the chan is full.
return false;
} else {
// The element has already been written on this lap,
// this means that c->sendx has been changed as well,
// retry.
}
}
}
x = atomicload64(&c->sendx); pos = (uint32)x; lap = (uint32)(x >> 32); e = &c->buf[pos]; elap = atomicload32(&e->lap);
首先原子讀取當(dāng)前發(fā)送位置,計算出寫位置在環(huán)形緩沖區(qū)中的下標(biāo)和當(dāng)前圈數(shù),原子讀取 e->lap,因為其他線程(發(fā)送者/接收者)會修改它。
if(lap == elap) {
// The element is ready for writing on this lap.
// Try to claim the right to write to this element.
if(pos + 1 < c->cap)
newx = x + 1; // just increase the pos
else
newx = (uint64)(lap + 2) << 32;
if(!cas64(&c->sendx, x, newx))
continue; // lose the race, retry
// We own the element, do non-atomic write.
e->val = val;
// Make the element available for reading.
atomicstore32(&e->lap, elap + 1);
return true;
}
如果當(dāng)前sendx的lap和緩沖區(qū)元素的lap一致的話,說明這個槽位正處于“可寫狀態(tài)”。因為一開始是0,如果當(dāng)前sendx的lap也是0的話表示當(dāng)前槽位為空,可以寫。所以發(fā)送者就會嘗試用CAS搶這個位置寫入數(shù)據(jù)。
if (pos + 1 < c->cap)說明當(dāng)前不是緩沖區(qū)的最后一個槽位,下一個位置仍然在環(huán)形緩沖區(qū)內(nèi)部,所以只需要把 pos 增加 1。else分支就是說明已經(jīng)寫到環(huán)形緩沖區(qū)的尾部,再往前就需要回到開頭。這時候不能僅僅把 pos 設(shè)為 0,還要把 lap 增加 2,來交替讀寫狀態(tài),進入新的寫循環(huán)。
if(!cas64(&c->sendx, x, newx)) continue; // lose the race, retry // We own the element, do non-atomic write. e->val = val; // Make the element available for reading. atomicstore32(&e->lap, elap + 1); return true;
這段邏輯就是無鎖操作的關(guān)鍵部分,CAS(Compare-And-Swap)嘗試把 sendx 從 x 更新到 newx。如果成功 → 當(dāng)前線程贏得了這個插槽的所有權(quán)。如果失敗 → 說明別的線程同時在競爭同一個位置,這個線程就要 continue 重新嘗試。如果走到了后面的e->val = val;說明該線程已經(jīng)確認(rèn)自己“獨占”了這個隊列位置,可以安全地寫入數(shù)據(jù)。注意這里用的是普通賦值(非原子),因為寫入數(shù)據(jù)的唯一寫入者已經(jīng)通過 CAS 搶占成功,不存在寫沖突。之后atomicstore32(&e->lap, elap + 1);是發(fā)布這個元素,讓消費者能看到它已經(jīng)可用了。
else if((int32)(lap - elap) > 0) {
// The element is not yet read on the previous lap,
// the chan is full.
return false;
}
lap - elap > 0 意味著:這個槽位中的舊數(shù)據(jù)還沒有被讀走,這個槽位在上一輪循環(huán)里寫過數(shù)據(jù),但還沒被讀走 → 隊列滿了,不能再寫。由于是非阻塞的,所以直接返回false。
else {
// The element has already been written on this lap,
// this means that c->sendx has been changed as well,
// retry.
}
如果走到當(dāng)前這個else分支,說明當(dāng)前l(fā)ap < elap,也就是說這個elap被其他的寫者更新了,導(dǎo)致第一次看到的lap和緩沖區(qū)元素的elap不一樣,這就體現(xiàn)了多個發(fā)送者競爭環(huán)形緩沖區(qū)時失敗的情況,此時我們不能再往這個槽寫,不然會覆蓋別人的數(shù)據(jù),所以只可以重新讀取最新的sendx并再試一次。
上述就是異步通道非阻塞讀的流程,接收(Recv)操作與發(fā)送(Send)操作完全對稱,唯一的區(qū)別是接收操作從 lap = 1 開始,并且是讀取元素而不是寫入。
接下來是異步阻塞通道的發(fā)送和接收操作,通道結(jié)構(gòu)會被擴展,增加一個互斥鎖(mutex)以及發(fā)送/接收的等待隊列(send/recv waiter queues)。加等待隊列是為了實現(xiàn)阻塞語義,緩沖區(qū)滿時,send 必須阻塞,直到有接收者取走數(shù)據(jù);緩沖區(qū)空時,recv 必須阻塞,直到有發(fā)送者寫入數(shù)據(jù)。
struct Hchan {
…
Lock;
SudoG* sendq;
SudoG* recvq;
};
Lock是互斥鎖,用來保護channel內(nèi)部的共享狀態(tài),比如說等待隊列的入隊或者出隊。sendq/recvq就是發(fā)送方和接收方的等待隊列,里面存放等待發(fā)送或者接收數(shù)據(jù)的goroutine。
異步隊列在阻塞發(fā)送時,goroutine會先嘗試進行一次非阻塞發(fā)送。如果成功,他會檢查是否有接收方在等待,如果有,就喚醒其中一個。如果非阻塞發(fā)送失?。ㄍǖ酪褲M),它就會加鎖,將自己加入發(fā)送等待隊列,然后再次檢查緩沖區(qū)是否仍然是滿的。如果緩沖區(qū)仍然滿,goroutine 就會阻塞;如果緩沖區(qū)已經(jīng)不滿,goroutine 會把自己從等待隊列移除,解鎖,然后重試。再次檢測是為了防止就在它準(zhǔn)備阻塞的那一瞬間,另一個 goroutine B 可能正好執(zhí)行了一個 recv,使得緩沖區(qū)變得“不滿”。
阻塞接收的過程完全相同,只不過把“send”換成“recv”,把“recv”換成“send”。
阻塞算法的主要難點是要確保不會發(fā)生死鎖(比如發(fā)送方在一個非滿的通道上無限期阻塞,或者接收方在一個非空的通道上無限期阻塞)。通過這種“檢查 → 入隊 → 再檢查”的方式,我們能夠保證以下情況之一一定會發(fā)生:
- 發(fā)送方看到有接收方在等待并喚醒它;
- 接收方看到緩沖區(qū)里有元素并消費它;
- 1 和 2 同時發(fā)生(這種情況下通過互斥鎖來解決競爭);
但絕不會出現(xiàn)這種情況:發(fā)送方?jīng)]看到接收方在等待,接收方也沒看到緩沖區(qū)有元素,然后兩邊都無限期阻塞。
接下來看核心代碼實現(xiàn):
void asyncchansend(Hchan* c, T val) {
for(;;) {
if(asyncchansend_nonblock(c, val)) {
// Send succeeded, see if we need to unblock a receiver.
if(c->recvq != nil) {
lock(c);
sg = removewaiter(&c->recvq);
unlock(c);
if(sg != nil)
unblock(sg->g);
}
return;
} else {
// The channel is full.
lock(c);
sg->g = g;
addwaiter(&c->sendq, sg);
if(notfull(c)) {
removewaiter(&c->sendq, sg);
unlock(c);
continue;
}
unlock(c);
block();
// Retry send.
}
}
}
首先調(diào)用 asyncchansend_nonblock 嘗試直接寫入數(shù)據(jù),如果成功,說明緩沖區(qū)有空間或者直接匹配到了等待的接收者。如果通道里有接收者在等待 (c->recvq != nil),那么我們應(yīng)該喚醒接收者避免長時間阻塞,所以我們加鎖,安全地從recvq隊列里面取出一個等待接收的 goroutine(removewaiter),再解鎖,如果真的取到一個等待者,就喚醒它。如果沒有在等待的接收者但是非阻塞還是寫入成功了,說明寫入緩沖區(qū)中,這時直接返回就好。
// The channel is full.
lock(c);
sg->g = g;
addwaiter(&c->sendq, sg);
if(notfull(c)) {
removewaiter(&c->sendq, sg);
unlock(c);
continue;
}
unlock(c);
block();
// Retry send.
else分支下是非阻塞發(fā)送失敗的情況,根據(jù)上面非阻塞發(fā)送的代碼可知,緩沖區(qū)已經(jīng)滿了,當(dāng)前goroutine不能繼續(xù)發(fā)送,就需要將自己添加到sendq發(fā)送等待隊列。之后notfull(c)再做了一次檢查,因為在剛剛排隊的過程中,可能已經(jīng)有接收者拿走了數(shù)據(jù),緩沖區(qū)不再滿。如果發(fā)現(xiàn)通道現(xiàn)在不滿了,就把自己從sendq發(fā)送等待隊列中移除,回到循環(huán)開頭 (continue),再試一次發(fā)送。但是如果如果通道依然滿,那就調(diào)用block掛起當(dāng)前 goroutine,等待別人來喚醒。被喚醒之后,繼續(xù)循環(huán),重新嘗試發(fā)送。
零大小元素的異步通道
最后我們介紹零大小元素的異步通道內(nèi)部實現(xiàn),零大小的異步 channel 在整體上和非零大小的異步 channel 類似:
- 在非阻塞情況下,操作是無鎖的。
- 等待隊列仍然由互斥鎖保護。
- 非阻塞失敗的操作會快速返回。
不同之處有:
Hchan里只有一個計數(shù)器,而不是發(fā)送/接收位置和環(huán)形緩沖區(qū);這個計數(shù)器表示 channel 中的元素數(shù)量。- 非阻塞的發(fā)送/接收通過 CAS 循環(huán)來更新計數(shù)器。
- 是否“滿”或“空”的判斷僅僅是檢查計數(shù)器的值。
close操作
對于channel的close操作,我們調(diào)用close時,要先加鎖,然后設(shè)置 closed 標(biāo)志,接著喚醒所有的等待者。異步的 send/recv 操作在進入阻塞之前都會檢查 closed 標(biāo)志。這樣就能保證與異步 send/recv 阻塞時相同的語義。要么1.close看到了有等待著,2.等待者看到了closed標(biāo)志被設(shè)置,3. 1和2同時發(fā)生時通過互斥鎖解決。
select操作
select 操作不會一次性鎖住所有相關(guān)的 channel,而是對每個 channel 分別進行細(xì)粒度的操作。select操作包含以下四個階段:
- 打亂所有相關(guān)的 channel 順序,以提供偽隨機的保證(后續(xù)所有階段都基于這個打亂后的列表)。
- 逐個檢查所有 channel,看看是否有可以立即通信的(比如緩沖不空可以讀,緩沖不滿可以寫,或者有 goroutine 在對面排隊)。如果有,就執(zhí)行通信并退出。這樣可以讓不會阻塞的
select更快、更具擴展性,因為它不需要排序和一次性鎖住所有互斥量。而且如果第一個 channel 已經(jīng)就緒了,select甚至不需要檢查后續(xù)的 channel。 - 為在所有 channel 上阻塞做準(zhǔn)備,要先把自己掛到所有相關(guān) channel 的等待隊列里(sendq/recvq),這樣任何一個 channel 一旦 ready,就能喚醒這個 select。
- 阻塞。當(dāng)某個 channel ready 時,會把你喚醒。被喚醒之后,并不是直接執(zhí)行,而是回到階段 1 再檢查一次,確認(rèn)到底是哪個 channel ready 了。
對于階段3:為在所有 channel 上阻塞做準(zhǔn)備。它的過程與異步 send/recv 的阻塞方式幾乎相同,首先鎖住 channel 的互斥量,把自己加入 send/recv 的等待隊列,之后再次檢查該 channel 是否已經(jīng)可以通信,如果已經(jīng)可以通信,那么就把自己從所有等待隊列中移除,并回到階段 2;如果仍然不行,就繼續(xù)處理下一個 channel。如果此時 channel 已經(jīng)可以通信,就要把自己從所有等待隊列中移除,并回到階段 2。
但是有一個問題,select 可能同時在多個 channel 的隊列里,必須確保只被喚醒一次。即select監(jiān)聽多個管道,有多個管道幾乎同時來數(shù)據(jù),必須確保只被喚醒一次。我們采取如下解決方式:每個 select 操作都有一個全局狀態(tài)字 sg。當(dāng)別的 goroutine 嘗試喚醒它時,會做:CAS(statep, nil, sg),如果成功:說明這個 goroutine 贏得了“喚醒權(quán)利”,它負(fù)責(zé)完成通信并喚醒 select。如果失?。赫f明另一個 goroutine 已經(jīng)搶先喚醒了,那么當(dāng)前 goroutine 就忽略這個 waiter。這樣可以避免同步 channel 多方同時匹配到一個 select,保證只發(fā)生一次通信。
下面來看select操作的核心代碼:
Scase *select(Select *sel) {
// Phase 1.
randomize channel order;
for(;;) {
// Phase 2.
foreach(Scase *cas in sel) {
if(chansend/recv_nonblock(cas->c, ...))
return cas;
}
// Phase 3.
selectstate = nil;
foreach(Scase *cas in sel) {
lock(cas->c);
cas->sg->g = g;
cas->sg->selectstatep = &selectstate;
addwaiter(&cas->c->sendq/recvq, cas->sg);
if(isready(cas->c)) {
unlock(c);
goto ready;
}
unlock(cas->c);
}
// Phase 4.
block();
ready:
CAS(&selectstate, nil, 1);
foreach(Scase *cas in sel) {
lock(cas->c);
removewaiter(&cas->c->sendq/recvq, cas->sg);
unlock(cas->c);
}
// If we were unblocked by a sync chan operation,
// the communication has completed.
if(selectstate > 1)
return selectstate; // denotes the completed case
}
}
在進入select時就要將所有的channel排序,避免其受程序編寫時順序的干擾,更加公平,保證當(dāng)多個 case 同時就緒時不會偏向前面的 case,之后的所有循環(huán)都使用這個打亂后的順序。之后就是階段2的代碼,對每個 case 做非阻塞嘗試(對無緩沖 channel 意味著有對端在等;對有緩沖的 channel 意味著緩沖非空或非滿;對關(guān)閉 channel 特殊處理)。若任一 chansend/recv_nonblock 成功,則馬上完成該通信并返回選中的 Scase。這避免了不必要的加鎖和入隊,是常見的低延遲路徑。
在接下來的階段三中,selectstate 是一個共享/可被外部 CAS 的狀態(tài)字,為nil時表示尚未有人取得“勝利權(quán)”。之后遍歷select中所有的channel,先lock(cas->c),這是在對某個 channel 的內(nèi)部隊列操作時要持有該 channel 的 mutex,保證對隊列的并發(fā)修改安全。
cas->sg->g = g; cas->sg->selectstatep = &selectstate;
這里的cas 表示 select 中的某個 case,cas->sg 是這個 case 對應(yīng)的 sudog(synchronous goroutine 的縮寫)結(jié)構(gòu)。這是 Go runtime 內(nèi)部用來把 goroutine 掛到 channel 等待隊列里的一個小對象。這第一句的意思是:把當(dāng)前 goroutine g 存進 sudog 里,當(dāng)別的 goroutine 往這個 channel 里發(fā)送/接收時,就可以通過 sudog->g 找到應(yīng)該被喚醒的 goroutine。后面這句就是保證多個 channel 只會有一個真的成功喚醒 select。因為一個 select 可能同時往多個 channel 的等待隊列里加了 sudog。假設(shè)有兩個 channel 幾乎同時就緒,兩個 goroutine 都想來喚醒你。這里用了一個全局狀態(tài)字 selectstate,所有 case 都共享它。在嘗試喚醒你時,喚醒方會執(zhí)行 CAS(selectstatep, nil, sg):如果成功,說明自己“搶到”了喚醒權(quán),可以繼續(xù)通信。如果失敗,說明別人已經(jīng)先喚醒了你,自己就放棄。這樣就避免了 “被兩個 channel 同時喚醒” 的問題。
addwaiter(&cas->c->sendq/recvq, cas->sg);
if(isready(cas->c)) {
unlock(c);
goto ready;
}
unlock(cas->c);
第一句把當(dāng)前 case 對應(yīng)的 sudog (cas->sg) 掛到 channel 的等待隊列里,這樣別的 goroutine 在操作這個 channel 時,就能找到你,完成通信并喚醒你。然后檢查通道是否立即可用,即使你剛剛把自己掛到了隊列里,也要馬上再檢查一次:因為在你掛進去的這個瞬間,可能已經(jīng)有別的 goroutine 進入 channel 操作,把它變成 ready 了。如果不檢查,就可能白白阻塞自己,而其實通信機會已經(jīng)存在。
如果 isready 發(fā)現(xiàn) channel 已經(jīng)可通信:立刻釋放鎖 unlock(c),跳轉(zhuǎn)到 ready 標(biāo)簽進入喚醒清理邏輯。但是如果 isready 沒通過,那說明這個 channel 真的還不可通信。那么就正常解鎖,繼續(xù)去處理下一個 case。
for循環(huán)之外的這個block就是掛起當(dāng)前goroutine,因為當(dāng)前goroutine調(diào)用的select是阻塞的,讓當(dāng)前 goroutine 停止運行并交給調(diào)度器調(diào)度,直到被其他 goroutine 喚醒。保證了 channel 的阻塞語義:當(dāng)條件不滿足時,goroutine 就會真正“停住”,不會白白占用 CPU。
ready:
CAS(&selectstate, nil, 1);
foreach(Scase *cas in sel) {
lock(cas->c);
removewaiter(&cas->c->sendq/recvq, cas->sg);
unlock(cas->c);
}
這段代碼就是講select被喚醒之后的清理和狀態(tài)處理階段,首先嘗試通過 CAS 將 selectstate 從 nil 設(shè)置為 1,如果沒有其他 goroutine 先喚醒并完成通信(即 selectstate == nil),select 自己就“認(rèn)領(lǐng)”通信權(quán)。如果 CAS 失敗(selectstate 已經(jīng)被寫成其他值,例如某個外部同步 channel 的 sg),說明通信已經(jīng)被其他 goroutine 完成,select 自己不再做通信。
之后的for循環(huán)就是清理等待隊列,把 select 自己在其它 channel 的等待節(jié)點從隊列中移除。因為select 可能同時掛在多個 channel 的隊列中,只要某個 channel 已經(jīng)喚醒了 select,其他 channel 隊列里的節(jié)點就不再需要。
到此這篇關(guān)于go中channel通信的底層實現(xiàn)的文章就介紹到這了,更多相關(guān)go channel通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Go實現(xiàn)一個簡易DAG服務(wù)的示例代碼
DAG的全稱是Directed Acyclic Graph,即有向無環(huán)圖,DAG廣泛應(yīng)用于表示具有方向性依賴關(guān)系的數(shù)據(jù),如任務(wù)調(diào)度、數(shù)據(jù)處理流程、項目管理以及許多其他領(lǐng)域,下面,我將用Go語言示范如何實現(xiàn)一個簡單的DAG服務(wù),需要的朋友可以參考下2024-03-03
Golang 語言極簡類型轉(zhuǎn)換庫cast的使用詳解
本文我們通過 cast.ToString() 函數(shù)的使用,簡單介紹了cast 的使用方法,除此之外,它還支持很多其他類型,在這沒有多多介紹,對Golang 類型轉(zhuǎn)換庫 cast相關(guān)知識感興趣的朋友一起看看吧2021-11-11

