Python大批量寫入數(shù)據(jù)(百萬級別)的方法
背景
現(xiàn)有一個百萬行數(shù)據(jù)的csv格式文件,需要在兩分鐘之內(nèi)存入數(shù)據(jù)庫。
方案
方案一:多線程+協(xié)程+異步MySql方案二:多線程+MySql批量插入
代碼
1,先通過pandas讀取所有csv數(shù)據(jù)存入列表。
2,設(shè)置N個線程,將一百萬數(shù)據(jù)均分為N份,以start,end傳遞給線程以切片的方法讀取區(qū)間數(shù)據(jù)(建議為16個線程)
3,方案二 線程內(nèi)以 executemany 方法批量插入所有數(shù)據(jù)。
4,方案一 線程內(nèi)使用異步事件循環(huán)遍歷所有數(shù)據(jù)異步插入。
5,方案一純屬沒事找事型。
方案二
import threading import pandas as pd import asyncio import time import aiomysql import pymysql data=[] error_data=[] def run(start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) mysdb = getDb("*", *, "*", "*", "*") cursor = mysdb.cursor() sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" cursor.executemany(sql,data[start:end]) mysdb.commit() mysdb.close() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result class MyDataBase: def __init__(self,host=None,port=None,username=None,password=None,database=None): self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database) def close(self): self.db.close() def getDb(host,port,username,password,database): MyDb = MyDataBase(host, port, username, password,database) return MyDb.db def main(csvFile): global data #獲取全局對象 csv全量數(shù)據(jù) #讀取所有的數(shù)據(jù) 將所有數(shù)據(jù)均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) for item in csv_result: item.insert(0,day) data=csv_result thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余數(shù)分給最后一個線程 # print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() sub_thread.join() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
方案一
import threading import pandas as pd import asyncio import time import aiomysql data=[] error_data=[] async def async_basic(loop,start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) conn = await aiomysql.connect( host="*", port=*, user="*", password="*", db="*", loop=loop ) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" async with conn.cursor() as cursor: for item in data[start:end]: params=[day] params.extend(item) try: x=await cursor.execute(sql,params) if x==0: error_data.append(item) print(threading.current_thread().name+" result "+str(x)) except Exception as e: print(e) error_data.append(item) time.sleep(10) pass await conn.close() #await conn.commit() #關(guān)閉連接池 # pool.close() # await pool.wait_closed() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result def th(start,end): loop = asyncio.new_event_loop() loop.run_until_complete(async_basic(loop,start,end)) def main(csvFile): global data #獲取全局對象 csv全量數(shù)據(jù) #讀取所有的數(shù)據(jù) 將所有數(shù)據(jù)均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) data=csv_result thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余數(shù)分給最后一個線程 print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
總結(jié)
到此這篇關(guān)于Python大批量寫入數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Python大批量寫入數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python反射機制內(nèi)置函數(shù)及場景構(gòu)造詳解
這篇文章主要為大家介紹了python反射機制內(nèi)置函數(shù)及場景構(gòu)造示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11Python網(wǎng)絡(luò)通訊之TCP協(xié)議實現(xiàn)服務(wù)器和客戶端實例
這篇文章主要介紹了Python網(wǎng)絡(luò)通訊之TCP協(xié)議實現(xiàn)服務(wù)器和客戶端實例, socket編程一種獨立于協(xié)議的網(wǎng)絡(luò)編程接口,應(yīng)用程序可以通過它發(fā)送或接收數(shù)據(jù),可對其進行像對文件一樣的打開、讀寫和關(guān)閉等操作,需要的朋友可以參考下2023-08-08python中?OpenCV和Pillow處理圖像操作及時間對比
這篇文章主要介紹了python中OpenCV和Pillow處理圖像操作及時間對比,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09Python使用Matplotlib進行圖案填充和邊緣顏色分離的三種方法
Matplotlib是Python中功能強大的繪圖庫,允許廣泛的自定義選項,一個常見的要求是分離出圖中的圖案填充和邊緣顏色,默認情況下,Matplotlib中的填充顏色與邊緣顏色相關(guān)聯(lián),但有一些方法可以獨立自定義這些顏色,本文將深入研究如何實現(xiàn)這一點的技術(shù)細節(jié),并提供分步說明和示例2025-01-01