Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解
Pipeline處理CPU密集型或阻塞型操作
Twisted框架的reactor適合于處理短的、非阻塞的操作。但是如果要處理一些復(fù)雜的、或者包含阻塞的操作又該怎么辦呢?Twisted提供了線程池來(lái)在其他的線程而不是主線程(Twisted的reactor線程)中執(zhí)行慢的操作——使用reactor.callInThread() API。這就意味著reactor在執(zhí)行計(jì)算時(shí)還能保持運(yùn)行并對(duì)事件做出反應(yīng)。一定要記住線程池中的處理不是線程安全的。這就意味著當(dāng)你使用了全局的狀態(tài)之后,還要面臨所有那些傳統(tǒng)的多線程編程的同步問(wèn)題。下面是一個(gè)簡(jiǎn)單的例子:
class UsingBlocking(object): @defer.inlineCallbacks def process_item(self, item, spider): price = item["price"][0] out = defer.Deferred() reactor.callInThread(self._do_calculation, price, out) item["price"][0] = yield out defer.returnValue(item) def _do_calculation(self, price, out): new_price = price + 1 time.sleep(0.10) reactor.callFromThread(out.callback, new_price)
在上面的Pipeline中,對(duì)于每個(gè)Item,我們提取出它的price字段,想要在_do_caculation()方法中對(duì)它進(jìn)行處理。這個(gè)方法使用了time.sleep(),一個(gè)阻塞的操作。我們調(diào)用reactor.callInThread()方法使它運(yùn)行在另一個(gè)線程中,該方法的第一個(gè)參數(shù)是想要調(diào)用的函數(shù),后面的參數(shù)則會(huì)全部傳遞給被調(diào)用的函數(shù)作為參數(shù)。在這里我們給被調(diào)用的函數(shù)傳遞了price,還有一個(gè)創(chuàng)建的Deferred對(duì)象out。當(dāng)_do_caculation()函數(shù)完成計(jì)算后,我們會(huì)使用out的回調(diào)函數(shù)來(lái)返回這個(gè)值。接下來(lái),yield這個(gè) Deferred對(duì)象并為price設(shè)置一個(gè)新的值,最后返回Item。
在_do_caculation()函數(shù)中我們把price加一,然后休眠了100ms。其實(shí)這個(gè)時(shí)間是很長(zhǎng)的,如果在reactor的線程中調(diào)用這個(gè)函數(shù),那就意味著我們每秒只能處理不超過(guò)10個(gè)頁(yè)面。不過(guò)如果把它放在另一個(gè)線程中來(lái)調(diào)用就不會(huì)出現(xiàn)這種問(wèn)題了。這些計(jì)算任務(wù)會(huì)在線程池中排隊(duì),等待某個(gè)線程處于可用狀態(tài),然后這個(gè)線程就會(huì)執(zhí)行這個(gè)任務(wù),休眠100ms。最后一步是激活out的回調(diào)函數(shù)。通常情況下,我們可以這樣來(lái)激活:out.callback(new_price),但是既然現(xiàn)在我們處于另外一個(gè)線程中,這樣做就不安全了。如果我們執(zhí)意這樣做了,這個(gè)Deferred對(duì)象的代碼,也就是Scrapy的功能就會(huì)在別的線程中執(zhí)行,這樣會(huì)導(dǎo)致數(shù)據(jù)被損壞。所以我們調(diào)用了reactor.callFromThread()函數(shù),同樣的,它也是以一個(gè)函數(shù)作為參數(shù),并把額外的參數(shù)直接傳遞給被調(diào)用的函數(shù)。這個(gè)函數(shù)會(huì)在主線程中排隊(duì)并等待被調(diào)用,它反過(guò)來(lái)解鎖了process_item()方法中的yield語(yǔ)句,并恢復(fù)Scrapy對(duì)這個(gè)Item的操作。
如果我們的pipeline中含有全局狀態(tài)會(huì)怎么樣呢?比如,計(jì)數(shù)器或者平均值等,我們需要在_do_caculation()函數(shù)中使用的。例如有以下兩個(gè)變量,beta和delta:
class UsingBlocking(object): def __init__(self): self.beta, self.delta = 0, 0 ... def _do_calculation(self, price, out): self.beta += 1 time.sleep(0.001) self.delta += 1 new_price = price + self.beta - self.delta + 1 assert abs(new_price-price-1) < 0.01 time.sleep(0.10)...
上面的代碼有一些問(wèn)題,并且在運(yùn)行的時(shí)候會(huì)給出assertion錯(cuò)誤。這是因?yàn)?,如果一個(gè)線程在self.beta += 1和self.delta += 1語(yǔ)句之間切換的話,另一個(gè)線程就會(huì)恢復(fù)執(zhí)行并使用beta和delta的值來(lái)計(jì)算price,這里線程會(huì)發(fā)現(xiàn)這兩個(gè)值處于不一致的狀態(tài)(beta比delta大),這樣,錯(cuò)誤的產(chǎn)生了。中間短的sleep會(huì)讓線程切換更可能發(fā)生,不過(guò)即使沒(méi)有它,同樣也會(huì)出現(xiàn)競(jìng)態(tài)條件。為了阻止競(jìng)態(tài)條件的發(fā)生,我們必須使用鎖,例如Python的threading.RLock()鎖。使用了這個(gè)遞歸鎖,就能確保兩個(gè)線程不會(huì)同時(shí)執(zhí)行鎖保護(hù)的臨界區(qū)的代碼:
class UsingBlocking(object): def __init__(self): ... self.lock = threading.RLock() ... def _do_calculation(self, price, out): with self.lock: self.beta += 1 ... new_price = price + self.beta - self.delta + 1 assert abs(new_price-price-1) < 0.01 ...
現(xiàn)在的代碼就沒(méi)問(wèn)題了,要注意的是,我們不需要保護(hù)整個(gè)代碼,只需要能夠覆蓋全局狀態(tài)的使用就行了。
在ITEM_PIPELINES中加上:
ITEM_PIPELINES = { ... 'properties.pipelines.computation.UsingBlocking': 500, }
運(yùn)行一下會(huì)發(fā)現(xiàn),時(shí)延由于100ms的休眠的緣故變調(diào)了,不過(guò)吞吐量還是保持不變,大約每秒25個(gè)。
到此這篇關(guān)于Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解的文章就介紹到這了,更多相關(guān)Pipeline處理CPU密集型或阻塞型操作內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
windows系統(tǒng)上通過(guò)whl文件安裝triton模塊的簡(jiǎn)單步驟
這篇文章主要介紹了在Windows系統(tǒng)中通過(guò).whl文件安裝Triton的步驟,包括確認(rèn)系統(tǒng)環(huán)境、下載合適的.whl文件、使用pip安裝、驗(yàn)證安裝、使用Triton以及解決潛在問(wèn)題,需要的朋友可以參考下2025-01-01python pyautogui手動(dòng)活動(dòng)(模擬鼠標(biāo)鍵盤)自動(dòng)化庫(kù)使用
這篇文章主要為大家介紹了python pyautogui手動(dòng)活動(dòng)(模擬鼠標(biāo)鍵盤)自動(dòng)化庫(kù)使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01Python 將字符串轉(zhuǎn)換為列表的7種方法匯總
這篇文章主要介紹了Python 將字符串轉(zhuǎn)換為列表的7種方法匯總,在本文中,我們將嘗試將給定的字符串轉(zhuǎn)換為列表,其中根據(jù)用戶的選擇,遇到空格或任何其他特殊字符,為此,我們?cè)趕tring中使用split()方法,需要的朋友可以參考下2023-11-11這可能是最好玩的python GUI入門實(shí)例(推薦)
這篇文章主要介紹了這可能是最好玩的python GUI入門實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07淺談Python 字符串格式化輸出(format/printf)
下面小編就為大家?guī)?lái)一篇淺談Python 字符串格式化輸出(format/printf)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-07-07python中內(nèi)置庫(kù)csv的使用及說(shuō)明
這篇文章主要介紹了python中內(nèi)置庫(kù)csv的使用及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11