詳解python中的線程與線程池
線程
進程和線程
什么是進程?
進程就是正在運行的程序, 一個任務(wù)就是一個進程, 進程的主要工作是管理資源, 而不是實現(xiàn)功能
什么是線程?
線程的主要工作是去實現(xiàn)功能, 比如執(zhí)行計算.
線程和進程的關(guān)系就像員工與老板的關(guān)系,
老板(進程) 提供資源 和 工作空間,
員工(線程) 負責(zé)去完成相應(yīng)的任務(wù)
特點
一個進程至少由一個線程, 這一個必須存在的線程被稱為主線程, 同時一個進程也可以有多個線程, 即多線程
當(dāng)我們我們遇到一些需要重復(fù)執(zhí)行的代碼時, 就可以使用多線程分擔(dān)一些任務(wù), 進而加快運行速度
線程的實現(xiàn)
線程模塊
Python通過兩個標(biāo)準(zhǔn)庫_thread和threading, 提供對線程的支持 , threading對_thread進行了封裝。
threading模塊中提供了Thread , Lock , RLock , Condition等組件。
因此在實際的使用中我們一般都是使用threading來實現(xiàn)多線程
線程包括子線程和主線程:
主線程 : 當(dāng)一個程序啟動時 , 就有一個線程開始運行 , 該線程通常叫做程序的主線程
子線程 : 因為程序是開始時就執(zhí)行的 , 如果你需要再創(chuàng)建線程 , 那么創(chuàng)建的線程就是這個主線程的子線程
主線程的重要性體現(xiàn)在兩方面 :
- 是產(chǎn)生其他子線程的線程
- 通常它必須最后完成執(zhí)行, 比如執(zhí)行各種關(guān)閉操作
Thread類
常用參數(shù)說明
參數(shù) | 說明 |
---|---|
target | 表示調(diào)用的對象, 即子線程要執(zhí)行的任務(wù), 可以是某個內(nèi)置方法, 或是你自己寫的函數(shù) |
name | 子線程的名稱 |
args | 傳入target函數(shù)中的位置參數(shù), 是一個元組, 參數(shù)后必須加逗號 |
常用實例方法
方法 | 作用 |
---|---|
Thread.run(self) | 線程啟動時運行的方法, 由該方法調(diào)用 target參數(shù)所指定的函數(shù) |
Thread.start(self) | 啟動進程, start方法就是區(qū)幫你調(diào)用run方法 |
Thread.terminate(self) | 強制終止線程 |
Thread.join(self, timeout=None) | 阻塞調(diào)用, 主線程進行等待 |
Thread.setDaemon(self, daemonic) | 將子線程設(shè)置為守護線程, 隨主線程結(jié)束而結(jié)束 |
Thread.getName(self, name) | 獲取線程名 |
Thread.setName(self, name) | 設(shè)置線程名 |
創(chuàng)建線程
在python中創(chuàng)建線程有兩種方式, 實例Thread類和繼承重寫Thread類
實例Thread類
import threading import time def run(name, s): # 線程要執(zhí)行的任務(wù) time.sleep(s) # 停兩秒 print('I am %s' % name) # 實例化線程類, 并傳入函數(shù)及其參數(shù), t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 開始執(zhí)行, 這兩個線程會同步執(zhí)行 t1.start() t2.start() print(t1.getName()) # 獲取線程名 print(t2.getName()) # Result: one two I am Two # 運行2s后 I am One # 運行5s后
繼承Thread類
class MyThread(threading.Thread): # 繼承threading中的Thread類 # 線程所需的參數(shù) def __init__(self, name, second): super().__init__() self.name = name self.second = second # 重寫run方法,表示線程所執(zhí)行的任務(wù),必須有 def run(self): time.sleep(self.second) print('I am %s' % self.name) # 創(chuàng)建線程實例 t1 = MyThread('One', 5) t2 = MyThread('Two', 2) # 啟動線程,實際上是調(diào)用了類中的run方法 t1.start() t2.start() t1.join() print(t1.getName()) print(t2.getName()) # Result: I am Two # 運行后2s I am One # 運行后5s One Two
常用方法
join()
阻塞調(diào)用程序 , 直到調(diào)用join () 方法的線程執(zhí)行結(jié)束, 才會繼續(xù)往下執(zhí)行
# 開始執(zhí)行, 這兩個線程會同步執(zhí)行 t1.start() t2.start() t1.join() # 等待t1線程執(zhí)行完畢,再繼續(xù)執(zhí)行剩余的代碼 print(t1.getName()) print(t2.getName()) # Result: I am Two I am One one two
setDemon()
使用給線程設(shè)置守護模式: 子線程跟隨主線程的結(jié)束而結(jié)束, 不管這個子線程任務(wù)是否完成. 而非守護模式的子線程只有在執(zhí)行完成后, 主線程才會執(zhí)行完成
setDaemon() 與 join() 基本上是相對的 , join會等子線程執(zhí)行完畢 ; 而setDaemon則不會等
def run(name, s): # 線程要執(zhí)行的函數(shù) time.sleep(s) # 停兩秒 print('I am %s' % name) # 實例化線程類, 并傳入函數(shù)及其參數(shù) t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 給t1設(shè)置守護模式, 使其隨著主線程的結(jié)束而結(jié)束 t1.setDaemon(True) # 開始執(zhí)行, 這兩個線程會同步執(zhí)行 t1.start() t2.start() # 主線程會等待未設(shè)置守護模式的線程t2執(zhí)行完成 # Result: I am Two # 運行后2s
線程間的通信
互斥鎖
在同一個進程的多線程中 , 其中的變量對于所有線程來說都是共享的 , 因此 , 如果多個線程之間同時修改一個變量 , 那就亂套了 , 共享的數(shù)據(jù)就會有很大的風(fēng)險 , 所以我們需要互斥鎖 , 來鎖住數(shù)據(jù) , 防止篡改。
來看一個錯誤的示范:
a = 0 def incr(n): global a for i in range(n): a += 1 # 這兩個方法同時聲明了變量a,并對其進行修改 def decr(n): global a for i in range(n): a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # 期望結(jié)果應(yīng)該是0, 但是因為這里沒有設(shè)置互斥鎖, 所以兩個方法是同時對同一個變量進行修改, 得到的的結(jié)果值是隨機的
下面我們改一下上面的代碼 , 兩個方法加上互斥鎖:
a = 0 lock = threading.Lock() # 實例化互斥鎖對象, 方便之后的調(diào)用 def incr(n): global a for i in range(n): lock.acquire() # 上鎖的方法 a += 1 lock.release() # 解鎖的方法 # 要注意的是上鎖的位置是, 出現(xiàn)修改操作的代碼 def decr(n): global a for i in range(n): with lock: # 也可以直接使用with, 自動解鎖 a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # Result: 0
在容易出現(xiàn)搶奪資源的地方進行上鎖 , 實現(xiàn)同一時間內(nèi) , 只有一個線程可以對對象進行操作
常用方法
關(guān)鍵字 | 解釋 |
---|---|
put(item) | 入隊 , 將item放入隊列中 , 在隊列為滿時插入值會發(fā)生阻塞(1) |
get() | 出隊 , 從隊列中移除并返回一個數(shù)據(jù) , 在隊列為空時獲取值會發(fā)生阻塞 |
task_done() | 任務(wù)結(jié)束 , 意味著之前入隊的一個任務(wù)已經(jīng)完成。由隊列的消費者線程調(diào)用 |
join() | 等待完成 , 阻塞調(diào)用線程,直到隊列中的所有任務(wù)被處理掉。 |
empty() | 如果隊列為空,返回True,反之返回False |
full() | 如果隊列為滿,返回True,反之返回False |
qsize() | 隊列長度 , 返回當(dāng)前隊列的數(shù)據(jù)量 |
(1): 阻塞: 程序停在阻塞的位置 , 無法繼續(xù)執(zhí)行
導(dǎo)入和實例化
import queue q = queue.Queue(4) # 實例化隊列對象, 并設(shè)置最大數(shù)據(jù)量
put() 和 get()
q.put('a') q.put('b') print(q.get()) # : a print(q.get()) # : b q.task_done() # get后必須要加task_done,確認get操作是否完成
q.put(1) # 當(dāng)前隊列已滿,再次put就會阻塞 print(q.full()) # 由于已經(jīng)阻塞, 所以這段不會被執(zhí)行 # put會在隊列慢了點時候,在插入值會發(fā)生阻塞 # get會在隊列里沒有值的時候,會發(fā)生阻塞
empty()
print(q.empty()) # 判斷隊列是否為空: True q.put('test') print(q.empty()) # : False
qsize()
print(q.qsize()) # 當(dāng)前隊列里有多少人: 1
full()
q.put(1) q.put(1) q.put(1) print(q.full()) # : True
join()
print('testetsetset') q.join() # join會在隊列非空時發(fā)生阻塞 print('done') # 由于已經(jīng)阻塞, 所以這段不會被執(zhí)行
線程池
池的概念
線程池中實現(xiàn)準(zhǔn)備好了一些可以重復(fù)使用的線程 , 等待接受任務(wù)并執(zhí)行
主線程提交任務(wù)給 線程池 , 線程池中的每個線程會一次一個的接收任務(wù)并執(zhí)行 , 直到主線程執(zhí)行結(jié)束
主線程: 相當(dāng)于生產(chǎn)者,只管向線程池提交任務(wù)。
并不關(guān)心線程池是如何執(zhí)行任務(wù)的。
因此,并不關(guān)心是哪一個線程執(zhí)行的這個任務(wù)。
線程池: 相當(dāng)于消費者,負責(zé)接收任務(wù),
并將任務(wù)分配到一個空閑的線程中去執(zhí)行。
自定義線程池
import queue import threading import time class ThreadPool: # 自定義線程池 def __init__(self, n): # 主線程做 self.queue_obj = queue.Queue() for i in range(n): threading.Thread(target=self.worker, daemon=True).start() # 給子線程worker設(shè)置為守護模式 def worker(self): # 子線程做,由于Debug調(diào)試的只是主線程的代碼,所以在調(diào)試時看不到子線程執(zhí)行的代碼 """線程對象,寫while True 是為了能夠一直執(zhí)行任務(wù)。""" while True: # 讓線程執(zhí)行完一個任務(wù)之后不會死掉,主線程結(jié)束時,守護模式會讓worker里的死循環(huán)停止 func = self.queue_obj.get() # get已經(jīng)入隊的任務(wù), 這里會接收到主線程分配的func # 由于設(shè)置了守護模式,當(dāng)隊列為空時,不會一直阻塞在get這里 # 有了守護模式,worker會在主線程執(zhí)行完畢后死掉 func() # 將隊列里的任務(wù)拿出來調(diào)用 """ 這里func與task_done的順序非常重要,如果func放在task_done后面的話會出現(xiàn)只執(zhí)行兩次就結(jié)束。 """ self.queue_obj.task_done() # task_done 會刷新計數(shù)器 # 線程池里有一個類似計數(shù)器的機制,用來記錄put的次數(shù)(+1),每一次task_done都會回撥一次記錄的次數(shù)(-1) # 當(dāng)回撥完計數(shù)器為0之后,就會執(zhí)行join def apply_async(self, func): # 主線程做 """向隊列中傳入需要執(zhí)行的函數(shù)對象""" self.queue_obj.put(func) # 將接收到的func入隊 def join(self): # 主線程做 """等待隊列中的內(nèi)容被取完""" self.queue_obj.join() # 隊列里不為空就阻塞,為空就不阻塞
簡單使用
def task1(): # 子線程做 time.sleep(2) print('task1 over') def task2(): # 子線程做 time.sleep(3) print('task2 over') P = ThreadPool(2) # 如果在start開啟線程之后沒有傳入任務(wù)對象,worker里的get會直接阻塞 P.apply_async(task1) P.apply_async(task2) print('start') P.join() print('done') # Result: start task1 over task2 over done
如果get發(fā)生阻塞意味著隊列為空,意味著join不阻塞,意味著print('done')會執(zhí)行,
意味著主線程沒有任務(wù)在做,意味著主線程結(jié)束,意味著不等待設(shè)置了守護的線程執(zhí)行任務(wù),
意味著子線程會隨著主線程的死亡而死亡,這就是為什么會設(shè)置守護模式。
如果沒有設(shè)置守護模式意味著get發(fā)生阻塞,意味著子線程任務(wù)執(zhí)行不完,意味著主線程一直要等子線程完成,
意味著程序一直都結(jié)束不了,意味著程序有問題
python內(nèi)置線程池
原理
- 創(chuàng)建線程池
- 將任務(wù)扔進去
- 關(guān)閉線程池
- 等待線程任務(wù)執(zhí)行完畢
'''手動實現(xiàn)線程池:
主要是配合隊列來進行實現(xiàn),我們定義好一個隊列對象,然后將我們的任務(wù)對象put到我們的隊列對象中,
然后使用多線程,讓我們的線程去get隊列種的對象,然后各自去執(zhí)行自己get到的任務(wù),
這樣的話其實也就實現(xiàn)了線程池
'''
使用方法
from multiprocessing.pool import ThreadPool import time pool = ThreadPool(2) # 直接使用內(nèi)置線程池, 設(shè)置最大線程數(shù) def task1(): time.sleep(2) print('task1 over') def task2(*args, **kwargs): time.sleep(3) print('task2 over', args, kwargs) pool.apply_async(task1) pool.apply_async(task2, args=(1, 2), kwds={'a': 1, 'b': 2}) print('Task Submitted') pool.close() # 要點: close必須要在join之前, 不允許再提交任務(wù)了 pool.join() print('Mission Complete') # Result: Task Submitted task1 over task2 over (1, 2) {'a': 1, 'b': 2} Mission Complete
其他操作
操作一: close - 關(guān)閉提交通道,不允許再提交任務(wù)
操作二: terminate - 中止進程池,中止所有任務(wù)
以上所述是小編給大家介紹的python線程與線程池詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
編寫Python腳本把sqlAlchemy對象轉(zhuǎn)換成dict的教程
這篇文章主要介紹了編寫Python腳本把sqlAlchemy對象轉(zhuǎn)換成dict的教程,主要是基于Python的model類構(gòu)建一個轉(zhuǎn)換的方法,需要的朋友可以參考下2015-05-05python數(shù)據(jù)庫批量插入數(shù)據(jù)的實現(xiàn)(executemany的使用)
這篇文章主要介紹了python數(shù)據(jù)庫批量插入數(shù)據(jù)的實現(xiàn)(executemany的使用),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04解決python報錯:AttributeError:?'ImageDraw'?object?h
這篇文章主要給大家介紹了關(guān)于解決python報錯:AttributeError:?'ImageDraw'?object?has?no?attribute?'textbbox'的相關(guān)資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-01-01