Python并發(fā)編程實例教程之線程的玩法
一、線程基礎以及守護進程
線程是CPU調(diào)度的最小單位
全局解釋器鎖
全局解釋器鎖GIL(global interpreter lock)
全局解釋器鎖的出現(xiàn)主要是為了完成垃圾回收機制的回收機制,對不同線程的引用計數(shù)的變化記錄的更加精準。
全局解釋器鎖導致了同一個進程中的多個線程只能有一個線程真正被CPU執(zhí)行。
GIL鎖每執(zhí)行700條指令才會進行一次(輪轉(zhuǎn))切換(從一個線程切換到另外一個線程)
節(jié)省的是IO操作(不占用CPU)的時間,而不是CPU計算的時間,因為CPU的計算速度非常快,大多數(shù)情況下,我們沒有辦法把一條進程中所有的IO操作都規(guī)避掉。
threading模塊
import time from threading import Thread, current_thread, enumerate, active_count def func(i): print('start%s' % i, current_thread().ident) # 函數(shù)中獲取當前線程id time.sleep(1) print('end%s' % i) if __name__ == '__main__': t1 = [] for i in range(3): t = Thread(target=func, args=(i,)) t.start() print(t.ident) # 查看當前線程id t1.append(t) print(enumerate(), active_count()) for t in t1: t.join() print('所有線程執(zhí)行完畢')
線程是不能從外部強制終止(terminate),所有的子線程只能是自己執(zhí)行完代碼之后就關閉。
current_thread 獲取當前的線程對象
current_thread().ident 或者 線程對象.ident 獲取當前線程id。
enumerate返回一個列表,存儲了所有活著的線程對象,包括主線程。
active_count返回一個數(shù)字,存儲了所有活著的線程個數(shù)。
【注意】enumerate導入之后,會和內(nèi)置函數(shù)enumerate重名,需要做特殊的處理
- from threading import enumerate as en
- import threading
threading.enumerate()
面向?qū)ο蠓绞介_啟一個線程
from threading import Thread class MyThread(Thread): def __init__(self, a, b): super(MyThread, self).__init__() self.a = a self.b = b def run(self): print(self.ident) t = MyThread(1, 3) t.start() # 開啟線程,才在線程中執(zhí)行run方法 print(t.ident)
線程之間的數(shù)據(jù)是共享的
from threading import Thread n = 100 def func(): global n n -= 1 t_li = [] for i in range(100): t = Thread(target=func) t.start() t_li.append(t) for t in t_li: t.join() print(n)
結(jié)果是:0
守護線程
- 主線程會等待子線程結(jié)束之后才結(jié)束,為什么?
因為主線程結(jié)束,進程就會結(jié)束。
- 守護線程隨著主線程的結(jié)束而結(jié)束
- 守護進程會隨著主進程的代碼結(jié)束而結(jié)束,如果主進程代碼之后還有其他子進程在運行,守護進程不守護。
- 守護線程會隨著主線程的結(jié)束而結(jié)束,如果主線程代碼結(jié)束之后還有其他子線程在運行,守護線程也守護。
import time from threading import Thread def son(): while True: print('in son') time.sleep(1) def son2(): for i in range(3): print('in son2...') time.sleep(1) # flag a t = Thread(target=son) t.daemon = True t.start() # flag b a-->b用時0s Thread(target=son2).start()
為什么守護線程會在主線程的代碼結(jié)束之后繼續(xù)守護其他子線程?
答:因為守護進程和守護線程的結(jié)束原理不同。守護進程需要主進程來回收資源,守護線程是隨著主線程的結(jié)束而結(jié)束,其他子線程–>主線程結(jié)束–>主進程結(jié)束–>整個進程中所有的資源都被回收,守護線程也會被回收。
二、線程鎖(互斥鎖)
線程之間也存在數(shù)據(jù)不安全
import dis a = 0 def func(): global a a += 1 dis.dis(func) # 得到func方法中的代碼翻譯成CPU指令 """ 結(jié)果 0 LOAD_GLOBAL 0 (a) 2 LOAD_CONST 1 (1) 4 INPLACE_ADD 6 STORE_GLOBAL 0 (a) 8 LOAD_CONST 0 (None) 10 RETURN_VALUE """
+=、-=、*=、/=、while、if、帶返回值的方法(都是先計算后賦值,前提要涉及到全局變量或靜態(tài)變量) 等都是數(shù)據(jù)不安全的,append、pop、queue、logging模塊等都是數(shù)據(jù)安全的。
列表中的方法或者字典中的方法去操作全局變量的時候,數(shù)據(jù)是安全的。
只有一個線程,永遠不會出現(xiàn)線程不安全現(xiàn)象。
采用加鎖的方式來保證數(shù)據(jù)安全。
from threading import Thread, Lock n = 0 def add(lock): for i in range(500000): global n with lock: n += 1 def sub(lock): for i in range(500000): global n with lock: n -= 1 t_li = [] lock = Lock() for i in range(2): t1 = Thread(target=add, args=(lock,)) t1.start() t2 = Thread(target=sub, args=(lock,)) t2.start() t_li.append(t1) t_li.append(t2) for t in t_li: t.join() print(n)
線程安全的單例模式
import time from threading import Thread, Lock class A: __instance = None lock = Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(0.00001) cls.__instance = super().__new__(cls) return cls.__instance def func(): a = A() print(a) for i in range(10): Thread(target=func).start()
不用考慮加鎖的小技巧
- 不要操作全局變量
- 不要在類中操作靜態(tài)變量
因為多個線程同時操作全局變量/靜態(tài)變量,會產(chǎn)生數(shù)據(jù)不安全現(xiàn)象。
三、線程鎖(遞歸鎖)
from threading import Lock, RLock # Lock 互斥鎖 # RLock 遞歸(recursion)鎖 l = Lock() l.acquire() print('希望被鎖住的代碼') l.release() rl = RLock() # 在同一個線程中可以被acquire多次 rl.acquire() rl.acquire() rl.acquire() print('希望被鎖住的代碼') rl.release()
from threading import Thread, RLock def func(i, lock): lock.acquire() lock.acquire() print(i, ':start') lock.release() lock.release() print(i, ':end') lock = RLock() for i in range(5): Thread(target=func, args=(i, lock)).start()
互斥鎖與遞歸鎖
遞歸鎖在同一個線程中可以被acquire多次,而互斥鎖不行
互斥鎖效率高,遞歸鎖效率相對低
多把互斥鎖容易產(chǎn)生死鎖現(xiàn)象,遞歸鎖可以快速解決死鎖
四、死鎖
死鎖:指兩個或兩個以上的進程或線程在執(zhí)行過程中,因爭奪資源而造成的一種互相等待的現(xiàn)象。
死鎖現(xiàn)象是怎么產(chǎn)生的?
答:有多把鎖,并且在多個線程中交叉使用。與互斥鎖、遞歸鎖無關,都會發(fā)生死鎖。如果是互斥鎖,出現(xiàn)了死鎖現(xiàn)象,最快速的解決方案是把所有的互斥鎖都改成一把遞歸鎖(noodle_lock = fork_lock = RLock()),程序的效率會降低。
from threading import Thread, Lock import time noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print(name, '搶到面了') fork_lock.acquire() print(name, '搶到叉子了') print(name, '吃面') time.sleep(0.0001) fork_lock.release() print(name, '放下叉子了') noodle_lock.release() print(name, '放下面了') def eat2(name): fork_lock.acquire() print(name, '搶到叉子了') noodle_lock.acquire() print(name, '搶到面了') print(name, '吃面') noodle_lock.release() print(name, '放下面了') fork_lock.release() print(name, '放下叉子了') Thread(target=eat1, args=('lucy',)).start() Thread(target=eat2, args=('jack',)).start() Thread(target=eat1, args=('rose',)).start() Thread(target=eat2, args=('disen',)).start()
五、隊列
隊列:線程之間數(shù)據(jù)安全的容器
線程隊列:數(shù)據(jù)安全,先進先出
原理:加鎖 + 鏈表
Queue
fifo 先進先出的隊列
get和put
import queue q = queue.Queue(3) # fifo 先進先出的隊列 q.put(1) q.put(2) print(q.get()) print(q.get())
1
2
get_nowait
import queue # from queue import Empty # 不是內(nèi)置的錯誤類型,而是queue模塊中的錯誤 q = queue.Queue() # fifo 先進先出的隊列 try: q.get_nowait() except queue.Empty: pass print('隊列為空,繼續(xù)執(zhí)行其他代碼')
put_nowait
用的很少,因為隊列滿時,拋異常,數(shù)據(jù)放不進去,丟失了。
LifoQueue
后進先出的隊列,也就是棧。last in first out
from queue import LifoQueue lq = LifoQueue() lq.put(1) lq.put(2) print(lq.get()) print(lq.get())
2
1
PriorityQueue
優(yōu)先級隊列,按照放入數(shù)據(jù)的第一位數(shù)值從小到大輸出
from queue import PriorityQueue priq = PriorityQueue() priq.put((2, 'lucy')) priq.put((0, 'rose')) priq.put((1, 'jack')) print(priq.get()) print(priq.get()) print(priq.get())
(0, 'rose')
(1, 'jack')
(2, 'lucy')
三種隊列使用場景
先進先出:用于處理服務類任務(買票任務)
后進先出:算法中用的比較多
優(yōu)先級隊列:比如,VIP制度,VIP用戶優(yōu)先;
六、相關面試題
請聊聊進程隊列的特點和實現(xiàn)原理
特點:實現(xiàn)進程之間的通信;數(shù)據(jù)安全;先進先出。
實現(xiàn)原理:基于管道 + 鎖 實現(xiàn)的,管道是基于文件級別的socket + pickle 實現(xiàn)的。
你了解生產(chǎn)者消費者模型嗎,如何實現(xiàn)
了解
為什么了解?工作經(jīng)驗
采集圖片/爬取音樂:由于要爬取大量的數(shù)據(jù),想提高爬取效率
有用過一個生產(chǎn)者消費者模型,這個模型是我自己寫的,消息中間件,用的是xxx(redis),我獲取網(wǎng)頁的過程作為生產(chǎn)者,分析網(wǎng)頁,獲取所有歌曲歌曲鏈接的過程作為消費者。
自己寫監(jiān)控,或者是自己寫郵件報警系統(tǒng),監(jiān)控程序作為生產(chǎn)者,一旦發(fā)現(xiàn)有問題的程序,就需要把要發(fā)送的郵件信息交給消息中間件redis,消費者就從中間件中取值,然后來處理發(fā)郵件的邏輯。
什么時候用過?
項目 或者 例子,結(jié)合上面一起
在python中實現(xiàn)生產(chǎn)者消費者模型可以用哪些機制
消息中間件
celery(分布式框架):定時發(fā)短信的任務
從你的角度說說進程在計算機中扮演什么角色
進程用來管理一個運行中的程序的資源,是資源分配的最小單位
進程與進程之間內(nèi)存是隔離的
進程是由操作系統(tǒng)負責調(diào)度的,并且多個進程之間是一種競爭關系,所以我們應該對進程的三狀態(tài)時刻關注,盡量減少進程中的IO操作,或者在進程里面開線程來規(guī)避IO,讓我們寫的程序在運行的時候能夠更多的占用CPU資源。
為什么線程之間的數(shù)據(jù)不安全
線程之間數(shù)據(jù)共享
多線程的情況下,
如果在計算某一個變量的時候,還要進行賦值操作,這個過程不是由一條完整的CPU指令完成的;
如果在判斷某個bool表達式之后,再做某些操作,這個過程也不是由一條完整的CPU指令完成的;
在中間發(fā)生了GIL鎖的切換(時間片的輪轉(zhuǎn)),可能會導致數(shù)據(jù)不安全。
讀程序,請確認執(zhí)行到最后number的長度是否一定為 1
import threading import time # loop = 1E7 # 10000000. loop = int(1E7) # 10000000 def _add(loop: int = 1): global numbers for _ in range(loop): numbers.append(0) def _sub(loop: int = 1): global numbers for _ in range(loop): while not numbers: time.sleep(1E-8) numbers.pop() numbers = [0] ta = threading.Thread(target=_add, args=(loop,)) ts = threading.Thread(target=_sub, args=(loop,)) # ts1 = threading.Thread(target=_sub, args=(loop,)) ta.start() ts.start() # ts1.start() ta.join() ts.join() # ts1.join()
因為只開啟了一個進行pop操作的線程,如果開啟多個pop操作的線程,必須在while前面加鎖,因為可能有兩個線程,一個執(zhí)行了while not numbers,發(fā)生了GIL的切換,另外一個線程執(zhí)行完了代碼,numbers剛好沒有了數(shù)據(jù),導致結(jié)果一個pop成功,一個pop不成功。
所以number長度一定為1,如果把注釋去了,不一定為1
讀程序,請確認執(zhí)行到最后number的長度是否一定為 1
import threading import time loop = int(1E7) def _add(loop: int = 1): global numbers for _ in range(loop): numbers.append(0) def _sub(loop: int = 1): global numbers for _ in range(loop): while not numbers: time.sleep(1E-8) numbers.pop() numbers = [0] ta = threading.Thread(target=_add, args=(loop,)) ts = threading.Thread(target=_sub, args=(loop,)) ta.start() ta.join() ts.start() ts.join()
一定為1,因為是同步的。
讀程序,請確認執(zhí)行到最后number是否一定為 0
import threading loop = int(1E7) def _add(loop: int = 1): global numbers for _ in range(loop): numbers += 1 def _sub(loop: int = 1): global numbers for _ in range(loop): numbers -= 1 numbers = 0 ta = threading.Thread(target=_add, args=(loop,)) ts = threading.Thread(target=_sub, args=(loop,)) ta.start() ta.join() ts.start() ts.join()
一定等于0,因為是同步的。
讀程序,請確認執(zhí)行到最后number是否一定為 0
import threading loop = int(1E7) def _add(loop: int = 1): global numbers for _ in range(loop): numbers += 1 def _sub(loop: int = 1): global numbers for _ in range(loop): numbers -= 1 numbers = 0 ta = threading.Thread(target=_add, args=(loop,)) ts = threading.Thread(target=_sub, args=(loop,)) ta.start() ts.start() ta.join() ts.join()
不一定為0,因為是異步的且存在 += 操作
七、判斷數(shù)據(jù)是否安全
是否數(shù)據(jù)共享,是同步還是異步(數(shù)據(jù)共享并且異步的情況下)
- +=、-=、*=、/=、a = 計算之后賦值給變量
- if、while 條件,這兩個判斷是由多個線程完成的
這兩種情況下,數(shù)據(jù)不安全。
八、進程池 & 線程池
以前,有多少個任務就開多少個進程或線程。
什么是池
要在程序開始的時候,還沒有提交任務,先創(chuàng)建幾個線程或者進程,放在一個池子里,這就是池
為什么要用池
如果先開好進程/線程,那么有任務之后就可以直接使用這個池中的數(shù)據(jù)了;并且開好的進程/線程會一直存在在池中,可以被多個任務反復利用,這樣極大的減少了開啟/關閉/調(diào)度進程/調(diào)度線程的時間開銷。
池中的線程/進程個數(shù)控制了操作系統(tǒng)需要調(diào)用的任務個數(shù),控制池中的單位,有利于提高操作系統(tǒng)的效率,減輕操作系統(tǒng)的負擔。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # threading模塊 沒有提供池 # multiprocessing模塊 仿照threading增加了Pool(逐漸被淘汰) # concurrent.futures模塊 線程池,進程池都能夠用相似的方式開啟/使用 ThreadPoolExecutor() # 參數(shù)代表開啟多少個線程,線程的個數(shù)一般起cpu個數(shù)*4(或者*5) ProcessPoolExecutor() # 參數(shù)代表開啟多少個進程,進程的個數(shù)一般起cpu個數(shù)+1
創(chuàng)建線程池并提交任務
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time def func(a, b): print(current_thread().ident, a, b) time.sleep(1) tp = ThreadPoolExecutor(4) # 創(chuàng)建線程池對象 for i in range(20): # tp.submit(func, i, i + 1) # 向池中提交任務 tp.submit(func, a=i, b=i + 1) # 位置傳參,關鍵字傳參都可以
創(chuàng)建進程池并提交任務
from concurrent.futures import ProcessPoolExecutor import os import time def func(a, b): print(os.getpid(), 'start', a, b) time.sleep(1) print(os.getpid(), 'end', a, b) if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 創(chuàng)建進程池對象 for i in range(20): # tp.submit(func, i, i + 1) # 向池中提交任務 tp.submit(func, a=i, b=i + 1) # 位置傳參,關鍵字傳參都可以
獲取任務結(jié)果
from concurrent.futures import ProcessPoolExecutor import os import time def func(a, b): print(os.getpid(), 'start', a, b) time.sleep(1) print(os.getpid(), 'end', a, b) return a * b if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 創(chuàng)建進程池對象 future_d = {} for i in range(20): # 異步非阻塞的 ret = tp.submit(func, a=i, b=i + 1) # future未來對象 # print(ret) # <Future at 0x1ad918e1148 state=running> # print(ret.result()) # 這樣需要等待,同步的 future_d[i] = ret for key in future_d: # 同步阻塞的 print(key, future_d[key].result())
tp對象的map
map 只適合傳遞簡單的參數(shù),并且必須是一個可迭代的類型
from concurrent.futures import ProcessPoolExecutor import os import time def func(a): print(os.getpid(), 'start', a[0], a[1]) time.sleep(1) print(os.getpid(), 'end', a[0], a[1]) return a[0] * a[1] if __name__ == '__main__': tp = ProcessPoolExecutor(4) ret = tp.map(func, ((i, i + 1) for i in range(20))) # 一般函數(shù)只接收一個參數(shù),要想傳入多個,使用元組方式 for r in ret: print(r)
回調(diào)函數(shù)
當有一個結(jié)果需要進行處理時,都會綁定一個回調(diào)函數(shù)來處理,除非是得到所有結(jié)果之后才做處理,我們使用 把結(jié)果存入列表 遍歷列表 的方式。
回調(diào)函數(shù)效率最高的。
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time def func(a, b): print(current_thread().ident, 'start', a, b) time.sleep(1) print(current_thread().ident, 'end', a) return a * b if __name__ == '__main__': tp = ThreadPoolExecutor(4) future_d = {} for i in range(20): # 異步非阻塞的 ret = tp.submit(func, a=i, b=i + 1) future_d[i] = ret for key in future_d: # 同步阻塞的 print(key, future_d[key].result())
上述代碼,打印結(jié)果是按照順序(0,1,2,3……),并不是誰先結(jié)束就打印誰。
使用回調(diào)函數(shù)以后,誰先執(zhí)行完就打印誰,代碼如下:
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time def func(a, b): print(current_thread().ident, 'start', a, b) time.sleep(1) print(current_thread().ident, 'end', a) return a, a * b def print_func(ret): # 異步阻塞 每個任務都是各自阻塞各自,誰先執(zhí)行完誰先打印 print(ret.result()) if __name__ == '__main__': tp = ThreadPoolExecutor(4) for i in range(20): # 異步非阻塞的 ret = tp.submit(func, a=i, b=i + 1) # [ret0, ret1, ..., ret19] ret.add_done_callback(print_func) # 異步阻塞 [print_func, print_func,...,print_func] # 回調(diào)機制 # 回調(diào)函數(shù) 給ret對象綁定一個回調(diào)函數(shù),等待ret對應的任務有了結(jié)果之后立即調(diào)用print_func函數(shù) # 就可以對結(jié)果立即進行處理,而不用按照順序接收結(jié)果處理結(jié)果
ret這個任務會在執(zhí)行完畢的瞬間立即觸發(fā)print_func函數(shù),并且把任務的返回值對象傳遞到print_func做參數(shù)。
回調(diào)函數(shù)的例子
from concurrent.futures import ThreadPoolExecutor import requests def get_page(url): # 訪問網(wǎng)頁,獲取網(wǎng)頁源代碼,用線程池中的線程來操作 respone = requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.text} def parse_page(res): # 獲取到字典結(jié)果之后,計算網(wǎng)頁源代碼的長度,把'https://www.baidu.com : 長度值'寫到文件里,線程任務執(zhí)行完畢之后綁定回調(diào)函數(shù) res = res.result() parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.tencent.com/zh-cn', 'http://www.sina.com.cn/' ] tp = ThreadPoolExecutor(4) for url in urls: ret = tp.submit(get_page, url) ret.add_done_callback(parse_page) # 誰先回來誰就先把結(jié)果寫進文件 # 不用回調(diào)函數(shù): # 按照順序獲取網(wǎng)頁,baidu python openstack tencent sina # 也只能按照順序?qū)? # 用上了回調(diào)函數(shù) # 按照順序獲取網(wǎng)頁,baidu python openstack tencent sina # 哪一個網(wǎng)頁先返回結(jié)果,就先執(zhí)行哪個網(wǎng)頁對應的回調(diào)函數(shù)(parse_page)
進程池線程池的應用場景
進程池:
場景:高計算的場景,沒有IO操作(沒有文件操作,沒有數(shù)據(jù)庫操作,沒有網(wǎng)絡操作,沒有input);
進程的個數(shù):[cpu_count*1, cpu_count*2]
線程池:
場景:爬蟲
線程的個數(shù):一般根據(jù)IO的比例定制,cpu_count*5
總結(jié)
到此這篇關于Python并發(fā)編程實例教程之線程的文章就介紹到這了,更多相關Python并發(fā)編程線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python pydotplus安裝及可視化圖形創(chuàng)建教程
這篇文章主要為大家介紹了Python pydotplus安裝及可視化圖形創(chuàng)建教程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10PyCharm安裝配置Qt Designer+PyUIC圖文教程
這篇文章主要介紹了PyCharm安裝配置Qt Designer+PyUIC圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05django 框架實現(xiàn)的用戶注冊、登錄、退出功能示例
這篇文章主要介紹了django 框架實現(xiàn)的用戶注冊、登錄、退出功能,結(jié)合實例形式詳細分析了Django框架用戶注冊、登陸、退出等功能具體實現(xiàn)方法及操作注意事項,需要的朋友可以參考下2019-11-11Python實現(xiàn)蟻群優(yōu)化算法的示例代碼
蟻群算法是一種源于大自然生物世界的新的仿生進化算法,本文主要介紹了Python如何實現(xiàn)蟻群算法,文中通過示例代碼具有一定的參考價值,感興趣的小伙伴們可以了解一下2023-08-08