基于Go語言實現(xiàn)一個并發(fā)下載器
本文將實現(xiàn)一個并發(fā)的文件下載器,可以在不重新啟動整個下載的情況下處理錯誤。這將通過分塊下載文件來實現(xiàn)。
Idea
首先從發(fā)出下載的HTTP
請求開始,當(dāng)采用HEAD option
來請求要下載的文件時,在某些服務(wù)器上,返回的標(biāo)頭之一是Content-Length
。此標(biāo)頭以字節(jié)為單位指定文件的大小。知道文件大小后,分派多個Goroutine
,每個Goroutine
都分配有一個要下載的數(shù)據(jù)范圍。Goroutine
發(fā)送GET
請求來執(zhí)行下載,該請求將具有標(biāo)頭Range
,此標(biāo)頭將告訴服務(wù)器要返回多少文件。Goroutine
完成下載后,數(shù)據(jù)將通過通道發(fā)回。一旦所有的Goroutines
完成,將加入數(shù)據(jù)并寫出文件。
實現(xiàn)
探針
probe
模塊主要負責(zé)探針功能,偵測要下載的文件是否包含Content-Length
的HTTP
頭部。如果存在,那么會返回分塊下載的文件大小,具體代碼如下:
package probe import ( "fmt" "log" "net/http" "strconv" ) type Probe struct { workers int url string } func NewProbe(worker int, url string) *Probe { return &Probe{ workers: worker, url: url, } } func (p *Probe) GetFileSize() (int, error) { var size = -1 client := &http.Client{} req, err := http.NewRequest("HEAD", p.url, nil) if err != nil { log.Fatal(err) } resp, err := client.Do(req) if err != nil { log.Fatal(err) } if header, ok := resp.Header["Content-Length"]; ok { fileSize, err := strconv.Atoi(header[0]) if err != nil { log.Fatal("File size could not be determined : ", err) } size = fileSize / p.workers } else { log.Fatal("File size was not provided!") return size, fmt.Errorf("file size was not provided.") } return size, nil }
通過發(fā)送一條HEAD
的HTTP
請求來拿到目標(biāo)文件的大小,從而確定并發(fā)下載的分塊大小。
下載器
接下來是下載器部分,首先定義下載器的結(jié)構(gòu)體
type Downloader struct { result chan Part size int workers int }
下載器包括了一個由文件分塊組成的channel
,它的定義如下
type Part struct { Data []byte Index int }
包含了文件分塊的數(shù)據(jù)流以及對應(yīng)索引順序。同時下載器也定義了分塊下載的大小,并發(fā)數(shù)量。
func (d *Downloader) Download(index int, url string) { client := &http.Client{} // calculate offset by multiplying // index with size start := index * d.size // Write data range in correct format // I'm reducing one from the end size to account for // the next chunk starting there dataRange := fmt.Sprintf("bytes=%d-%d", start, start+d.size-1) // if this is downloading the last chunk // rewrite the header. It's an easy way to specify // getting the rest of the file if index == d.workers-1 { dataRange = fmt.Sprintf("bytes=%d-", start) } log.Println(dataRange) req, err := http.NewRequest("GET", url, nil) if err != nil { // TODO: restart download return } req.Header.Add("Range", dataRange) resp, err := client.Do(req) if err != nil { // TODO: restart download return } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { // TODO: restart download return } d.result <- Part{Index: index, Data: body} }
當(dāng)執(zhí)行下載操作時,該方法將向下載請求添加標(biāo)頭Range
。此標(biāo)頭將指定要獲取文件的哪些部分。HTTP
請求完成后,數(shù)據(jù)將寫入函數(shù)調(diào)用時傳遞的通道。
當(dāng)下載開始后,不需要等待下載完成,可以直接開始合并分塊文件,原理在于golang
的channel
本身就具有并發(fā)的屬性。從channel
中持續(xù)讀取已經(jīng)下載好的分塊文件,然后根據(jù)索引順序?qū)懭氡镜匚募小?/p>
func (d *Downloader) Merge(filename string) error { log.Println("start to merge data") parts := make([][]byte, d.workers) counter := 0 for part := range d.result { counter++ parts[part.Index] = part.Data if counter == d.workers { break } } log.Println("sort data as original order") file := []byte{} for _, part := range parts { file = append(file, part...) } log.Println("write data into buffer array") err := ioutil.WriteFile(filename, file, 0777) return err }
運行
至此,我們可以編寫一個main
函數(shù)來測試并發(fā)下載器。下載的目標(biāo)文件是http://212.183.159.230/512MB.zip ,大小為512MB,我們控制并發(fā)數(shù)為5,測試下載到本地的時間。
package main import ( "flag" "log" "time" "go-store/applications/downloader/download" "go-store/applications/downloader/probe" ) var ( // to test internet url = flag.String("url", "http://212.183.159.230/512MB.zip", "download url") // number of goroutines to spawn for download. workers = flag.Int("worker", 5, "concurrent downloader number") // filename for downloaded file filename = flag.String("file", "data.zip", "downloaded filename") ) func main() { flag.Parse() start := time.Now() probe := probe.NewProbe(*workers, *url) size, err := probe.GetFileSize() if err != nil { panic(err) } results := make(chan download.Part, *workers) downloader := download.NewDownloader(results, size, *workers) for i := 0; i < *workers; i++ { go downloader.Download(i, *url) } err = downloader.Merge(*filename) end := time.Now() if err != nil { panic(err) } log.Println("cost time: ", end.Sub(start)) }
結(jié)果如下
song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ go build main.go
song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ ./main
2023/02/26 12:13:59 bytes=429496728-
2023/02/26 12:13:59 bytes=107374182-214748363
2023/02/26 12:13:59 bytes=214748364-322122545
2023/02/26 12:13:59 bytes=322122546-429496727
2023/02/26 12:13:59 bytes=0-107374181
2023/02/26 12:14:21 start to merge data
2023/02/26 12:14:21 sort data as original order
2023/02/26 12:14:23 write data into buffer array
2023/02/26 12:14:23 cost time: 24.43482453s
用時約25s。對比直接下載該文件
song@ubuntu20-04:~/Downloads$ curl http://212.183.159.230/512MB.zip -o 512M.zip
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 512M 100 512M 0 0 14.6M 0 0:00:34 0:00:34 --:--:-- 17.9M
用時34s,并發(fā)下載器的速度提升了10s左右。
結(jié)論
Go
是一門天然支持并發(fā)的語言,利用該特性我們可以大大提升程序的效率。
完整代碼見github
到此這篇關(guān)于基于Go語言實現(xiàn)一個并發(fā)下載器的文章就介紹到這了,更多相關(guān)Go并發(fā)下載器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用IDEA配置GO語言的開發(fā)環(huán)境備忘錄
最近在配置idea開發(fā)go語言時碰到很多問題,想著很多人都可能會遇到,所以下面這篇文章主要給大家介紹了關(guān)于使用IDEA配置GO語言的開發(fā)環(huán)境,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-05-05linux下通過go語言獲得系統(tǒng)進程cpu使用情況的方法
這篇文章主要介紹了linux下通過go語言獲得系統(tǒng)進程cpu使用情況的方法,實例分析了Go語言使用linux的系統(tǒng)命令ps來分析cpu使用情況的技巧,需要的朋友可以參考下2015-03-03go語言中的udp協(xié)議及TCP通訊實現(xiàn)示例
這篇文章主要為大家介紹了go語言中的udp協(xié)議及TCP通訊的實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04手把手教你如何在Goland中創(chuàng)建和運行項目
歡迎來到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運行項目,通過簡單的步驟,您將迅速上手這款強大的集成開發(fā)環(huán)境(IDE),輕松實現(xiàn)您的編程夢想,讓我們一起開啟這段精彩的旅程吧!2024-02-02golang?sync.Cond同步機制運用及實現(xiàn)
在?Go?里有專門為同步通信而生的?channel,所以較少看到?sync.Cond?的使用,不過它也是并發(fā)控制手段里的一種,今天我們就來認識下它的相關(guān)實現(xiàn),加深對同步機制的運用2023-09-09