Python詳解復雜CSV文件處理方法
項目簡介
鑒于項目保密的需要,不便透露太多項目的信息,因此,簡單介紹一下項目存在的難點:
- 海量數(shù)據:項目是對CSV文件中的數(shù)據進行處理,而特點是數(shù)據量大...真的大?。?!拿到的第一個CSV示例文件是110多萬行(小CASE),而第二個文件就到了4500萬行,等到第三個文件......好吧,一直沒見到第三個完整示例文件,因為太大了,據說是第二個示例文件的40多倍,大概二十億行......
- 業(yè)務邏輯復雜:項目是需要對CSV文件的每一行數(shù)據的各種組合可能性進行判斷,而判斷的業(yè)務邏輯較為復雜,如何在解決復雜邏輯的同時保證較高的處理效率是難點之一。
項目筆記與心得
1.分批處理與多進程及多線程加速
- 因為數(shù)據量太大,肯定是要分批對數(shù)據進行處理,否則,效率低不談,大概率也沒有足夠的內存能夠支撐,需要用到chunksize,此外,為了節(jié)約內存,以及提高處理效率,可以將文本類的數(shù)據存儲為“category”格式:
- 項目整體是計算密集型的任務,因此,需要用到多進程,充分利用CPU的多核性能;
- 多線程進行讀取與寫入,其中,寫入使用to_csv的增量寫入方法,mode參數(shù)設置為'a';
- 多進程與多線程開啟一般為死循環(huán),需要在合適的位置,放入結束循環(huán)的信號,以便處理完畢后退出多進程或多線程
"""鑒于項目保密需要,以下代碼僅為示例""" import time import pathlib as pl import pandas as pd from threading import Thread from multiprocessing import Queue, Process, cpu_count # 導入多線程Thread,多進程的隊列Queue,多進程Process,CPU核數(shù)cpu_count # 存放分段讀取的數(shù)據隊列,注:maxsize控制隊列的最大數(shù)量,避免一次性讀取到內存中的數(shù)據量太大 data_queue = Queue(maxsize=cpu_count() * 2) # 存放等待寫入磁盤的數(shù)據隊列 write_queue = Queue() def read_data(path: pl.Path, data_queue: Queue, size: int = 10000): """ 讀取數(shù)據放入隊列的方法 :return: """ data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category') for idx, df in enumerate(data_obj): while data_queue.full(): # 如果隊列滿了,那就等待 time.sleep(1) data_queue.put((idx + 1, df)) data_queue.put((None, None)) # 放入結束信號 def write_data(out_path: pl.Path, write_queue: Queue): """ 將數(shù)據增量寫入CSV的方法 :return: """ while True: while write_queue.empty(): time.sleep(1) idx, df = write_queue.get() if df is None: return # 結束退出 df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi') # 輸出CSV def parse_data(data_queue: Queue, write_queue: Queue): """ 從隊列中取出數(shù)據,并加工的方法 :return: """ while True: while write_queue.empty(): time.sleep(1) idx, df = data_queue.get() if df is None: # 如果是空的結束信號,則結束退出進程, # 特別注意結束前把結束信號放回隊列,以便其他進程也能接收到結束信號!??! data_queue.put((idx, df)) return """處理數(shù)據的業(yè)務邏輯略過""" write_queue.put((idx, df)) # 將處理后的數(shù)據放入寫隊列 # 創(chuàng)建一個讀取數(shù)據的線程 read_pool = Thread(target=read_data, args=(read_data_queue, *args)) read_pool.start() # 開啟讀取線程 # 創(chuàng)建一個增量寫入CSV數(shù)據的線程 write_pool = Thread(target=write_data, args=(write_data_queue, *args)) write_pool.start() # 開啟寫進程 pools = [] # 存放解析進程的隊列 for i in range(cpu_count()): # 循環(huán)開啟多進程,不確定開多少個進程合適的情況下,那么按CPU的核數(shù)開比較合理 pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args)) pool.start() # 啟動進程 pools.append(pool) # 加入隊列 for pool in pools: pool.join() # 等待所有解析進程完成 # 所有解析進程完成后,在寫隊列放入結束寫線程的信號 write_data_queue.put((None, None)) write_pool.join() # 等待寫線程結束 print('任務完成')
2.優(yōu)化算法提高效率
將類對象存入dataframe列
在嘗試了n種方案之后,最終使用了將類對象存到dataframe的列中,使用map方法,運行類方法,最后,將運行結果展開到多列中的方式。該方案本項目中取得了最佳的處理效率。
"""鑒于保密需要,以下代碼僅為示例""" class Obj: def __init__(self, ser: pd.Series): """ 初始化類對象 :param ser: 傳入series """ self.ser = ser # 行數(shù)據 self.attrs1 = [] # 屬性1 self.attrs2 = [] # 屬性2 self.attrs3 = [] # 屬性3 def __repr__(self): """ 自定義輸出 """ attrs1 = '_'.join([str(a) for a in self.attrs1]) attrs2 = '_'.join([str(a) for a in self.attrs2]) attrs3 = '_'.join([str(a) for a in self.attrs3]) return '_'.join([attrs1, attrs2, attrs3]) def run(self): """運行業(yè)務邏輯""" # 創(chuàng)建obj列,存入類對象 data['obj'] = data.apply(lambda x: Obj(x), axis=1) # 運行obj列中的類方法獲得判斷結果 data['obj'] = data['obj'].map(lambda x: x.run()) # 鏈式調用,1將類對象文本化->2拆分到多列->3刪除空列->4轉換為category格式 data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category') # 刪除obj列 data.drop(columns='obj', inplace=True)
減少計算次數(shù)以提高運行效率
在整個優(yōu)化過程中,對運行效率產生最大優(yōu)化效果的有兩項:
- 一是改變遍歷算法,采用直接對整行數(shù)據進行綜合判斷的方法,使原需要遍歷22個組合的計算與判斷大大減少
- 二是提前計算特征組合,制作成字典,后續(xù)直接查詢結果,而不再進行重復計算
使用numpy加速計算
numpy還是數(shù)據處理上的神器,使用numpy的方法,比自己實現(xiàn)的方法效率要高非常多,本項目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了運行效率,又解決了邏輯判斷的問題:
"""numpy方法使用示例""" import numpy as np # 計算數(shù)字的個數(shù)組合bincount np.bincount([9, 2, 13, 12, 9, 10, 11]) # 輸出結果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64) # 取得個數(shù)最多的數(shù)字argmax np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11])) # 輸出結果: 9 # 將數(shù)字按照個數(shù)優(yōu)先,其次大小進行排序argsort np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])) # 輸出結果:array([ 0, 1, 3, 4, 5, 6, 7, 8, 2, 10, 11, 12, 13, 9], dtype=int64) # 翻轉列表flipud np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))) # 輸出結果: array([ 9, 13, 12, 11, 10, 2, 8, 7, 6, 5, 4, 3, 1, 0], dtype=int64) # 查找相同值in1d np.in1d([2, 3, 4], [2, 9, 3]) # 輸出結果: array([ True, True, False]) 注:指2,3True,4False np.all(np.in1d([2, 3], [2, 9, 3])) # 輸出結果: array([ True, True]) # 是否全是all np.all(np.in1d([2, 3, 4], [2, 9, 3])) # 判斷組合1是否包含在組合2中 # 輸出結果: False np.all(np.in1d([2, 3], [2, 9, 3])) # 輸出結果: True
優(yōu)化前后的效率對比
總結
優(yōu)化算法是在這個項目上時間花費最多的工作(沒有之一)。4月12日接單,10天左右出了第1稿,雖能運行,但回頭看存在兩個問題:一是有bug需要修正,二是運行效率不高(4500萬行數(shù)據,執(zhí)行需要1小時21分鐘,如果只是在這個版本上debug需要增加判斷條件,效率只會更低);后20多天是在不斷的優(yōu)化算法的同時對bug進行修正,最后版本執(zhí)行相同數(shù)據只需要不足30分鐘,效率提高了一倍多。回顧來看,雖然調優(yōu)花費的時間多,但是每一個嘗試不論成功還是失敗都是一次寶貴的經驗積累。
到此這篇關于Python詳解復雜CSV文件處理方法的文章就介紹到這了,更多相關Python CSV文件處理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python關于print的操作(倒計時、轉圈顯示、進度條)
這篇文章主要介紹了Python關于print的操作(倒計時、轉圈顯示、進度條),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05python3.5 + PyQt5 +Eric6 實現(xiàn)的一個計算器代碼
這篇文章主要介紹了python3.5 + PyQt5 +Eric6 實現(xiàn)的一個計算器代碼,在windows7 32位系統(tǒng)可以完美運行 計算器,有興趣的可以了解一下。2017-03-03Django調用百度AI接口實現(xiàn)人臉注冊登錄代碼實例
這篇文章主要介紹了Django調用百度AI接口實現(xiàn)人臉注冊登錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-04-04Python用于學習重要算法的模塊pygorithm實例淺析
這篇文章主要介紹了Python用于學習重要算法的模塊pygorithm,結合實例形式簡單分析了pygorithm模塊的功能、算法調用、源碼獲取、時間復雜度計算等相關操作技巧,需要的朋友可以參考下2018-08-08