go語言實現(xiàn)并發(fā)網(wǎng)絡爬蟲的示例代碼
go語言做爬蟲也是很少嘗試,首先我的思路是看一下爬蟲的串行實現(xiàn),然后通過兩個并發(fā)實現(xiàn):一個使用鎖,另一個使用通道
這里不涉及從頁面中提取URL的邏輯(請查看Go框架colly的內(nèi)容)。網(wǎng)絡抓取只是作為一個例子來考察Go的并發(fā)性。
我們想從我們的起始頁中提取所有的URL,將這些URL保存到一個列表中,然后對列表中的每個URL做同樣的處理。頁面的圖很可能是循環(huán)的,所以我們需要記住哪些頁面已經(jīng)經(jīng)歷了這個過程(或者在使用并發(fā)時,處于這個過程的中間)。
串行爬蟲首先檢查我們是否已經(jīng)在獲取地圖中獲取了該頁面。如果我們沒有,那么它就在頁面上找到的每個URL上調(diào)用自己。注意:map 在Go中是引用類型,所以每次調(diào)用都會得到相同的 map。
func Serial(url string, fetcher Fetcher, fetched map[string]bool) { if fetched[url] { return } fetched[url] = true urls, err := fetcher.Fetch(url) if err != nil { return } for _, u := range urls { Serial(u, fetcher, fetched) } return } func main() { Serial(<page>, fetcher, make(map[string]bool)) }
fetcher將包含提取URLs到列表中的邏輯(也可以對頁面的內(nèi)容做一些處理)。這個實現(xiàn)不是本講的重點。
由于網(wǎng)絡速度很慢,我們可以使用并發(fā)性來加快這個速度。為了實現(xiàn)這一點,我們需要使用鎖(在讀/寫時鎖定已經(jīng)獲取的頁面地圖)和 waitgroup(等待所有的goroutine完成)。
已經(jīng)獲取的頁面的 map 只能由持有鎖的線程訪問,因為我們不希望多個線程開始處理同一個URL。如果在一個線程的讀和寫之間,另一個線程在第一個線程更新之前從 map 上得到了相同的讀數(shù),這就可能發(fā)生。
我們定義了fetchState結(jié)構,將 map 和鎖組合在一起,并定義了一個方法來初始化它。
爬蟲程序的開始是一樣的,檢查我們是否已經(jīng)獲取了URL,但這次使用sync.Mutex來鎖定 map,如前所述。然后,對于頁面上發(fā)現(xiàn)的每個URL,我們在一個新的goroutine中啟動相同的函數(shù)。在啟動之前,我們將WaitGroup的計數(shù)器增加1,done.Wait()在退出之前等待所有的抓取工作完成。
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) { f.mu.Lock() already := f.fetched[url] f.fetched[url] = true f.mu.Unlock() if already { return } urls, err := fetcher.Fetch(url) if err != nil { return } var done sync.WaitGroup for _, u := range urls { done.Add(1) go func(u string) { defer done.Done() ConcurrentMutex(u, fetcher, f) }(u) } done.Wait() return } type fetchState struct { mu sync.Mutex fetched map[string]bool } func makeState() *fetchState { f := &fetchState{} f.fetched = make(map[string]bool) return f } func main() { ConcurrentMutex(<page>, fetcher, makeState()) }
注意:
[1] done.Done()的調(diào)用被推遲了,以防我們在其中一個調(diào)用中出現(xiàn)錯誤,在這種情況下,我們?nèi)匀灰f減WaitGroup的計數(shù)器。
[2] 這段代碼的一個問題是,我們沒有限制線程的數(shù)量。但值得一提的是,goroutines比其他語言的線程更輕量級,并且由Go運行時管理,系統(tǒng)調(diào)用更少。
[3] 我們把字符串u傳給立即函數(shù),以便制作一個URL的副本,然后才把它送到goroutine,因為變量u在外層for循環(huán)中發(fā)生了變化。要理解這樣做的必要性,一個更簡單的例子是,在沒有WaitGroup的情況下。
func checkThisOut() { s := "abc" sec := time.Second go func() {time.Sleep(sec); fmt.Printf("s = %v\n", s)}() go func(u string) {time.Sleep(sec); fmt.Printf("u = %v\n", u)}(s) s = "def" time.Sleep(2 * sec) } // this prints out: u = abc, s = def
[4] 我們可以運行內(nèi)置的數(shù)據(jù)競賽檢測器,通過運行go run -race .來幫助檢測競賽條件。它在這個例子中非常有效。
下一個并發(fā)版本在線程之間完全不共享內(nèi)存!嗯,這并不準確。我們只是不會自己同步訪問共享數(shù)據(jù)。相反,我們使用一個通道在goroutine之間進行通信。
在這個最后的版本中,我們有一個主函數(shù)在主線程上運行。只有這個函數(shù)能看到 map 并從通道中讀取。channel ,像 map 一樣,也是引用類型。所以這里只有一個通道。
在啟動時,我們將第一個URL寫到通道上。這是在一個goroutine中完成的,因為向一個沒有緩沖的通道的寫入會導致goroutine暫停,直到該值被另一個goroutine讀取。
我們在一個for循環(huán)中從通道中讀取URL的列表(從一個沒有緩沖的通道中讀取也會阻塞)。然后,我們以與之前的實現(xiàn)類似的方式瀏覽該列表。通過使用一個計數(shù)器,一旦沒有更多的工作者,這個循環(huán)就會中斷。
工作者獲取URL的列表,將它們傳遞給通道。如果出現(xiàn)錯誤,會傳遞一個空列表,這樣從通道讀取的for循環(huán)最終會退出(計數(shù)器的設置方式是,我們等待從每個goroutine讀取一個值)。
func ConcurrentChannel(url string, fetcher Fetcher) { ch := make(chan []string) go func() { ch <- []string{url} }() master(ch, fetcher) } func master(ch chan []string, fetcher Fetcher) { n := 1 fetched := make(map[string]bool) for urls := range ch { for _, u := range urls { if fetched[u] == false { fetched[u] = true n += 1 go worker(u, ch, fetcher) } } n -= 1 if n == 0 { break } } } func worker(url string, ch chan []string, fetcher Fetcher) { urls, err := fetcher.Fetch(url) if err != nil { ch <- []string{} } else { ch <- urls } }
到此這篇關于go語言實現(xiàn)并發(fā)網(wǎng)絡爬蟲的示例代碼的文章就介紹到這了,更多相關go語言并發(fā)網(wǎng)絡爬蟲內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章

GoFrame框架gredis優(yōu)雅的取值和類型轉(zhuǎn)換