亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

簡單談?wù)刾ython中的Queue與多進(jìn)程

 更新時(shí)間:2016年08月25日 08:46:12   作者:Kevin_Yang  
本文給大家簡單總結(jié)了下再Python中的隊(duì)列對象(queue)以及多進(jìn)程(multiprocessing),非常的簡單實(shí)用,有需要的小伙伴可以參考下

最近接觸一個(gè)項(xiàng)目,要在多個(gè)虛擬機(jī)中運(yùn)行任務(wù),參考別人之前項(xiàng)目的代碼,采用了多進(jìn)程來處理,于是上網(wǎng)查了查python中的多進(jìn)程

一、先說說Queue(隊(duì)列對象)

Queue是python中的標(biāo)準(zhǔn)庫,可以直接import 引用,之前學(xué)習(xí)的時(shí)候有聽過著名的“先吃先拉”與“后吃先吐”,其實(shí)就是這里說的隊(duì)列,隊(duì)列的構(gòu)造的時(shí)候可以定義它的容量,別吃撐了,吃多了,就會報(bào)錯(cuò),構(gòu)造的時(shí)候不寫或者寫個(gè)小于1的數(shù)則表示無限多

import Queue

q = Queue.Queue(10)

向隊(duì)列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在隊(duì)列中取值get()

默認(rèn)的隊(duì)列是先進(jìn)先出的

>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']

當(dāng)一個(gè)隊(duì)列為空的時(shí)候如果再用get取則會堵塞,所以取隊(duì)列的時(shí)候一般是用到

get_nowait()方法,這種方法在向一個(gè)空隊(duì)列取值的時(shí)候會拋一個(gè)Empty異常

所以更常用的方法是先判斷一個(gè)隊(duì)列是否為空,如果不為空則取值

隊(duì)列中常用的方法

Queue.qsize() 返回隊(duì)列的大小
Queue.empty() 如果隊(duì)列為空,返回True,反之False
Queue.full() 如果隊(duì)列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊(duì)列,timeout等待時(shí)間
Queue.get_nowait() 相當(dāng)Queue.get(False)
非阻塞 Queue.put(item) 寫入隊(duì)列,timeout等待時(shí)間
Queue.put_nowait(item) 相當(dāng)Queue.put(item, False)

二、multiprocessing中使用子進(jìn)程概念

from multiprocessing import Process

可以通過Process來構(gòu)造一個(gè)子進(jìn)程

p = Process(target=fun,args=(args))

再通過p.start()來啟動(dòng)子進(jìn)程

再通過p.join()方法來使得子進(jìn)程運(yùn)行結(jié)束后再執(zhí)行父進(jìn)程

from multiprocessing import Process
import os
 
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

三、在multiprocessing中使用pool

如果需要多個(gè)子進(jìn)程時(shí)可以考慮使用進(jìn)程池(pool)來管理

from multiprocessing import Pool

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

pool創(chuàng)建子進(jìn)程的方法與Process不同,是通過

p.apply_async(func,args=(args))實(shí)現(xiàn),一個(gè)池子里能同時(shí)運(yùn)行的任務(wù)是取決你電腦的cpu數(shù)量,如我的電腦現(xiàn)在是有4個(gè)cpu,那會子進(jìn)程task0,task1,task2,task3可以同時(shí)啟動(dòng),task4則在之前的一個(gè)某個(gè)進(jìn)程結(jié)束后才開始

上面的程序運(yùn)行后的結(jié)果其實(shí)是按照上圖中1,2,3分開進(jìn)行的,先打印1,3秒后打印2,再3秒后打印3

代碼中的p.close()是關(guān)掉進(jìn)程池子,是不再向里面添加進(jìn)程了,對Pool對象調(diào)用join()方法會等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。

當(dāng)時(shí)也可以是實(shí)例pool的時(shí)候給它定義一個(gè)進(jìn)程的多少

如果上面的代碼中p=Pool(5)那么所有的子進(jìn)程就可以同時(shí)進(jìn)行

三、多個(gè)子進(jìn)程間的通信

多個(gè)子進(jìn)程間的通信就要采用第一步中說到的Queue,比如有以下的需求,一個(gè)子進(jìn)程向隊(duì)列中寫數(shù)據(jù),另外一個(gè)進(jìn)程從隊(duì)列中取數(shù)據(jù),

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 啟動(dòng)子進(jìn)程pw,寫入:
 pw.start() 
 # 等待pw結(jié)束:
 pw.join()
 # 啟動(dòng)子進(jìn)程pr,讀取:
 pr.start()
 pr.join()
 # pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止:
 print
 print '所有數(shù)據(jù)都寫入并且讀完'


四、關(guān)于上面代碼的幾個(gè)有趣的問題

if __name__=='__main__': 
 # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有數(shù)據(jù)都寫入并且讀完'

