使用golang實現(xiàn)一個MapReduce的示例代碼
背景
在日常業(yè)務開發(fā)中,我們經(jīng)常遇到需要并發(fā)處理的場景。例如:
- 依據(jù)id列表查詢db,獲取數(shù)據(jù)。為了保證查詢性能,單次查詢的id列表長度最好不要超過50(依據(jù)業(yè)務來判斷),當id列表長度超過50時,拆分成并發(fā)請求,減少耗時和提高性能,返回聚合后的結果
- 外部提供的接口不支持批量寫入/讀取數(shù)據(jù),當需要批量處理數(shù)據(jù)時,為了減少耗時和提高性能,并發(fā)請求外部接口
以上處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段?;径际荌O操作,請求db,或者是調用外部接口
- 處理階段。對返回的數(shù)據(jù)進行轉換,過濾,聚合等操作
同步調用,調用耗時增長明顯
并發(fā)調用,可以減少調用耗時
分析
上面說的處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段。IO操作,可以并發(fā)的去進行,互不干擾
- 處理階段。同步進行,保證聚合結果的正確性
這種是一種特殊的MapReduce
為了處理這類場景,我們需要明確以下幾個部分:
- 列表長度。代表有多少數(shù)據(jù)需要進行處理
- map函數(shù)。并發(fā)處理的函數(shù),互不干擾
- reduce函數(shù)。同步處理的函數(shù)
- 最大并發(fā)數(shù)。決定需要開多少線程/協(xié)程來處理
- 拆分長度。列表長度 / 拆分長度 = 子任務數(shù)
由于我在日常開發(fā)中常使用golang語言,下面梳理下使用golang來解決這類問題的一個思路
函數(shù)簽名
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int)
核心邏輯:
當最大并發(fā)數(shù) <= 1 或者子任務數(shù)(列表長度 / 拆分長度) <= 1時,同步執(zhí)行map函數(shù)和reduce函數(shù)即可
其余情況,并發(fā)處理map函數(shù),同步執(zhí)行reduce函數(shù)
- 獲取并發(fā)處理的子任務數(shù)量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
- 通過sync.Mutex保證reduce同步執(zhí)行
- 通過sync.WaitGroup保證等待子任務全部執(zhí)行完成
- 通過chan控制最大并發(fā)數(shù)
代碼實現(xiàn)
package test import ( "math" "sync" ) func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { if length < 1 { return } if maxConcurrent <= 1 || length <= chunkSize { doChunkProcessSerially(length, procedure, reduce, chunkSize) } else { doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize) } } // 同步處理 func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) { // 拆分的子任務數(shù) chunkNums := int(math.Ceil(float64(length) / float64(chunkSize))) for i := 0; i < chunkNums; i++ { func(chunkIndex int) { defer func() { if err := recover(); err != nil { // 自定義錯誤處理 } }() start := chunkIndex * chunkSize end := start + chunkSize if end > length { end = length } // 執(zhí)行map response, err := procedure(start, end) // 執(zhí)行reduce if reduce != nil { reduce(response, err, start, end) } }(i) } } // 并發(fā)處理 func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { index := 0 chunkIndex := 0 // 拆分的子任務數(shù) lengthTask := int(math.Ceil(float64(length) / float64(chunkSize))) // 保證reduce同步執(zhí)行 var lock sync.Mutex // 保證子任務全部執(zhí)行完成 var wg sync.WaitGroup wg.Add(lengthTask) // 控制并發(fā)數(shù) throttleChan := make(chan struct{}, maxConcurrent) for { start := index end := index + chunkSize if end > length { end = length } throttleChan <- struct{}{} go func(chunkIndex int) { defer func() { <-throttleChan if err := recover(); err != nil { // 自定義錯誤處理 } wg.Done() }() // 執(zhí)行map response, err := procedure(start, end) // 執(zhí)行reduce if reduce != nil { lock.Lock() defer lock.Unlock() reduce(response, err, start, end) } }(chunkIndex) chunkIndex++ index = index + chunkSize if index >= length { break } } wg.Wait() close(throttleChan) }
測試:
func TestChunkProcess(t *testing.T) { trackIDs := []int64{123, 456, 789} results := make([]int64, 0) ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) { result := trackIDs[start] + 100 return result, nil }, func(partialResult interface{}, partialErr error, start, end int) { results = append(results, partialResult.(int64)) }, 2, 1) fmt.Println(results) }
總結
多對業(yè)務場景進行抽象分析,為這一類場景提供解決方案
以上就是使用golang實現(xiàn)一個MapReduce的詳細內容,更多關于golang實現(xiàn)MapReduce的資料請關注腳本之家其它相關文章!
相關文章
淺談golang package中init方法的多處定義及運行順序問題
這篇文章主要介紹了淺談golang package中init方法的多處定義及運行順序問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05