Python并發(fā)多線程的具體操作步驟
一、threading模塊介紹
multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性
二、開(kāi)啟線程的兩種方式
方式一
#方式一 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
方式二
# 方式二 from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('ly') t.start() print('主線程')
三、在一個(gè)進(jìn)程下開(kāi)啟多個(gè)線程與在一個(gè)進(jìn)程下開(kāi)啟多個(gè)子進(jìn)程的區(qū)別
1 誰(shuí)的開(kāi)啟速度快
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': # 在主進(jìn)程下開(kāi)啟線程 t=Thread(target=work) t.start() print('主線程/主進(jìn)程') ''' 打印結(jié)果: hello 主線程/主進(jìn)程 ''' # 在主進(jìn)程下開(kāi)啟子進(jìn)程 t=Process(target=work) t.start() print('主線程/主進(jìn)程') ''' 打印結(jié)果: 主線程/主進(jìn)程 hello '''
2 瞅一瞅pid
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': # part1:在主進(jìn)程下開(kāi)啟多個(gè)線程,每個(gè)線程都跟主進(jìn)程的pid一樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進(jìn)程pid',os.getpid()) # part2:開(kāi)多個(gè)進(jìn)程,每個(gè)進(jìn)程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進(jìn)程pid',os.getpid())
3 同一進(jìn)程內(nèi)的線程共享該進(jìn)程的數(shù)據(jù)?
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫無(wú)疑問(wèn)子進(jìn)程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進(jìn)程的n仍然為100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看結(jié)果為0,因?yàn)橥贿M(jìn)程內(nèi)的線程之間共享進(jìn)程內(nèi)的數(shù)據(jù)
四、練習(xí)
練習(xí)一:
多線程并發(fā)的socket服務(wù)端
# -*- coding: UTF-8 -*- #!/usr/bin/env python3 import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
客戶端
# -*- coding: UTF-8 -*- #!/usr/bin/env python3 import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
練習(xí)二:三個(gè)任務(wù),一個(gè)接收用戶輸入,一個(gè)將用戶輸入的內(nèi)容格式化成大寫(xiě),一個(gè)將格式化后的結(jié)果存入文件
from threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open('db.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start()
五、線程相關(guān)的其他方法
Thread實(shí)例對(duì)象的方法 # isAlive(): 返回線程是否活動(dòng)的。 # getName(): 返回線程名。 # setName(): 設(shè)置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當(dāng)前的線程變量。 # threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。 # threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。 from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進(jìn)程下開(kāi)啟線程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程 print(threading.enumerate()) #連同主線程在內(nèi)有兩個(gè)運(yùn)行的線程 print(threading.active_count()) print('主線程/主進(jìn)程') ''' 打印結(jié)果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進(jìn)程 Thread-1 '''
主線程等待子線程結(jié)束
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('ly',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' ly say hello 主線程 False '''
六、守護(hù)進(jìn)程
無(wú)論是進(jìn)程還是線程,都遵循:守護(hù)xxx會(huì)等待主xxx運(yùn)行完畢后被銷(xiāo)毀
需要強(qiáng)調(diào)的是:運(yùn)行完畢并非終止運(yùn)行
1.對(duì)主進(jìn)程來(lái)說(shuō),運(yùn)行完畢指的是主進(jìn)程代碼運(yùn)行完畢
2.對(duì)主線程來(lái)說(shuō),運(yùn)行完畢指的是主線程所在的進(jìn)程內(nèi)所有非守護(hù)線程統(tǒng)統(tǒng)運(yùn)行完畢,主線程才算運(yùn)行完畢
詳細(xì)解釋:
#1 主進(jìn)程在其代碼結(jié)束后就已經(jīng)算運(yùn)行完畢了(守護(hù)進(jìn)程在此時(shí)就被回收),然后主進(jìn)程會(huì)一直等非守護(hù)的子進(jìn)程都運(yùn)行完畢后回收子進(jìn)程的資源(否則會(huì)產(chǎn)生僵尸進(jìn)程),才會(huì)結(jié)束, #2 主線程在其他非守護(hù)線程運(yùn)行完畢后才算運(yùn)行完畢(守護(hù)線程在此時(shí)就被回收)。因?yàn)橹骶€程的結(jié)束意味著進(jìn)程的結(jié)束,進(jìn)程整體的資源都將被回收,而進(jìn)程必須保證非守護(hù)線程都運(yùn)行完畢后才能結(jié)束。 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('ly',)) t.setDaemon(True) #必須在t.start()之前設(shè)置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
迷惑人的例子
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
七、Python GIL(Global Interpreter Lock)
1、介紹
''' 定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) ''' 結(jié)論:在Cpython解釋器中,同一個(gè)進(jìn)程下開(kāi)啟的多線程,同一時(shí)刻只能有一個(gè)線程執(zhí)行,無(wú)法利用多核優(yōu)勢(shì)
首先需要明確的一點(diǎn)是 GIL 并不是 Python 的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。就好比 C++ 是一套語(yǔ)言(語(yǔ)法)標(biāo)準(zhǔn),但是可以用不同的編譯器來(lái)編譯成可執(zhí)行代碼。有名的編譯器,例如:GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過(guò)CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來(lái)執(zhí)行。像其中的JPython就沒(méi)有 GIL 。然而因?yàn)?CPython 是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里 CPython 就是 Python ,也就想當(dāng)然的把 GIL歸結(jié)為Python語(yǔ)言的缺陷。所以這里要先明確一點(diǎn): GIL 并不是 Python 的特性,Python完全可以不依賴于GIL。
2、GIL介紹
GIL本質(zhì)就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質(zhì)都一樣,都是將并發(fā)運(yùn)行變成串行,以此來(lái)控制同一時(shí)間內(nèi)共享數(shù)據(jù)只能被一個(gè)任務(wù)所修改,進(jìn)而保證數(shù)據(jù)安全。
可以肯定的一點(diǎn)是:保護(hù)不同的數(shù)據(jù)的安全,就應(yīng)該加不同的鎖。
要想了解GIL,首先確定一點(diǎn):每次執(zhí)行python程序,都會(huì)產(chǎn)生一個(gè)獨(dú)立的進(jìn)程。例如python test.py,python aaa.py,python bbb.py會(huì)產(chǎn)生3個(gè)不同的python進(jìn)程
''' # 驗(yàn)證python test.py只會(huì)產(chǎn)生一個(gè)進(jìn)程 # test.py內(nèi)容 import os,time print(os.getpid()) time.sleep(1000) ''' python3 test.py # 在windows下 tasklist |findstr python # 在linux下 ps aux |grep python
在一個(gè)python的進(jìn)程內(nèi),不僅有test.py的主線程或者由該主線程開(kāi)啟的其他線程,還有解釋器開(kāi)啟的垃圾回收等解釋器級(jí)別的線程,總之,所有線程都運(yùn)行在這一個(gè)進(jìn)程內(nèi),毫無(wú)疑問(wèn)
#1 所有數(shù)據(jù)都是共享的,這其中,代碼作為一種數(shù)據(jù)也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼)
例如:test.py定義一個(gè)函數(shù)work(代碼內(nèi)容如下圖),在進(jìn)程內(nèi)所有線程都能訪問(wèn)到work的代碼,于是我們可以開(kāi)啟三個(gè)線程然后target都指向該代碼,能訪問(wèn)到意味著就是可以執(zhí)行。
#2 所有線程的任務(wù),都需要將任務(wù)的代碼當(dāng)做參數(shù)傳給解釋器的代碼去執(zhí)行,即所有的線程要想運(yùn)行自己的任務(wù),首先需要解決的是能夠訪問(wèn)到解釋器的代碼。
綜上:
如果多個(gè)線程的target=work,那么執(zhí)行流程是,多個(gè)線程先訪問(wèn)到解釋器的代碼,即拿到執(zhí)行權(quán)限,然后將target的代碼交給解釋器的代碼去執(zhí)行
解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問(wèn)到解釋器的代碼而去執(zhí)行,這就導(dǎo)致了一個(gè)問(wèn)題:對(duì)于同一個(gè)數(shù)據(jù)100,可能線程1執(zhí)行x=100的同時(shí),而垃圾回收?qǐng)?zhí)行的是回收100的操作,解決這種問(wèn)題沒(méi)有什么高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時(shí)間只能執(zhí)行一個(gè)任務(wù)的代碼
3、GIL與Lock
GIL保護(hù)的是解釋器級(jí)的數(shù)據(jù),保護(hù)用戶自己的數(shù)據(jù)則需要自己加鎖處理,如下圖
4、GIL與多線程
有了GIL的存在,同一時(shí)刻同一進(jìn)程中只有一個(gè)線程被執(zhí)行
聽(tīng)到這里,有的同學(xué)立馬質(zhì)問(wèn):進(jìn)程可以利用多核,但是開(kāi)銷(xiāo)大,而python的多線程開(kāi)銷(xiāo)小,但卻無(wú)法利用多核優(yōu)勢(shì),也就是說(shuō)python沒(méi)用了,php才是最牛逼的語(yǔ)言?
要解決這個(gè)問(wèn)題,我們需要在幾個(gè)點(diǎn)上達(dá)成一致:
#1. cpu到底是用來(lái)做計(jì)算的,還是用來(lái)做I/O的?
#2. 多cpu,意味著可以有多個(gè)核并行完成計(jì)算,所以多核提升的是計(jì)算性能#3. 每個(gè)cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對(duì)I/O操作沒(méi)什么用處
一個(gè)工人相當(dāng)于cpu,此時(shí)計(jì)算相當(dāng)于工人在干活,I/O阻塞相當(dāng)于為工人干活提供所需原材料的過(guò)程,工人干活的過(guò)程中如果沒(méi)有原材料了,則工人干活的過(guò)程需要停止,直到等待原材料的到來(lái)。
如果你的工廠干的大多數(shù)任務(wù)都要有準(zhǔn)備原材料的過(guò)程(I/O密集型),那么你有再多的工人,意義也不大,還不如一個(gè)人,在等材料的過(guò)程中讓工人去干別的活,反過(guò)來(lái)講,如果你的工廠原材料都齊全,那當(dāng)然是工人越多,效率越高
結(jié)論:
對(duì)計(jì)算來(lái)說(shuō),cpu越多越好,但是對(duì)于I/O來(lái)說(shuō),再多的cpu也沒(méi)用
當(dāng)然對(duì)運(yùn)行一個(gè)程序來(lái)說(shuō),隨著cpu的增多執(zhí)行效率肯定會(huì)有所提高(不管提高幅度多大,總會(huì)有所提高),這是因?yàn)橐粋€(gè)程序基本上不會(huì)是純計(jì)算或者純I/O,所以我們只能相對(duì)的去看一個(gè)程序到底是計(jì)算密集型還是I/O密集型,從而進(jìn)一步分析python的多線程到底有無(wú)用武之地
# 分析:我們有四個(gè)任務(wù)需要處理,處理方式肯定是要玩出并發(fā)的效果,解決方案可以是:
方案一:開(kāi)啟四個(gè)進(jìn)程
方案二:一個(gè)進(jìn)程下,開(kāi)啟四個(gè)線程
# 單核情況下,分析結(jié)果:如果四個(gè)任務(wù)是計(jì)算密集型,沒(méi)有多核來(lái)并行計(jì)算,方案一徒增了創(chuàng)建進(jìn)程的開(kāi)銷(xiāo),方案二勝
如果四個(gè)任務(wù)是I/O密集型,方案一創(chuàng)建進(jìn)程的開(kāi)銷(xiāo)大,且進(jìn)程的切換速度遠(yuǎn)不如線程,方案二勝
# 多核情況下,分析結(jié)果:如果四個(gè)任務(wù)是計(jì)算密集型,多核意味著并行計(jì)算,在python中一個(gè)進(jìn)程中同一時(shí)刻只有一個(gè)線程執(zhí)行用不上多核,方案一勝
如果四個(gè)任務(wù)是I/O密集型,再多的核也解決不了I/O問(wèn)題,方案二勝
# 結(jié)論:現(xiàn)在的計(jì)算機(jī)基本上都是多核,python對(duì)于計(jì)算密集型的任務(wù)開(kāi)多線程的效率并不能帶來(lái)多大性能上的提升,甚至不如串行(沒(méi)有大量切換),但是,對(duì)于IO密集型的任務(wù)效率還是有顯著提升的。
5、多線程性能測(cè)試
計(jì)算密集型:多進(jìn)程效率高
from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__ == '__main__': l=[] print(os.cpu_count()) # 本機(jī)為4核 start=time.time() for i in range(4): p=Process(target=work) # 耗時(shí)5s多 p=Thread(target=work) # 耗時(shí)18s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
I/O密集型:多線程效率高
from multiprocessing import Process from threading import Thread import threading import os,time def work(): time.sleep(2) print('===>') if __name__ == '__main__': l=[] print(os.cpu_count()) # 本機(jī)為4核 start=time.time() for i in range(400): # p=Process(target=work) # 耗時(shí)12s多,大部分時(shí)間耗費(fèi)在創(chuàng)建進(jìn)程上 p=Thread(target=work) # 耗時(shí)2s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
應(yīng)用:
多線程用于IO密集型,如:socket,爬蟲(chóng),web 多進(jìn)程用于計(jì)算密集型,如:金融分析
6、CPU 和 GIL 必須都具備才可以執(zhí)行代碼
拿到 CPU 權(quán)限 -> 拿到 GIL 解釋器鎖 -> 執(zhí)行代碼
在 Python 3.2 之后 GIL 有了新的實(shí)現(xiàn),目的是為了解決 That GIL Thrashing 問(wèn)題,這是Antoine Pitrou 的功勞
7、GIL 解釋器鎖會(huì)在兩種情況下釋放
1.主動(dòng)釋放
遇到 IO 操作或者分配的 CPU 時(shí)間片到時(shí)間了。
注意,GIL存在的意義在于維護(hù)線程安全,x=10涉及到IO操作,如果也被當(dāng)成普通的IO操作,主動(dòng)交出GIL,那么一定會(huì)出現(xiàn)數(shù)據(jù)不安全問(wèn)題,所以x=10一定是被區(qū)分對(duì)待了。
至于x=10如何實(shí)現(xiàn)的被區(qū)分對(duì)待,這其實(shí)很好理解,任何的io操作都是向操作系統(tǒng)發(fā)送系統(tǒng)調(diào)用,即調(diào)用操作系統(tǒng)的某一接口實(shí)現(xiàn)的,比如變量賦值操作肯定是調(diào)用了一種接口,文件讀寫(xiě)操作肯定也是調(diào)用了一種接口,網(wǎng)絡(luò)io也是調(diào)用了某一種接口,這就給區(qū)分對(duì)待提供了實(shí)現(xiàn)的依據(jù),即變量賦值操作并不屬于主動(dòng)釋放的范疇,這樣GIL在線程安全方面才會(huì)有所作為。
2.被動(dòng)釋放
python3.2之后定義了一個(gè)全局變量
/ Python/ceval.c /* ... static volatile int gil_drop_request = 0;
注意當(dāng)只有一個(gè)線程時(shí),該線程會(huì)一直運(yùn)行,不會(huì)釋放GIL,當(dāng)有多個(gè)線程時(shí)
例如:thead1,thread2
如果thread1一直沒(méi)有主動(dòng)釋放掉GIL,那肯定不會(huì)讓他一直運(yùn)行下去啊,實(shí)際上在thread1運(yùn)行的過(guò)程時(shí),thread2就會(huì)執(zhí)行一個(gè)cv_wait(gil,TIMEOUT)的函數(shù),(默認(rèn)TIMEOUT值為5milliseconds,但是可以修改),一旦到了時(shí)間,就會(huì)將全局變量
gil_drop_request = 1;線程thread1就會(huì)被強(qiáng)制釋放GIL,然后線程thread2開(kāi)始運(yùn)行并返回一個(gè)ack給線程thread1,線程thread1開(kāi)始調(diào)用cv_wait(gil,TIMEOUT)
八、同步鎖
三個(gè)需要注意的點(diǎn):
#1.線程搶的是GIL鎖,GIL鎖相當(dāng)于執(zhí)行權(quán)限,拿到執(zhí)行權(quán)限后才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發(fā)現(xiàn)Lock仍然沒(méi)有被釋放則阻塞,即便是拿到執(zhí)行權(quán)限GIL也要立刻交出來(lái)#2.join是等待所有,即整體串行,而鎖只是鎖住修改共享數(shù)據(jù)的部分,即部分串行,要想保證數(shù)據(jù)安全的根本原理在于讓并發(fā)變成串行,join與互斥鎖都可以實(shí)現(xiàn),毫無(wú)疑問(wèn),互斥鎖的部分串行效率要更高#3. 一定要看本小節(jié)最后的GIL與互斥鎖的經(jīng)典分析
GIL VS Lock
首先我們需要達(dá)成共識(shí):鎖的目的是為了保護(hù)共享的數(shù)據(jù),同一時(shí)間只能有一個(gè)線程來(lái)修改共享的數(shù)據(jù)
然后,我們可以得出結(jié)論:保護(hù)不同的數(shù)據(jù)就應(yīng)該加不同的鎖。
最后,問(wèn)題就很明朗了,GIL 與Lock是兩把鎖,保護(hù)的數(shù)據(jù)不一樣,前者是解釋器級(jí)別的(當(dāng)然保護(hù)的就是解釋器級(jí)別的數(shù)據(jù),比如垃圾回收的數(shù)據(jù)),后者是保護(hù)用戶自己開(kāi)發(fā)的應(yīng)用程序的數(shù)據(jù),很明顯GIL不負(fù)責(zé)這件事,只能用戶自定義加鎖處理,即Lock
過(guò)程分析:所有線程搶的是GIL鎖,或者說(shuō)所有線程搶的是執(zhí)行權(quán)限
線程1搶到GIL鎖,拿到執(zhí)行權(quán)限,開(kāi)始執(zhí)行,然后加了一把Lock,還沒(méi)有執(zhí)行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開(kāi)始執(zhí)行,執(zhí)行過(guò)程中發(fā)現(xiàn)Lock還沒(méi)有被線程1釋放,于是線程2進(jìn)入阻塞,被奪走執(zhí)行權(quán)限,有可能線程1拿到GIL,然后正常執(zhí)行到釋放Lock。。。這就導(dǎo)致了串行運(yùn)行的效果
既然是串行,那我們執(zhí)行
t1.start()
t1.join
t2.start()
t2.join()
這也是串行執(zhí)行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執(zhí)行完,相當(dāng)于鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數(shù)據(jù)的代碼。
詳細(xì)
因?yàn)镻ython解釋器幫你自動(dòng)定期進(jìn)行內(nèi)存回收,你可以理解為python解釋器里有一個(gè)獨(dú)立的線程,每過(guò)一段時(shí)間它起wake up做一次全局輪詢看看哪些內(nèi)存數(shù)據(jù)是可以被清空的,此時(shí)你自己的程序里的線程和py解釋器自己的線程是并發(fā)運(yùn)行的,假設(shè)你的線程刪除了一個(gè)變量,py解釋器的垃圾回收線程在清空這個(gè)變量的過(guò)程中的clearing時(shí)刻,可能一個(gè)其它線程正好又重新給這個(gè)還沒(méi)來(lái)及得清空的內(nèi)存空間賦值了,結(jié)果就有可能新賦值的數(shù)據(jù)被刪除了,為了解決類(lèi)似的問(wèn)題,python解釋器簡(jiǎn)單粗暴的加了鎖,即當(dāng)一個(gè)線程運(yùn)行時(shí),其它人都不能動(dòng),這樣就解決了上述的問(wèn)題, 這可以說(shuō)是Python早期版本的遺留問(wèn)題。 from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結(jié)果可能為99
鎖通常被用來(lái)實(shí)現(xiàn)對(duì)共享資源的同步訪問(wèn)。為每一個(gè)共享資源創(chuàng)建一個(gè)Lock對(duì)象,當(dāng)你需要訪問(wèn)該資源時(shí),調(diào)用acquire方法來(lái)獲取鎖對(duì)象(如果其它線程已經(jīng)獲得了該鎖,則當(dāng)前線程需等待其被釋放),待資源訪問(wèn)完后,再調(diào)用release方法釋放鎖:
import threading R=threading.Lock() R.acquire() ''' 對(duì)公共數(shù)據(jù)的操作 ''' R.release() from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結(jié)果肯定為0,由原來(lái)的并發(fā)執(zhí)行變成串行,犧牲了執(zhí)行效率保證了數(shù)據(jù)安全
GIL鎖與互斥鎖綜合分析(重點(diǎn)?。。。?/p>
分析:
#1. 100個(gè)線程去搶GIL鎖,即搶執(zhí)行權(quán)限
#2. 肯定有一個(gè)線程先搶到GIL(暫且稱為線程1),然后開(kāi)始執(zhí)行,一旦執(zhí)行就會(huì)拿到lock.acquire()
#3. 極有可能線程1還未運(yùn)行完畢,就有另外一個(gè)線程2搶到GIL,然后開(kāi)始運(yùn)行,但線程2發(fā)現(xiàn)互斥鎖lock還未被線程1釋放,于是阻 塞,被迫交出執(zhí)行權(quán)限,即釋放GIL
#4. 直到線程1重新?lián)尩紾IL,開(kāi)始從上次暫停的位置繼續(xù)執(zhí)行,直到正常釋放互斥鎖lock,然后其他的線程再重復(fù)2 3 4的過(guò)程
互斥鎖與join的區(qū)別(重點(diǎn)?。。。?/strong>
# 不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' # 不加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全 from threading import current_thread,Thread,Lock import os,time def task(): # 未加鎖的代碼并發(fā)運(yùn)行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n # 加鎖的代碼串行運(yùn)行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' # 有的同學(xué)可能有疑問(wèn):既然加鎖會(huì)讓運(yùn)行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊 # 沒(méi)錯(cuò):在start之后立刻使用jion,肯定會(huì)將100個(gè)任務(wù)的執(zhí)行變成串行,毫無(wú)疑問(wèn),最終n的結(jié)果也肯定是0,是安全的,但問(wèn)題是 # start后立即join:任務(wù)內(nèi)的所有代碼都是串行執(zhí)行的,而加鎖,只是加鎖的部分即修改共享數(shù)據(jù)的部分是串行的 # 單從保證數(shù)據(jù)安全方面,二者都可以實(shí)現(xiàn),但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時(shí)是多么的恐怖 '''
九、死鎖現(xiàn)象與遞歸鎖
進(jìn)程也有死鎖與遞歸鎖,在進(jìn)程那里忘記說(shuō)了,放到這里一切說(shuō)了額
所謂死鎖: 是指兩個(gè)或兩個(gè)以上的進(jìn)程或線程在執(zhí)行過(guò)程中,因爭(zhēng)奪資源而造成的一種互相等待的現(xiàn)象,若無(wú)外力作用,它們都將無(wú)法推進(jìn)下去。此時(shí)稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程稱為死鎖進(jìn)程,如下就是死鎖
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() ''' Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 然后就卡住,死鎖了 '''
解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請(qǐng)求同一資源,python提供了可重入鎖RLock。
這個(gè)RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會(huì)發(fā)生死鎖:
mutexA=mutexB=threading.RLock() #一個(gè)線程拿到鎖,counter加1,該線程內(nèi)又碰到加鎖的情況,則counter繼續(xù)加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止
十、信號(hào)量Semaphore
同進(jìn)程的一樣,Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器, 每當(dāng)調(diào)用acquire()時(shí)內(nèi)置計(jì)數(shù)器-1; 調(diào)用release() 時(shí)內(nèi)置計(jì)數(shù)器+1; 計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞線程直到其他線程調(diào)用release()。
實(shí)例:(同時(shí)只有5個(gè)線程可以獲得semaphore,即可以限制最大連接數(shù)為5):
from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release() def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
與進(jìn)程池是完全不同的概念,進(jìn)程池Pool(4),最大只能產(chǎn)生4個(gè)進(jìn)程,而且從頭到尾都只是這四個(gè)進(jìn)程,不會(huì)產(chǎn)生新的,而信號(hào)量是產(chǎn)生一堆線程/進(jìn)程
十一、Event
同進(jìn)程的一樣
線程的一個(gè)關(guān)鍵特性是每個(gè)線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測(cè)。如果程序中的其 他線程需要通過(guò)判斷某個(gè)線程的狀態(tài)來(lái)確定自己下一步的操作,這時(shí)線程同步問(wèn)題就會(huì)變得非常棘手。為了解決這些問(wèn)題,我們需要使用threading庫(kù)中的Event對(duì)象。 對(duì)象包含一個(gè)可由線程設(shè)置的信號(hào)標(biāo)志,它允許線程等待某些事件的發(fā)生。在 初始情況下,Event對(duì)象中的信號(hào)標(biāo)志被設(shè)置為假。如果有線程等待一個(gè)Event對(duì)象, 而這個(gè)Event對(duì)象的標(biāo)志為假,那么這個(gè)線程將會(huì)被一直阻塞直至該標(biāo)志為真。一個(gè)線程如果將一個(gè)Event對(duì)象的信號(hào)標(biāo)志設(shè)置為真,它將喚醒所有等待這個(gè)Event對(duì)象的線程。如果一個(gè)線程等待一個(gè)已經(jīng)被設(shè)置為真的Event對(duì)象,那么它將忽略這個(gè)事件, 繼續(xù)執(zhí)行
event.isSet():返回event的狀態(tài)值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設(shè)置event的狀態(tài)值為T(mén)rue,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度; event.clear():恢復(fù)event的狀態(tài)值為False。
例如,有多個(gè)工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務(wù)正常才讓那些工作線程去連接MySQL服務(wù)器,如果連接不成功,都會(huì)去嘗試重新連接。那么我們就可以采用threading.Event機(jī)制來(lái)協(xié)調(diào)各個(gè)工作線程的連接操作
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('鏈接超時(shí)') print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print('<%s>鏈接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
十二、條件Condition(了解)
使得線程等待,只有滿足某條件時(shí),才釋放n個(gè)線程
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
十三、定時(shí)器
定時(shí)器,指定n秒后執(zhí)行某操作
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
驗(yàn)證碼定時(shí)器
from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res='' for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('驗(yàn)證成功',end='\n') self.t.cancel() break if __name__ == '__main__': obj=Code() obj.check()
十四、線程queue
queue隊(duì)列 :使用import queue,用法與進(jìn)程Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
• class queue.Queue(maxsize=0) 先進(jìn)先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(先進(jìn)先出): first second third '''
- class queue.LifoQueue(maxsize=0) 先進(jìn)先出 # last in fisrt out 后進(jìn)先出
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(后進(jìn)先出): third second first '''
- class queue.PriorityQueue(maxsize=0) # 存儲(chǔ)數(shù)據(jù)時(shí)可設(shè)置優(yōu)先級(jí)的隊(duì)列
import queue q=queue.PriorityQueue() #put進(jìn)入一個(gè)元組,元組的第一個(gè)元素是優(yōu)先級(jí)(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級(jí)越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(數(shù)字越小優(yōu)先級(jí)越高,優(yōu)先級(jí)高的優(yōu)先出隊(duì)): (10, 'b') (20, 'a') (30, 'c') '''
其他
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite. 構(gòu)造一個(gè)優(yōu)先級(jí)隊(duì)列,其中maxsize是一個(gè)整數(shù),用于設(shè)置可以放入隊(duì)列的項(xiàng)目數(shù)量的上限.一旦達(dá)到這個(gè)上限,插入就會(huì)阻塞,直到隊(duì)列中有項(xiàng)目被消耗。如果maxsize小于或等于0,則隊(duì)列長(zhǎng)度為無(wú)窮大。 The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data). 首先檢索最低值的條目(最低值的條目是指列表經(jīng)過(guò)排序后取到的索引為0的那個(gè)元素,一般條目是(優(yōu)先級(jí)數(shù)字,數(shù)據(jù))這種元組的形式 exception queue.Empty Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty. 當(dāng)表示非阻塞的get()或get_nowait()在一個(gè)空的隊(duì)列對(duì)象中被調(diào)用時(shí),會(huì)拋出異常 exception queue.Full Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full. 當(dāng)表示非阻塞的put()或put_nowait()在一個(gè)滿的隊(duì)列對(duì)象中被調(diào)用時(shí),會(huì)拋出異常 Queue.qsize() Queue.empty() #return True if empty 當(dāng)隊(duì)列為空返回True Queue.full() # return True if full 當(dāng)隊(duì)列為滿返回True Queue.put(item, block=True, timeout=None) Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case). 將一個(gè)項(xiàng)放入隊(duì)列。如果可選參數(shù)block為true并且timeout為None(默認(rèn)值),則在必要時(shí)阻塞,直到有空閑槽可用。如果參數(shù)timeout是一個(gè)正數(shù),它最多阻塞timeout秒,如果在這段時(shí)間內(nèi)沒(méi)有可用的空閑槽,則會(huì)引發(fā)Full異常。否則(block為false),如果有空閑槽可用,則將一個(gè)項(xiàng)目放入隊(duì)列中,否則引發(fā)Full異常(在這種情況下,timeout被忽略)。 Queue.put_nowait(item) Equivalent to put(item, False). Queue.get(block=True, timeout=None) Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case). 從隊(duì)列中移除并返回一個(gè)項(xiàng)。如果可選參數(shù)block為true并且timeout為None(默認(rèn)值),則在必要時(shí)阻塞,直到有可用的項(xiàng)。如果timeout為正數(shù),則最多阻塞timeout秒,如果在該時(shí)間內(nèi)沒(méi)有可用項(xiàng),則拋出Empty異常。否則(block為false),如果一個(gè)項(xiàng)目可用,則返回那個(gè)項(xiàng)目,否則引發(fā)Empty異常(在這種情況下,timeout被忽略)。 Queue.get_nowait() Equivalent to get(False). Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads. 提供了兩種方法來(lái)支持追蹤進(jìn)入隊(duì)列的任務(wù)是否已被生產(chǎn)者的守護(hù)線程完全處理。 Queue.task_done() Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. 假定先前進(jìn)入隊(duì)列的任務(wù)已完成。并且被隊(duì)列生產(chǎn)者使用。對(duì)于每個(gè)用于獲取任務(wù)的get(),后續(xù)對(duì)task_done()的調(diào)用都會(huì)告訴隊(duì)列任務(wù)的處理已經(jīng)完成。 If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). 如果join()當(dāng)前正被阻塞,它將在所有項(xiàng)都被處理完時(shí)恢復(fù)(這意味著對(duì)于每個(gè)已經(jīng)put()到隊(duì)列中的項(xiàng)都接收到task_done()調(diào)用)。 Raises a ValueError if called more times than there were items placed in the queue. 如果調(diào)用次數(shù)超過(guò)放入隊(duì)列的項(xiàng)數(shù),將引發(fā)ValueError。 Queue.join() 阻塞,直到queue被消費(fèi)完畢
十五、Python標(biāo)準(zhǔn)模塊–concurrent.futures
# 1 介紹 concurrent.futures模塊提供了高度封裝的異步調(diào)用接口 ThreadPoolExecutor:線程池,提供異步調(diào)用 ProcessPoolExecutor: 進(jìn)程池,提供異步調(diào)用 Both implement the same interface, which is defined by the abstract Executor class. # 2 基本方法 # submit(fn, *args, **kwargs) 異步提交任務(wù) # map(func, *iterables, timeout=None, chunksize=1) 取代for循環(huán)submit的操作 # shutdown(wait=True) 相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作 wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù) wait=False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢 但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢 submit和map必須在shutdown之前 # result(timeout=None) 取得結(jié)果 # add_done_callback(fn) 回調(diào)函數(shù)
ProcessPoolExecutor
# 介紹 The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. ProcessPoolExecutor類(lèi)是Executor的子類(lèi),它使用一個(gè)進(jìn)程池來(lái)異步執(zhí)行調(diào)用。ProcessPoolExecutor會(huì)調(diào)用多進(jìn)程模塊,這允許它避開(kāi)全局解釋器鎖,但也意味著只能執(zhí)行和返回可pickle的對(duì)象。 class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. 能夠異步調(diào)用數(shù)量不超過(guò)參數(shù)max_workers的子進(jìn)程,如果max_workers為None或未給出,則默認(rèn)值為機(jī)器上的處理器數(shù)。如果max_workers小于或等于0,則會(huì)拋出異常ValueError # 用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
ThreadPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
ThreadPoolExecutor是Executor的一個(gè)子類(lèi),可以異步調(diào)用線程池里的線程
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
能夠異步調(diào)用數(shù)量不超過(guò)參數(shù)max_workers的子進(jìn)程.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
3.5版本中的變化:如果max_workers為None或者沒(méi)有被指定,它將默認(rèn)為計(jì)算機(jī)的處理器個(gè)數(shù)乘以5,假設(shè)ThreadPoolExecutor(線程池)通常用于重復(fù)I / O操作而不是CPU的計(jì)算,那么它的實(shí)際效率會(huì)低于ProcessPoolExecutor(進(jìn)程池)
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
3.6版本中新增功能:添加了thread_name_prefix參數(shù),允許用戶更方便地控制線程??梢宰远x由線程池創(chuàng)建的線程名,以便于調(diào)試。
#用法
與ProcessPoolExecutor相同
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit 回調(diào)函數(shù) from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進(jìn)程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個(gè)future對(duì)象obj,需要用obj.result()拿到結(jié)果
總結(jié)
到此這篇關(guān)于Python并發(fā)多線程的文章就介紹到這了,更多相關(guān)Python并發(fā)多線程操作內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
在python3中使用shuffle函數(shù)要注意的地方
今天小編就為大家分享一篇在python3中使用shuffle函數(shù)要注意的地方,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-02-02解決Shell執(zhí)行python文件,傳參空格引起的問(wèn)題
今天小編就為大家分享一篇解決Shell執(zhí)行python文件,傳參空格引起的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-10-10Python在字典中獲取帶權(quán)重的隨機(jī)值實(shí)現(xiàn)方式
這篇文章主要介紹了Python在字典中獲取帶權(quán)重的隨機(jī)值,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-11-11Python基于scipy實(shí)現(xiàn)信號(hào)濾波功能
本文將以實(shí)戰(zhàn)的形式基于scipy模塊使用Python實(shí)現(xiàn)簡(jiǎn)單濾波處理。這篇文章主要介紹了Python基于scipy實(shí)現(xiàn)信號(hào)濾波功能,需要的朋友可以參考下2019-05-05python GUI計(jì)算器的實(shí)現(xiàn)
這篇文章主要介紹了python gui計(jì)算器的實(shí)現(xiàn),幫助大家更好的理解和學(xué)習(xí)python gui編程,感興趣的朋友可以了解下2020-10-10利用Python實(shí)現(xiàn)繪制3D愛(ài)心的代碼分享
最近你是否也被李峋的愛(ài)心跳動(dòng)代碼所感動(dòng),心動(dòng)不如行動(dòng),相同的代碼很多,我們今天換一個(gè)玩法!構(gòu)建一個(gè)三維的跳動(dòng)愛(ài)心!嗯!這篇博客本著開(kāi)源的思想!不是說(shuō)誰(shuí)對(duì)浪漫過(guò)敏的2022-11-11在Django的URLconf中使用多個(gè)視圖前綴的方法
這篇文章主要介紹了在Django的URLconf中使用多個(gè)視圖前綴的方法,Django是Python中最為著名的遵循MVC結(jié)構(gòu)的開(kāi)發(fā)框架,需要的朋友可以參考下2015-07-07