如果main函數(shù)寫成上面的樣本,本來我想要的是將會得到一個(gè)隊(duì)列,將其作為參數(shù)傳入進(jìn)程池子里的每個(gè)子進(jìn)程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯(cuò)誤,查了下,大意是隊(duì)列對象不能在父進(jìn)程與子進(jìn)程間通信,這個(gè)如果想要使用進(jìn)程池中使用隊(duì)列則要使用multiprocess的Manager類

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有數(shù)據(jù)都寫入并且讀完'

這樣這個(gè)隊(duì)列對象就可以在父進(jìn)程與子進(jìn)程間通信,不用池則不需要Manager,以后再擴(kuò)展multiprocess中的Manager類吧

關(guān)于鎖的應(yīng)用,在不同程序間如果有同時(shí)對同一個(gè)隊(duì)列操作的時(shí)候,為了避免錯(cuò)誤,可以在某個(gè)函數(shù)操作隊(duì)列的時(shí)候給它加把鎖,這樣在同一個(gè)時(shí)間內(nèi)則只能有一個(gè)子進(jìn)程對隊(duì)列進(jìn)行操作,鎖也要在manager對象中的鎖

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def write(q,lock):
 lock.acquire() #加上鎖
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #釋放鎖 
 
# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把鎖
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有數(shù)據(jù)都寫入并且讀完'

相關(guān)文章

  • 利用Anaconda安裝TensorFlow全過程

    利用Anaconda安裝TensorFlow全過程

    這篇文章主要介紹了利用Anaconda安裝TensorFlow全過程,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • 基于python traceback實(shí)現(xiàn)異常的獲取與處理

    基于python traceback實(shí)現(xiàn)異常的獲取與處理

    這篇文章主要介紹了基于python traceback實(shí)現(xiàn)異常的獲取與處理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • PyTorch實(shí)現(xiàn)手寫數(shù)字識別的示例代碼

    PyTorch實(shí)現(xiàn)手寫數(shù)字識別的示例代碼

    本文主要介紹了PyTorch實(shí)現(xiàn)手寫數(shù)字識別的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下<BR>
    2022-05-05
  • Pandas高級教程之Pandas中的GroupBy操作

    Pandas高級教程之Pandas中的GroupBy操作

    通常來說groupby操作可以分為三部分:分割數(shù)據(jù),應(yīng)用變換和和合并數(shù)據(jù),本文將會詳細(xì)講解Pandas中的groupby操作,感興趣的朋友一起看看吧
    2021-07-07
  • TensorFlow梯度求解tf.gradients實(shí)例

    TensorFlow梯度求解tf.gradients實(shí)例

    今天小編就為大家分享一篇TensorFlow梯度求解tf.gradients實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-02-02
  • Python學(xué)習(xí)筆記之變量與轉(zhuǎn)義符

    Python學(xué)習(xí)筆記之變量與轉(zhuǎn)義符

    這篇文章主要介紹了Python學(xué)習(xí)筆記之變量與轉(zhuǎn)義符,本文從零開始學(xué)習(xí)Python,知識點(diǎn)很細(xì),有共同目標(biāo)的小伙伴可以一起來學(xué)習(xí)
    2023-03-03
  • Python+OpenCV編寫車輛計(jì)數(shù)器系統(tǒng)

    Python+OpenCV編寫車輛計(jì)數(shù)器系統(tǒng)

    本文,我們將使用歐幾里德距離跟蹤和輪廓的概念在 Python 中使用 OpenCV 構(gòu)建車輛計(jì)數(shù)器系統(tǒng),文中的示例代碼講解詳細(xì),感興趣的可以了解一下
    2022-05-05
  • Python?OpenCV實(shí)現(xiàn)圖片預(yù)處理的方法詳解

    Python?OpenCV實(shí)現(xiàn)圖片預(yù)處理的方法詳解

    這篇文章主要為大家詳細(xì)介紹了Python?OpenCV實(shí)現(xiàn)圖片預(yù)處理的方法,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的可以了解一下
    2022-09-09
  • Python 更快進(jìn)行探索性數(shù)據(jù)分析的四個(gè)方法

    Python 更快進(jìn)行探索性數(shù)據(jù)分析的四個(gè)方法

    今天我給大家分享幾種更快的探索性數(shù)據(jù)分析方法,它們可以進(jìn)一步加速 EDA。 我們以一個(gè)學(xué)生考試成績的例子,創(chuàng)建一個(gè)如下所示的 DataFrame 并繼續(xù)操作。歡迎收藏學(xué)習(xí),喜歡點(diǎn)贊支持
    2021-11-11
  • OpenCV+python實(shí)現(xiàn)膨脹和腐蝕的示例

    OpenCV+python實(shí)現(xiàn)膨脹和腐蝕的示例

    這篇文章主要介紹了OpenCV+python實(shí)現(xiàn)膨脹和腐蝕的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12

最新評論