python并發(fā)編程之多進(jìn)程、多線程、異步和協(xié)程詳解
最近學(xué)習(xí)python并發(fā),于是對(duì)多進(jìn)程、多線程、異步和協(xié)程做了個(gè)總結(jié)。
一、多線程
多線程就是允許一個(gè)進(jìn)程內(nèi)存在多個(gè)控制權(quán),以便讓多個(gè)函數(shù)同時(shí)處于激活狀態(tài),從而讓多個(gè)函數(shù)的操作同時(shí)運(yùn)行。即使是單CPU的計(jì)算機(jī),也可以通過不停地在不同線程的指令間切換,從而造成多線程同時(shí)運(yùn)行的效果。
多線程相當(dāng)于一個(gè)并發(fā)(concunrrency)系統(tǒng)。并發(fā)系統(tǒng)一般同時(shí)執(zhí)行多個(gè)任務(wù)。如果多個(gè)任務(wù)可以共享資源,特別是同時(shí)寫入某個(gè)變量的時(shí)候,就需要解決同步的問題,比如多線程火車售票系統(tǒng):兩個(gè)指令,一個(gè)指令檢查票是否賣完,另一個(gè)指令,多個(gè)窗口同時(shí)賣票,可能出現(xiàn)賣出不存在的票。
在并發(fā)情況下,指令執(zhí)行的先后順序由內(nèi)核決定。同一個(gè)線程內(nèi)部,指令按照先后順序執(zhí)行,但不同線程之間的指令很難說清除哪一個(gè)會(huì)先執(zhí)行。因此要考慮多線程同步的問題。同步(synchronization)是指在一定的時(shí)間內(nèi)只允許某一個(gè)線程訪問某個(gè)資源。
1、thread模塊
2、threading模塊
threading.Thread 創(chuàng)建一個(gè)線程。
給判斷是否有余票和賣票,加上互斥鎖,這樣就不會(huì)造成一個(gè)線程剛判斷沒有余票,而另外一個(gè)線程就執(zhí)行賣票操作。
#! /usr/bin/python #-* coding: utf-8 -* # __author__ ="tyomcat" import threading import time import os def booth(tid): global i global lock while True: lock.acquire() if i!=0: i=i-1 print "窗口:",tid,",剩余票數(shù):",i time.sleep(1) else: print "Thread_id",tid,"No more tickets" os._exit(0) lock.release() time.sleep(1) i = 100 lock=threading.Lock() for k in range(10): new_thread = threading.Thread(target=booth,args=(k,)) new_thread.start()
二、協(xié)程(又稱微線程,纖程)
協(xié)程,與線程的搶占式調(diào)度不同,它是協(xié)作式調(diào)度。協(xié)程也是單線程,但是它能讓原來要使用異步+回調(diào)方式寫的非人類代碼,可以用看似同步的方式寫出來。
1、協(xié)程在python中可以由生成器(generator)來實(shí)現(xiàn)。
首先要對(duì)生成器和yield有一個(gè)扎實(shí)的理解.
調(diào)用一個(gè)普通的python函數(shù),一般是從函數(shù)的第一行代碼開始執(zhí)行,結(jié)束于return語句、異?;蛘吆瘮?shù)執(zhí)行(也可以認(rèn)為是隱式地返回了None)。
一旦函數(shù)將控制權(quán)交還給調(diào)用者,就意味著全部結(jié)束。而有時(shí)可以創(chuàng)建能產(chǎn)生一個(gè)序列的函數(shù),來“保存自己的工作”,這就是生成器(使用了yield關(guān)鍵字的函數(shù))。
能夠“產(chǎn)生一個(gè)序列”是因?yàn)楹瘮?shù)并沒有像通常意義那樣返回。return隱含的意思是函數(shù)正將執(zhí)行代碼的控制權(quán)返回給函數(shù)被調(diào)用的地方。而"yield"的隱含意思是控制權(quán)的轉(zhuǎn)移是臨時(shí)和自愿的,我們的函數(shù)將來還會(huì)收回控制權(quán)。
看一下生產(chǎn)者/消費(fèi)者的例子:
#! /usr/bin/python #-* coding: utf-8 -* # __author__ ="tyomcat" import time import sys # 生產(chǎn)者 def produce(l): i=0 while 1: if i < 10: l.append(i) yield i i=i+1 time.sleep(1) else: return # 消費(fèi)者 def consume(l): p = produce(l) while 1: try: p.next() while len(l) > 0: print l.pop() except StopIteration: sys.exit(0) if __name__ == "__main__": l = [] consume(l)
當(dāng)程序執(zhí)行到produce的yield i時(shí),返回了一個(gè)generator并暫停執(zhí)行,當(dāng)我們?cè)赾ustom中調(diào)用p.next(),程序又返回到produce的yield i 繼續(xù)執(zhí)行,這樣 l 中又append了元素,然后我們print l.pop(),直到p.next()引發(fā)了StopIteration異常。
2、Stackless Python
3、greenlet模塊
基于greenlet的實(shí)現(xiàn)則性能僅次于Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一個(gè)數(shù)量級(jí)。其實(shí)greenlet不是一種真正的并發(fā)機(jī)制,而是在同一線程內(nèi),在不同函數(shù)的執(zhí)行代碼塊之間切換,實(shí)施“你運(yùn)行一會(huì)、我運(yùn)行一會(huì)”,并且在進(jìn)行切換時(shí)必須指定何時(shí)切換以及切換到哪。
4、eventlet模塊
三、多進(jìn)程
1、子進(jìn)程(subprocess包)
在python中,通過subprocess包,fork一個(gè)子進(jìn)程,并運(yùn)行外部程序。
調(diào)用系統(tǒng)的命令的時(shí)候,最先考慮的os模塊。用os.system()和os.popen()來進(jìn)行操作。但是這兩個(gè)命令過于簡(jiǎn)單,不能完成一些復(fù)雜的操作,如給運(yùn)行的命令提供輸入或者讀取命令的輸出,判斷該命令的運(yùn)行狀態(tài),管理多個(gè)命令的并行等等。這時(shí)subprocess中的Popen命令就能有效的完成我們需要的操作
>>>import subprocess >>>command_line=raw_input() ping -c 10 www.baidu.com >>>args=shlex.split(command_line) >>>p=subprocess.Popen(args)
利用subprocess.PIPE將多個(gè)子進(jìn)程的輸入和輸出連接在一起,構(gòu)成管道(pipe):
import subprocess child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE) child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE) out = child2.communicate() print(out)
communicate() 方法從stdout和stderr中讀出數(shù)據(jù),并輸入到stdin中。
2、多進(jìn)程(multiprocessing包)
(1)、multiprocessing包是Python中的多進(jìn)程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對(duì)象來創(chuàng)建一個(gè)進(jìn)程。
進(jìn)程池 (Process Pool)可以創(chuàng)建多個(gè)進(jìn)程。
apply_async(func,args) 從進(jìn)程池中取出一個(gè)進(jìn)程執(zhí)行func,args為func的參數(shù)。它將返回一個(gè)AsyncResult的對(duì)象,你可以對(duì)該對(duì)象調(diào)用get()方法以獲得結(jié)果。
close() 進(jìn)程池不再創(chuàng)建新的進(jìn)程
join() wait進(jìn)程池中的全部進(jìn)程。必須對(duì)Pool先調(diào)用close()方法才能join。
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ == "tyomcat" # "我的電腦有4個(gè)cpu" from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(4): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
(2)、多進(jìn)程共享資源
通過共享內(nèi)存和Manager對(duì)象:用一個(gè)進(jìn)程作為服務(wù)器,建立Manager來真正存放資源。
其它的進(jìn)程可以通過參數(shù)傳遞或者根據(jù)地址來訪問Manager,建立連接后,操作服務(wù)器上的資源。
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ == "tyomcat" from multiprocessing import Queue,Pool import multiprocessing,time,random def write(q): for value in ['A','B','C','D']: print "Put %s to Queue!" % value q.put(value) time.sleep(random.random()) def read(q,lock): while True: lock.acquire() if not q.empty(): value=q.get(True) print "Get %s from Queue" % value time.sleep(random.random()) else: break lock.release() if __name__ == "__main__": manager=multiprocessing.Manager() q=manager.Queue() p=Pool() lock=manager.Lock() pw=p.apply_async(write,args=(q,)) pr=p.apply_async(read,args=(q,lock)) p.close() p.join() print print "所有數(shù)據(jù)都寫入并且讀完"
四、異步
無論是線程還是進(jìn)程,使用的都是同步進(jìn)制,當(dāng)發(fā)生阻塞時(shí),性能會(huì)大幅度降低,無法充分利用CPU潛力,浪費(fèi)硬件投資,更重要造成軟件模塊的鐵板化,緊耦合,無法切割,不利于日后擴(kuò)展和變化。
不管是進(jìn)程還是線程,每次阻塞、切換都需要陷入系統(tǒng)調(diào)用(system call),先讓CPU跑操作系統(tǒng)的調(diào)度程序,然后再由調(diào)度程序決定該跑哪一個(gè)進(jìn)程(線程)。多個(gè)線程之間在一些訪問互斥的代碼時(shí)還需要加上鎖,
現(xiàn)下流行的異步server都是基于事件驅(qū)動(dòng)的(如nginx)。
異步事件驅(qū)動(dòng)模型中,把會(huì)導(dǎo)致阻塞的操作轉(zhuǎn)化為一個(gè)異步操作,主線程負(fù)責(zé)發(fā)起這個(gè)異步操作,并處理這個(gè)異步操作的結(jié)果。由于所有阻塞的操作都轉(zhuǎn)化為異步操作,理論上主線程的大部分時(shí)間都是在處理實(shí)際的計(jì)算任務(wù),少了多線程的調(diào)度時(shí)間,所以這種模型的性能通常會(huì)比較好。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
django項(xiàng)目中新增app的2種實(shí)現(xiàn)方法
這篇文章主要介紹了django項(xiàng)目中新增app的2種實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-04-04python在回調(diào)函數(shù)中獲取返回值的方法
今天小編就為大家分享一篇python在回調(diào)函數(shù)中獲取返回值的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02python神經(jīng)網(wǎng)絡(luò)tf.name_scope和tf.variable_scope函數(shù)區(qū)別
這篇文章主要為大家介紹了python神經(jīng)網(wǎng)絡(luò)tf.name_scope和tf.variable_scope函數(shù)的使用區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Tensorflow中批量讀取數(shù)據(jù)的案列分析及TFRecord文件的打包與讀取
這篇文章主要介紹了Tensorflow中批量讀取數(shù)據(jù)的案列分析及TFRecord文件的打包與讀取,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06python中sort()和sorted()的區(qū)別及用法實(shí)例
我們通常會(huì)遇到對(duì)數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行排序的問題,下面這篇文章主要給大家介紹了關(guān)于python中sort()和sorted()的區(qū)別及用法的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06