Python實(shí)現(xiàn)數(shù)據(jù)庫(kù)并行讀取和寫入實(shí)例
這篇主要記錄一下如何實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)的并行運(yùn)算來(lái)節(jié)省代碼運(yùn)行時(shí)間。語(yǔ)言是Python,其他語(yǔ)言思路一樣。
前言
一共23w條數(shù)據(jù),是之前通過自然語(yǔ)言分析處理過的數(shù)據(jù),附一張截圖:
要實(shí)現(xiàn)對(duì)news主體的讀取,并且找到其中含有的股票名稱,只要發(fā)現(xiàn),就將這支股票和對(duì)應(yīng)的日期、score寫入數(shù)據(jù)庫(kù)。
顯然,幾十萬(wàn)條數(shù)據(jù)要是一條條讀寫,然后在本機(jī)上操作,耗時(shí)太久,可行性極低。所以,如何有效并行的讀取內(nèi)容,并且進(jìn)行操作,最后再寫入數(shù)據(jù)庫(kù)呢?
并行讀取和寫入
并行讀取:創(chuàng)建N*max_process個(gè)進(jìn)程,對(duì)數(shù)據(jù)庫(kù)進(jìn)行讀取。讀取的時(shí)候應(yīng)該注意:
- 每個(gè)進(jìn)程需要分配不同的connection和對(duì)應(yīng)的cursor,否則數(shù)據(jù)庫(kù)會(huì)報(bào)錯(cuò)。
- 數(shù)據(jù)庫(kù)必須能承受相應(yīng)的高并發(fā)訪問(可以手動(dòng)更改)
實(shí)現(xiàn)的時(shí)候,如果不在進(jìn)程里面創(chuàng)建新的connection,就會(huì)發(fā)生沖突,每個(gè)進(jìn)程拿到權(quán)限后,會(huì)被下個(gè)進(jìn)程釋放,所以匯報(bào)出來(lái)NoneType Error的錯(cuò)誤。
- 并行寫入:在對(duì)數(shù)據(jù)庫(kù)進(jìn)行更改的時(shí)候,不可以多進(jìn)程更改。所以,我們需要根據(jù)已有的表,創(chuàng)建max_process-1個(gè)同樣結(jié)構(gòu)的表用來(lái)寫入。表的命名規(guī)則可以直接在原來(lái)基礎(chǔ)上加上1,2,3...數(shù)字可以通過對(duì)max_process取余得到。
此時(shí),對(duì)應(yīng)進(jìn)程里面先后出現(xiàn)讀入的conn(保存消息后關(guān)閉)和寫入的conn。每個(gè)進(jìn)程對(duì)應(yīng)的表的index就是 主循環(huán)中的num對(duì)max_process取余(100->4,101->5),這樣每個(gè)進(jìn)程只對(duì)一個(gè)表進(jìn)行操作了。
部分代碼實(shí)現(xiàn)
max_process = 16 #最大進(jìn)程數(shù) def read_SQL_write(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,cmd,index=None): #得到tem字典保存著信息 try: conn = pymysql.Connect(host=r_host, port=r_port, user=r_user, passwd =r_passwd, db =r_db, charset =r_charset) cursor = conn.cursor() cursor.execute(cmd) except Exception as e: error = "[-][-]%d fail to connect SQL for reading" % index log_error('error.log',error) return else: tem = cursor.fetchone() print('[+][+]%d succeed to connect SQL for reading' % index) finally: cursor.close() conn.close() try: conn = pymysql.Connect(host=w_host, port=w_port, user=w_user, passwd =w_passwd, db =w_db, charset =w_charset) cursor = conn.cursor() cursor.execute(cmd) except Exception as e: error = "[-][-]%d fail to connect SQL for writing" % index log_error('error.log',error) return else: print('[+][+]%d succeed to connect SQL for writing' % index) r_dict = dict() r_dict['id'] = tem[0] r_dict['content_id'] = tem[1] r_dict['pub_date'] = tem[2] r_dict['title'] = cht_to_chs(tem[3]) r_dict['title_score'] =tem[4] r_dict['news_content'] = cht_to_chs(tem[5]) r_dict['content_score'] = tem[6] for key in stock_dict.keys(): #能找到對(duì)應(yīng)的股票 if stock_dict[key][1] and ( r_dict['title'].find(stock_dict[key][1])!=-1 or r_dict['news_content'].find(stock_dict[key][1])!=-1 ): w_dict=dict() w_dict['code'] = key w_dict['english_name'] = stock_dict[key][0] w_dict['cn_name'] = stock_dict[key][1] #得到分?jǐn)?shù) if r_dict['title_score']: w_dict['score']=r_dict['title_score'] else: w_dict['score']=r_dict['content_score'] #開始寫入 try: global max_process cmd = "INSERT INTO dyx_stock_score%d VALUES ('%s', '%s' , %d , '%s' , '%s' , %.2f );" % \ (index%max_process ,r_dict['content_id'] ,r_dict['pub_date'] ,w_dict['code'] ,w_dict['english_name'] ,w_dict['cn_name'] ,w_dict['score']) cursor.execute(cmd) conn.commit() except Exception as e: error = " [-]%d fail to write to SQL" % index cursor.rollback() log_error('error.log',error) else: print(" [+]%d succeed to write to SQL" % index) cursor.close() conn.close() def main(): num = 238143#數(shù)據(jù)庫(kù)查詢拿到的總數(shù) p = None for index in range(1,num+1): if index%max_process==1: if p: p.close() p.join() p = multiprocessing.Pool(max_process) r_cmd = ('select id,content_id,pub_date,title,title_score,news_content,content_score from dyx_emotion_analysis where id = %d;' % (index)) p.apply_async(func = read_SQL_write,args=(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,r_cmd,index,)) if p: p.close() p.join()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- python實(shí)現(xiàn)數(shù)據(jù)寫入excel表格
- Python寫入數(shù)據(jù)到MP3文件中的方法
- Python基于csv模塊實(shí)現(xiàn)讀取與寫入csv數(shù)據(jù)的方法
- python讀取excel指定列數(shù)據(jù)并寫入到新的excel方法
- Python3實(shí)現(xiàn)將本地JSON大數(shù)據(jù)文件寫入MySQL數(shù)據(jù)庫(kù)的方法
- Python實(shí)現(xiàn)自定義順序、排列寫入數(shù)據(jù)到Excel的方法
- Python實(shí)現(xiàn)將數(shù)據(jù)框數(shù)據(jù)寫入mongodb及mysql數(shù)據(jù)庫(kù)的方法
- Python實(shí)現(xiàn)讀寫sqlite3數(shù)據(jù)庫(kù)并將統(tǒng)計(jì)數(shù)據(jù)寫入Excel的方法示例
- 利用python對(duì)Excel中的特定數(shù)據(jù)提取并寫入新表的方法
- Python把csv數(shù)據(jù)寫入list和字典類型的變量腳本方法
- Python實(shí)現(xiàn)將數(shù)據(jù)寫入netCDF4中的方法示例
相關(guān)文章
Django錯(cuò)誤:TypeError at / ''bool'' object is not callable解決
這篇文章主要介紹了Django 錯(cuò)誤:TypeError at / 'bool' object is not callable解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08200 行python 代碼實(shí)現(xiàn) 2048 游戲
2048這個(gè)小游戲大家都不陌生,應(yīng)該都玩過,之前已經(jīng)在網(wǎng)上見過各個(gè)版本的2048實(shí)現(xiàn)了,有JAVA、HTML5等,今天我就給大家來(lái)一個(gè)我200 行python 代碼實(shí)現(xiàn)的2048 游戲,感興趣的朋友一起看看吧2018-01-01python3.x編碼解碼unicode字符串的實(shí)現(xiàn)示例
ASCII文本編碼是一種Unicode,存儲(chǔ)為表示字符的字節(jié)值的一個(gè)序列,本文主要介紹了python3.x編碼解碼unicode字符串的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01pandas實(shí)現(xiàn)將日期轉(zhuǎn)換成timestamp
今天小編就為大家分享一篇pandas實(shí)現(xiàn)將日期轉(zhuǎn)換成timestamp,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2019-12-12Pytorch可視化(顯示圖片)及格式轉(zhuǎn)換問題
這篇文章主要介紹了Pytorch可視化(顯示圖片)及格式轉(zhuǎn)換問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12python爬蟲指南之xpath實(shí)例解析(附實(shí)戰(zhàn))
在進(jìn)行網(wǎng)頁(yè)抓取的時(shí)候,分析定位html節(jié)點(diǎn)是獲取抓取信息的關(guān)鍵,目前我用的是lxml模塊,下面這篇文章主要給大家介紹了關(guān)于python爬蟲指南之xpath實(shí)例解析的相關(guān)資料,需要的朋友可以參考下2022-01-01