Python高級編程之消息隊列(Queue)與進程池(Pool)實例詳解
本文實例講述了Python高級編程之消息隊列(Queue)與進程池(Pool)。分享給大家供大家參考,具體如下:
Queue消息隊列
1.創(chuàng)建
import multiprocessing queue = multiprocessing.Queue(隊列長度)
2.方法
方法 | 描述 |
---|---|
put | 變量名.put(數(shù)據(jù)),放入數(shù)據(jù)(如隊列已滿,則程序進入阻塞狀態(tài),等待隊列取出后再放入) |
put_nowait | 變量名.put_nowati(數(shù)據(jù)),放入數(shù)據(jù)(如隊列已滿,則不等待隊列信息取出后再放入,直接報錯) |
get | 變量名.get(數(shù)據(jù)),取出數(shù)據(jù)(如隊列為空,則程序進入阻塞狀態(tài),等待隊列防如數(shù)據(jù)后再取出) |
get_nowait | 變量名.get_nowait(數(shù)據(jù)),取出數(shù)據(jù)(如隊列為空,則不等待隊列放入信息后取出數(shù)據(jù),直接報錯),放入數(shù)據(jù)后立馬判斷是否為空有時為True,原因是放入值和判斷同時進行 |
qsize | 變量名.qsize(),消息數(shù)量 |
empty | 變量名.empty()(返回值為True或False),判斷是否為空 |
full | 變量名.full()(返回值為True或False),判斷是否為滿 |
3.進程通信
因為進程間不共享全局變量,所以使用Queue進行數(shù)據(jù)通信,可以在父進程中創(chuàng)建兩個字進程,一個往Queue里寫數(shù)據(jù),一個從Queue里取出數(shù)據(jù)。
例:
import multiprocessing import time def write_queue(queue): # 循環(huán)寫入數(shù)據(jù) for i in range(10): if queue.full(): print("隊列已滿!") break # 向隊列中放入消息 queue.put(i) print(i) time.sleep(0.5) def read_queue(queue): # 循環(huán)讀取隊列消息 while True: # 隊列為空,停止讀取 if queue.empty(): print("---隊列已空---") break # 讀取消息并輸出 result = queue.get() print(result) if __name__ == '__main__': # 創(chuàng)建消息隊列 queue = multiprocessing.Queue(3) # 創(chuàng)建子進程 p1 = multiprocessing.Process(target=write_queue, args=(queue,)) p1.start() # 等待p1寫數(shù)據(jù)進程執(zhí)行結束后,再往下執(zhí)行 p1.join() p1 = multiprocessing.Process(target=read_queue, args=(queue,)) p1.start()
執(zhí)行結果:
Pool進程池
初始化Pool時,可以指定一個最大進程數(shù),當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會用之前的進程來執(zhí)行新的任務。
1.創(chuàng)建
import multiprocessing pool = multiprocessing.Pool(最大進程數(shù))
2.方法
方法 | 描述 |
---|---|
apply() | 以同步方式添加進程 |
apply_async() | 以異步方式添加進程 |
close() | 關閉Pool,使其不接受新任務(還可以使用) |
terminate() | 不管任務是否完成,立即終止 |
join() | 主進程阻塞,等待子進程的退出,必須在close和terminate后使用 |
3.進程池內通信
創(chuàng)建進程池內Queue消息隊列通信
import multiprocessing Queue:queue = multiprocessing.Manager().Queue()
例:
import multiprocessing import time
寫入數(shù)據(jù)的方法
def write_data(queue): # for循環(huán) 向消息隊列中寫入值 for i in range(5): # 添加消息 queue.put(i) print(i) time.sleep(0.2) print("隊列已滿~")
創(chuàng)建讀取數(shù)據(jù)的方法
def read_data(queue): # 循環(huán)讀取數(shù)據(jù) while True: # 判斷隊列是否為空 if queue.qsize() == 0: print("隊列為空~") break # 從隊列中讀取數(shù)據(jù) result = queue.get() print(result) if __name__ == '__main__': # 創(chuàng)建進程池 pool = multiprocessing.Pool(2) # 創(chuàng)建進程池隊列 queue = multiprocessing.Manager().Queue() # 在進程池中的進程間進行通信 # 使用線程池同步的方式,先寫后讀 # pool.apply(write_data, (queue, )) # pool.apply(read_data, (queue, )) # apply_async() 返回ApplyResult 對象 result = pool.apply_async(write_data, (queue, )) # ApplyResult對象的wait() 方法,表示后續(xù)進程必須等待當前進程執(zhí)行完再繼續(xù) result.wait() pool.apply_async(read_data, (queue, )) pool.close() # 異步后,主線程不再等待子進程執(zhí)行結束,再結束 # join() 后,表示主線程會等待子進程執(zhí)行結束后,再結束 pool.join()
運行結果:
4.案例(文件夾copy器)
代碼:
# 導入模塊 import os import multiprocessing # 拷貝文件函數(shù) def copy_dir(file_name, source_dir, desk_dir): # 要拷貝的文件路徑 source_path = source_dir+'/'+file_name # 目標路徑 desk_path = desk_dir+'/'+file_name # 獲取文件大小 file_size = os.path.getsize(source_path) # 記錄拷貝次數(shù) i = 0 # 以二進制度讀方式打開原文件 with open(source_path, "rb") as source_file: # 以二進制寫入方式創(chuàng)建并打開目標文件 with open(desk_path, "wb") as desk_file: # 循環(huán)寫入 while True: # 讀取1024字節(jié) file_data = source_file.read(1024) # 如果讀到的不為空,則將讀到的寫入目標文件 if file_data: desk_file.write(file_data) # 讀取次數(shù)+1 i += 1 # 拷貝百分比進度等于拷貝次數(shù)*1024*100/文件大小 n = i*102400/file_size if n >= 100: n = 100 print(file_name, "拷貝進度%.2f%%" % n) else: print(file_name, "拷貝成功") break if __name__ == '__main__': # 要拷貝的文件夾 source_dir = 'test' # 要拷貝到的路徑 desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir # 存在文件夾則不創(chuàng)建 try: os.mkdir(desk_dir) except: print("目標文件夾已存在,未創(chuàng)建") # 獲取文件夾內文件目錄,存到列表里 file_list = os.listdir(source_dir) print(file_list) # 創(chuàng)建進程池,最多同時運行3個子進程 pool = multiprocessing.Pool(3) for file_name in file_list: # 異步方式添加到進程池內 pool.apply_async(copy_dir, args=(file_name, source_dir, desk_dir)) # 關閉進程池(停止添加,已添加的還可運行) pool.close() # 讓主進程阻塞,等待子進程結束 pool.join()
運行結果:
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python數(shù)據(jù)結構與算法教程》、《Python函數(shù)使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》、《Python+MySQL數(shù)據(jù)庫程序設計入門教程》及《Python常見數(shù)據(jù)庫操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
相關文章
Python使用urllib2模塊實現(xiàn)斷點續(xù)傳下載的方法
這篇文章主要介紹了Python使用urllib2模塊實現(xiàn)斷點續(xù)傳下載的方法,實例分析了urllib2模塊的使用及斷點續(xù)傳下載的實現(xiàn)技巧,需要的朋友可以參考下2015-06-06Python直接使用plot()函數(shù)畫圖的方法實例
Python非常簡單而又非常強大,它的功能之一就是畫出漂亮的圖表,實現(xiàn)數(shù)據(jù)的可視化,下面這篇文章主要給大家介紹了關于Python直接使用plot()函數(shù)畫圖的相關資料,需要的朋友可以參考下2022-05-05Python數(shù)據(jù)結構與算法之二叉樹結構定義與遍歷方法詳解
這篇文章主要介紹了Python數(shù)據(jù)結構與算法之二叉樹結構定義與遍歷方法,結合實例形式詳細分析了Python實現(xiàn)二叉樹結構的定義、遍歷方法及相關注意事項,需要的朋友可以參考下2017-12-12Python 字符串處理特殊空格\xc2\xa0\t\n Non-breaking space
今天遇到一個問題,使用python的find函數(shù)尋找字符串中的第一個空格時沒有找到正確的位置,下面是解決方法,需要的朋友可以參考下2020-02-02