Golang 并發(fā)的實現(xiàn)
并發(fā)問題概覽
問題類型 | 描述 |
---|---|
數(shù)據(jù)競爭 | 多個協(xié)程對共享變量進行非同步讀寫操作 |
死鎖 | 多個協(xié)程互相等待對方釋放資源 |
活鎖 | 協(xié)程不斷嘗試獲取資源但始終失敗 |
協(xié)程泄漏 | 協(xié)程未能及時退出,程序中 goroutine 數(shù)量飆升 |
Channel 誤用 | 通道未關(guān)閉、重復關(guān)閉、關(guān)閉后寫入等問題 |
調(diào)度抖動 | 非預期的調(diào)度行為導致響應(yīng)不穩(wěn)定 |
數(shù)據(jù)競爭
當兩個或多個 goroutine 同時讀寫一個變量,并且至少有一個是寫操作,而又沒有同步措施時,就會發(fā)生數(shù)據(jù)競爭。
var count int func add() { for i := 0; i< 1000; i++ { count++ } } func main() { go add() go add() time.Sleep(time.Second) fmt.Println(count) }
死鎖
死鎖是指兩個或多個協(xié)程相互等待,導致程序永久阻塞。
func main() { ch := make(chan int) // 沒有其他協(xié)程接收,死鎖 ch <- 1 }
func main() { ch1 := make(chan int) ch2 := make(chan int) go func() { <-ch1 ch2 <- 1 }() go func() { <-ch2 ch1 <- 1 }() // 程序卡死 time.Sleep(time.Second * 2) }
協(xié)程泄漏
程序創(chuàng)建了大量 goroutine,但它們沒有退出條件,一直處于阻塞或者等待狀態(tài),導致程序資源消耗飆升。
func main() { ch := make(chan int) for { go func() { // 不斷產(chǎn)生阻塞的 goroutine,直到內(nèi)存耗盡為止 <-ch }() } }
Channel 誤用
// 寫入已關(guān)閉通道 ch := make(chan int) close(ch) ch <- 1 // panic // 重復關(guān)閉通道 close(ch) close(ch) // panic // 從空通道中讀取,沒有寫入,造成死鎖 <-ch
調(diào)度器問題與性能抖動
- 協(xié)程爆炸。短時間內(nèi)創(chuàng)建了大量 goroutine,可能會導致 CPU 抖動、調(diào)度混亂。
- 大量阻塞系統(tǒng)調(diào)用。一個協(xié)程如果陷入系統(tǒng)調(diào)用阻塞,會被 OS 掛起,從而影響調(diào)度。
- 非公平調(diào)度。雖然 Go 的調(diào)度器基于 GMP 模型,但仍存在協(xié)程饑餓的可能。
最佳實踐總結(jié)
類型 | 建議 |
---|---|
數(shù)據(jù)共享 | 使用 Channel 或者 sync.Mutex/sync.RWMUtex 做同步 |
goroutine 控制 | 使用 WaitGroup 或者 context 管理協(xié)程生命周期 |
Channel 操作 | 所有寫操作前確保通道未關(guān)閉;關(guān)閉通道應(yīng)由發(fā)送方負責 |
并發(fā)任務(wù)分發(fā) | 使用協(xié)程池(限制并發(fā)數(shù))避免系統(tǒng)資源耗盡 |
調(diào)試工具 | 使用 race、pprof、trace、delve |
日志分析 | 打印 goroutine ID,觀察并發(fā)流程 |
實際案例分析
抓取系統(tǒng)協(xié)程泄漏
現(xiàn)象:
- CPU 使用率低
- 內(nèi)存占用持續(xù)上漲
- goroutine 數(shù)量不斷增長
分析:
- 使用 pprof 查看 goroutine 源碼位置
- 定位原因是某個 select 分支缺少 <-done,導致協(xié)程無法退出
處理:
- 所有的 for + select 中都加上 ctx.Done() 處理退出
func worker() { go func() { for { select { case msg := <-someChan: // 處理消息 fmt.Println(msg) // ? 沒有退出條件,協(xié)程永遠不會退出 } } }() }
func worker(ctx context.Context) { go func() { for { select { case msg := <-someChan: fmt.Println(msg) case <-ctx.Done(): // ? 收到取消信號,退出協(xié)程 fmt.Println("worker exiting") return } } }() } ctx, cancel := context.WithCancel(context.Background()) worker(ctx) // 一段時間后或某個條件下,調(diào)用 cancel() 來通知協(xié)程退出 time.Sleep(5 * time.Second) cancel()
異步任務(wù)競爭導致數(shù)據(jù)錯亂
現(xiàn)象:
- 后臺異步處理任務(wù)對全局 map 并發(fā)寫入
分析:
- 偶發(fā)出現(xiàn)數(shù)據(jù)錯誤,調(diào)試困難
處理:
- 使用 sync.Mutex 或者 sync.Map
// 全局 map,非線程安全 var data = make(map[int]int) func main() { for i := 0; i < 100; i++ { go func(i int) { data[i] = i // ?? 多個協(xié)程同時寫入 map,會導致數(shù)據(jù)競爭或 panic }(i) } time.Sleep(1 * time.Second) fmt.Println("done") }
var ( data = make(map[int]int) mu sync.Mutex ) func main() { for i := 0; i < 100; i++ { go func(i int) { mu.Lock() data[i] = i mu.Unlock() }(i) } time.Sleep(1 * time.Second) fmt.Println("done") }
var data sync.Map func main() { for i := 0; i < 100; i++ { go func(i int) { data.Store(i, i) }(i) } time.Sleep(1 * time.Second) data.Range(func(k, v interface{}) bool { fmt.Printf("key: %v, value: %v\n", k, v) return true }) }
高并發(fā)下創(chuàng)建全局計數(shù)器
- 推薦使用 sync/atomic 包。sync/atomic 提供了原子操作的能力,在無需加鎖的前提下,保證線程安全,適用于計數(shù)器等場景。
var globalCounter int64 func worker(wg *sync.WaitGroup) { defer wg.Done() // 原子加1,確保并發(fā)安全 atomic.AddInt64(&globalCounter, 1) } func main() { var wg sync.WaitGroup wg.Add(1000) for i := 0; i < 1000; i++ { go worker(&wg) } // 確保主 goroutine 等待所有子 goroutine 完成 wg.Wait() fmt.Println("計數(shù)器值:", globalCounter) }
- 使用 sync.Mutex。線程安全但是性能略低,適用于復雜邏輯下的線程保護,不推薦用于簡單加減場景。
var counter int var mu sync.Mutex func main() { mu.Lock() counter++ mu.UnLock() }
- 使用 Channel 實現(xiàn)計數(shù)。性能不如原子操作,適用于有通道通信需求的場景。
var counter = make(chan int, 1) func init() { counter <- 0 } func main() { v := <-counter v++ counter <- v }
到此這篇關(guān)于Golang 并發(fā)的實現(xiàn)的文章就介紹到這了,更多相關(guān)Golang 并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!