pandas apply 函數(shù) 實(shí)現(xiàn)多進(jìn)程的示例講解
前言: 在進(jìn)行數(shù)據(jù)處理的時(shí)候,我們經(jīng)常會(huì)用到 pandas 。但是 pandas 本身好像并沒有提供多進(jìn)程的機(jī)制。本文將介紹如何來(lái)自己實(shí)現(xiàn) pandas (apply 函數(shù))的多進(jìn)程執(zhí)行。其中,我們主要借助 joblib 庫(kù),這個(gè)庫(kù)為python 提供了一個(gè)非常簡(jiǎn)潔方便的多進(jìn)程實(shí)現(xiàn)方法。
所以,本文將按照下面的安排展開,前面可能比較啰嗦,若只是想知道怎么用可直接看第三部分:
- 首先簡(jiǎn)單介紹 pandas 中的分組聚合操作 groupby。
- 然后簡(jiǎn)單介紹 joblib 的使用方法。
- 最后,通過一個(gè)去停用詞的實(shí)驗(yàn)詳細(xì)介紹如何實(shí)現(xiàn) pandas 中 apply 函數(shù)多進(jìn)程執(zhí)行。
注意:本文說(shuō)的都是多進(jìn)程而不是多線程。
1. DataFrame.groupby 分組聚合操作
# groupby 操作 df1 = pd.DataFrame({'a':[1,2,1,2,1,2], 'b':[3,3,3,4,4,4], 'data':[12,13,11,8,10,3]}) df1
按照某列分組
grouped = df1.groupby('b') # 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對(duì)應(yīng)的df_group for name, group in grouped: print name, '->' print group
3 -> a b data 0 1 3 12 1 2 3 13 2 1 3 11 4 -> a b data 3 2 4 8 4 1 4 10 5 2 4 3
按照多列分組
grouped = df1.groupby(['a','b']) # 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對(duì)應(yīng)的df_group for name, group in grouped: print name, '->' print group
(1, 3) -> a b data 0 1 3 12 2 1 3 11 (1, 4) -> a b data 4 1 4 10 (2, 3) -> a b data 1 2 3 13 (2, 4) -> a b data 3 2 4 8 5 2 4 3
若 df.index 為[1,2,3…]這樣一個(gè) list, 那么按照 df.index分組,其實(shí)就是每組就是一行,在后面去停用詞實(shí)驗(yàn)中,我們就用這個(gè)方法把 df_all 處理成每行為一個(gè)元素的 list, 再用多進(jìn)程處理這個(gè) list。
grouped = df1.groupby(df1.index) # 按照 index 分組,其實(shí)每行就是一個(gè)組了 print len(grouped), type(grouped) for name, group in grouped: print name, '->' print group
6 <class 'pandas.core.groupby.DataFrameGroupBy'> 0 -> a b data 0 1 3 12 1 -> a b data 1 2 3 13 2 -> a b data 2 1 3 11 3 -> a b data 3 2 4 8 4 -> a b data 4 1 4 10 5 -> a b data 5 2 4 3
2. joblib 用法
refer: https://pypi.python.org/pypi/joblib
# 1. Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly: from joblib import Parallel, delayed from math import sqrt
處理小任務(wù)的時(shí)候,多進(jìn)程并沒有體現(xiàn)出優(yōu)勢(shì)。
%time result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000)) %time result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
CPU times: user 316 ms, sys: 0 ns, total: 316 ms Wall time: 309 ms CPU times: user 692 ms, sys: 384 ms, total: 1.08 s Wall time: 1.03 s
當(dāng)需要處理大量數(shù)據(jù)的時(shí)候,并行處理就體現(xiàn)出了它的優(yōu)勢(shì)
%time result = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 3min 43s, sys: 5.66 s, total: 3min 49s Wall time: 3min 33s
%time result = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 50.9 s, sys: 12.6 s, total: 1min 3s Wall time: 52 s
3. apply 函數(shù)的多進(jìn)程執(zhí)行(去停用詞)
多進(jìn)程的實(shí)現(xiàn)主要參考了 stack overflow 的解答: Parallelize apply after pandas groupby
上圖中,我們要把 AbstractText 去停用詞, 處理成 AbstractText1 那樣。首先,導(dǎo)入停用詞表。
# 讀入所有停用詞 with open('stopwords.txt', 'rb') as inp: lines = inp.read() stopwords = re.findall('"(.*?)"', lines) print len(stopwords) print stopwords[:10]
692 ['a', "a's", 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after']
# 對(duì) AbstractText 去停用詞 # 方法一:暴力法,對(duì)每個(gè)詞進(jìn)行判斷 def remove_stopwords1(text): words = text.split(' ') new_words = list() for word in words: if word not in stopwords: new_words.append(word) return new_words # 方法二:先構(gòu)建停用詞的映射 for word in stopwords: if word in words_count.index: words_count[word] = -1 def remove_stopwords2(text): words = text.split(' ') new_words = list() for word in words: if words_count[word] != -1: new_words.append(word) return new_words %time df_all['AbstractText1'] = df_all['AbstractText'].apply(remove_stopwords1) %time df_all['AbstractText2'] = df_all['AbstractText'].apply(remove_stopwords2)
CPU times: user 8min 56s, sys: 2.72 s, total: 8min 59s Wall time: 8min 48s CPU times: user 1min 2s, sys: 4.12 s, total: 1min 6s Wall time: 1min 2s
上面我嘗試了兩種不同的方法來(lái)去停用詞:
方法一中使用了比較粗暴的方法:首先用一個(gè) list 存儲(chǔ)所有的 stopwords,然后對(duì)于每一個(gè) text 中的每一個(gè) word,我們判斷它是否出現(xiàn)在 stopwords 的list中(復(fù)雜度 O(n)O(n) ), 若為 stopword 則去掉。
方法二中我用 一個(gè)Series(words_count) 對(duì)所有的詞進(jìn)行映射,如果該詞為 stopword, 則把它的值修改為 -1。這樣,對(duì)于 text 中的每個(gè)詞 ww, 我們只需要判斷它的值是否為 -1 即可判定是否為 stopword (復(fù)雜度 O(1)O(1))。
所以,在這兩個(gè)方法中,我們都是采用單進(jìn)程來(lái)執(zhí)行,方法二的速度(1min 2s)明顯高于方法一(8min 48s)。
from joblib import Parallel, delayed import multiprocessing # 方法三:對(duì)方法一使用多進(jìn)程 def tmp_func(df): df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords1) return df def apply_parallel(df_grouped, func): """利用 Parallel 和 delayed 函數(shù)實(shí)現(xiàn)并行運(yùn)算""" results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped) return pd.concat(results) if __name__ == '__main__': time0 = time.time() df_grouped = df_all.groupby(df_all.index) df_all =applyParallel(df_grouped, tmp_func) print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 150.81
# 方法四:對(duì)方法二使用多進(jìn)程 def tmp_func(df): df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords2) return df def apply_parallel(df_grouped, func): """利用 Parallel 和 delayed 函數(shù)實(shí)現(xiàn)并行運(yùn)算""" results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped) return pd.concat(results) if __name__ == '__main__': time0 = time.time() df_grouped = df_all.groupby(df_all.index) df_all =applyParallel(df_grouped, tmp_func) print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 123.80
上面方法三和方法四分別對(duì)應(yīng)于前面方法一和方法二,但是都是用了多進(jìn)程操作。結(jié)果是方法一使用多進(jìn)程以后,速度一下子提高了好幾倍,但是方法二的多進(jìn)程速度不升反降。這是不是有問題?的確,但是首先可以肯定,我們的代碼沒有問題。下圖顯示了我用 top 命令看到各個(gè)方法的進(jìn)程執(zhí)行情況??梢钥闯?,在方法三和方法四中,的的確確是 12 個(gè)CPU核都跑起來(lái)了。只是在方法四中,每個(gè)核占用的比例都是比較低的。
fig1. 單進(jìn)程 cpu 使用情況
fig2. 方法三 cpu 使用情況
fig3. 方法四 cpu 使用情況
一個(gè)直觀的解釋就是,當(dāng)我們開啟多進(jìn)程的時(shí)候,進(jìn)程開啟和最后結(jié)果合并,進(jìn)程結(jié)束,這些操作都是要消耗時(shí)間的。如果我們執(zhí)行的任務(wù)比較小,那么進(jìn)程開啟等操作所消耗的時(shí)間可能就要比執(zhí)行任務(wù)本身消耗的時(shí)間還多。這樣就會(huì)出現(xiàn)多進(jìn)程的方法四比單進(jìn)程的方法二耗時(shí)更多的情況了。
所以總結(jié)來(lái)說(shuō),在處理小任務(wù)的時(shí)候沒有必要開啟多進(jìn)程。借助joblib (Parallel, delayed 兩個(gè)函數(shù)) ,我們能夠很方便地實(shí)現(xiàn) python 多進(jìn)程。
以上這篇pandas apply 函數(shù) 實(shí)現(xiàn)多進(jìn)程的示例講解就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Pytorch實(shí)現(xiàn)Fashion-mnist分類任務(wù)全過程
這篇文章主要介紹了Pytorch實(shí)現(xiàn)Fashion-mnist分類任務(wù)全過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12Django配置Bootstrap, js實(shí)現(xiàn)過程詳解
這篇文章主要介紹了Django配置Bootstrap, js實(shí)現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10python神經(jīng)網(wǎng)絡(luò)tensorflow利用訓(xùn)練好的模型進(jìn)行預(yù)測(cè)
這篇文章主要為大家介紹了python神經(jīng)網(wǎng)絡(luò)tensorflow利用訓(xùn)練好的模型進(jìn)行預(yù)測(cè),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05python如何繪制極坐標(biāo)輪廓圖contourf
這篇文章主要介紹了python如何繪制極坐標(biāo)輪廓圖contourf問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08python環(huán)境功能強(qiáng)大的pip-audit安全漏洞掃描工具
這篇文章主要為大家介紹了python環(huán)境中功能強(qiáng)大的pip-audit安全漏洞掃描工具的功能介紹及安裝使用說(shuō)明,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02Python中分支語(yǔ)句與循環(huán)語(yǔ)句實(shí)例詳解
這篇文章主要給大家介紹了關(guān)于Python中分支語(yǔ)句與循環(huán)語(yǔ)句的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-09-09Python 實(shí)現(xiàn)try重新執(zhí)行
今天小編就為大家分享一篇Python 實(shí)現(xiàn)try重新執(zhí)行,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2019-12-12