Python全棧之隊(duì)列詳解
1. lock互斥鎖
知識(shí)點(diǎn):
lock.acquire()# 上鎖 lock.release()# 解鎖 #同一時(shí)間允許一個(gè)進(jìn)程上一把鎖 就是Lock 加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改,即串行的修改,沒(méi)錯(cuò),速度是慢了,但犧牲速度卻保證了數(shù)據(jù)安全。 #同一時(shí)間允許多個(gè)進(jìn)程上多把鎖 就是[信號(hào)量Semaphore] 信號(hào)量是鎖的變形: 實(shí)際實(shí)現(xiàn)是 計(jì)數(shù)器 + 鎖,同時(shí)允許多個(gè)進(jìn)程上鎖 # 互斥鎖Lock : 互斥鎖就是進(jìn)程的互相排斥,誰(shuí)先搶到資源,誰(shuí)就上鎖改資源內(nèi)容,為了保證數(shù)據(jù)的同步性 # 注意:多個(gè)鎖一起上,不開(kāi)鎖,會(huì)造成死鎖.上鎖和解鎖是一對(duì).
程序?qū)崿F(xiàn):
# ### 鎖 lock 互斥鎖 from multiprocessing import Process,Lock """ 上鎖和解鎖是一對(duì), 連續(xù)上鎖不解鎖是死鎖 ,只有在解鎖的狀態(tài)下,其他進(jìn)程才有機(jī)會(huì)上鎖 """ """ # 創(chuàng)建一把鎖 lock = Lock() # 上鎖 lock.acquire() # lock.acquire() # 連續(xù)上鎖,造成了死鎖現(xiàn)象; print("我在裊裊炊煙 .. 你在焦急等待 ... 廁所進(jìn)行時(shí) ... ") # 解鎖 lock.release() """ # ### 12306 搶票軟件 import json,time,random # 1.讀寫(xiě)數(shù)據(jù)庫(kù)當(dāng)中的票數(shù) def wr_info(sign , dic=None): if sign == "r": with open("ticket",mode="r",encoding="utf-8") as fp: dic = json.load(fp) return dic elif sign == "w": with open("ticket",mode="w",encoding="utf-8") as fp: json.dump(dic,fp) # dic = wr_info("w",dic={"count":0}) # print(dic , type(dic) ) # 2.執(zhí)行搶票的方法 def get_ticket(person): # 先獲取數(shù)據(jù)庫(kù)中實(shí)際票數(shù) dic = wr_info("r") # 模擬一下網(wǎng)絡(luò)延遲 time.sleep(random.uniform(0.1,0.7)) # 判斷票數(shù) if dic["count"] > 0: print("{}搶到票了".format(person)) # 搶到票后,讓當(dāng)前票數(shù)減1 dic["count"] -= 1 # 更新數(shù)據(jù)庫(kù)中的票數(shù) wr_info("w",dic) else: print("{}沒(méi)有搶到票哦".format(person)) # 3.對(duì)搶票和讀寫(xiě)票數(shù)做一個(gè)統(tǒng)一的調(diào)用 def main(person,lock): # 查看剩余票數(shù) dic = wr_info("r") print("{}查看票數(shù)剩余: {}".format(person,dic["count"])) # 上鎖 lock.acquire() # 開(kāi)始搶票 get_ticket(person) # 解鎖 lock.release() if __name__ == "__main__": lock = Lock() lst = ["梁新宇","康裕康","張保張","于朝志","薛宇健","韓瑞瑞","假摔先","劉子濤","黎明輝","趙鳳勇"] for i in lst: p = Process( target=main,args=( i , lock ) ) p.start() """ 創(chuàng)建進(jìn)程,開(kāi)始搶票是異步并發(fā)程序 直到開(kāi)始搶票的時(shí)候,變成同步程序, 先搶到鎖資源的先執(zhí)行,后搶到鎖資源的后執(zhí)行; 按照順序依次執(zhí)行;是同步程序; 搶票的時(shí)候,變成同步程序,好處是可以等到數(shù)據(jù)修改完成之后,在讓下一個(gè)人搶,保證數(shù)據(jù)不亂。 如果不上鎖的話,只剩一張票的時(shí)候,那么所有的人都能搶到票,因?yàn)槌绦驁?zhí)行的速度太快,所以接近同步進(jìn)程,導(dǎo)致數(shù)據(jù)也不對(duì)。 """
ticket文件
{"count": 0}
2. 事件_紅綠燈效果
2.1 信號(hào)量_semaphore
# ### 信號(hào)量 Semaphore 本質(zhì)上就是鎖,只不過(guò)是多個(gè)進(jìn)程上多把鎖,可以控制上鎖的數(shù)量 """Semaphore = lock + 數(shù)量 """ from multiprocessing import Semaphore , Process import time , random """ # 同一時(shí)間允許多個(gè)進(jìn)程上5把鎖 sem = Semaphore(5) #上鎖 sem.acquire() print("執(zhí)行操作 ... ") #解鎖 sem.release() """ def singsong_ktv(person,sem): # 上鎖 sem.acquire() print("{}進(jìn)入了唱吧ktv , 正在唱歌 ~".format(person)) # 唱一段時(shí)間 time.sleep( random.randrange(4,8) ) # 4 5 6 7 print("{}離開(kāi)了唱吧ktv , 唱完了 ... ".format(person)) # 解鎖 sem.release() if __name__ == "__main__": sem = Semaphore(5) lst = ["趙鳳勇" , "沈思雨", "趙萬(wàn)里" , "張宇" , "假率先" , "孫杰龍" , "陳璐" , "王雨涵" , "楊元濤" , "劉一鳳" ] for i in lst: p = Process(target=singsong_ktv , args = (i , sem) ) p.start() """ # 總結(jié): Semaphore 可以設(shè)置上鎖的數(shù)量 , 同一時(shí)間上多把鎖 創(chuàng)建進(jìn)程時(shí),是異步并發(fā),執(zhí)行任務(wù)時(shí),是同步程序; """ # 趙萬(wàn)里進(jìn)入了唱吧ktv , 正在唱歌 ~ # 趙鳳勇進(jìn)入了唱吧ktv , 正在唱歌 ~ # 張宇進(jìn)入了唱吧ktv , 正在唱歌 ~ # 沈思雨進(jìn)入了唱吧ktv , 正在唱歌 ~ # 孫杰龍進(jìn)入了唱吧ktv , 正在唱歌 ~
2.2 事件_紅綠燈效果
# ### 事件 (Event) """ # 阻塞事件 : e = Event()生成事件對(duì)象e e.wait()動(dòng)態(tài)給程序加阻塞 , 程序當(dāng)中是否加阻塞完全取決于該對(duì)象中的is_set() [默認(rèn)返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞 # 控制這個(gè)屬性的值 # set()方法 將這個(gè)屬性的值改成True # clear()方法 將這個(gè)屬性的值改成False # is_set()方法 判斷當(dāng)前的屬性是否為T(mén)rue (默認(rèn)上來(lái)是False) """ from multiprocessing import Process,Event import time , random # 1 ''' e = Event() # 默認(rèn)屬性值是False. print(e.is_set()) # 判斷內(nèi)部成員屬性是否是False e.wait() # 如果是False , 代碼程序阻塞 print(" 代碼執(zhí)行中 ... ") ''' # 2 ''' e = Event() # 將這個(gè)屬性的值改成True e.set() # 判斷內(nèi)部成員屬性是否是True e.wait() # 如果是True , 代碼程序不阻塞 print(" 代碼執(zhí)行中 ... ") # 將這個(gè)屬性的值改成False e.clear() e.wait() print(" 代碼執(zhí)行中 .... 2") ''' # 3 """ e = Event() # wait(3) 代表最多等待3秒; e.wait(3) print(" 代碼執(zhí)行中 .... 3") """ # ### 模擬經(jīng)典紅綠燈效果 # 紅綠燈切換 def traffic_light(e): print("紅燈亮") while True: if e.is_set(): # 綠燈狀態(tài) -> 切紅燈 time.sleep(1) print("紅燈亮") # True => False e.clear() else: # 紅燈狀態(tài) -> 切綠燈 time.sleep(1) print("綠燈亮") # False => True e.set() # e = Event() # traffic_light(e) # 車(chē)的狀態(tài) def car(e,i): # 判斷是否是紅燈,如果是加上wait阻塞 if not e.is_set(): print("car{} 在等待 ... ".format(i)) e.wait() # 否則不是,代表綠燈通行; print("car{} 通行了 ... ".format(i)) """ # 1.全國(guó)紅綠燈 if __name__ == "__main__": e = Event() # 創(chuàng)建交通燈 p1 = Process(target=traffic_light , args=(e,)) p1.start() # 創(chuàng)建小車(chē)進(jìn)程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = Process(target=car , args=(e,i)) p2.start() """ # 2.包頭紅綠燈,沒(méi)有車(chē)的時(shí)候,把紅綠燈關(guān)了,省電; if __name__ == "__main__": lst = [] e = Event() # 創(chuàng)建交通燈 p1 = Process(target=traffic_light , args=(e,)) # 設(shè)置紅綠燈為守護(hù)進(jìn)程 p1.daemon = True p1.start() # 創(chuàng)建小車(chē)進(jìn)程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = Process(target=car , args=(e,i)) lst.append(p2) p2.start() # 讓所有的小車(chē)全部跑完,把紅綠燈炸飛 print(lst) for i in lst: i.join() print("關(guān)閉成功 .... ")
事件知識(shí)點(diǎn):
# 阻塞事件 : e = Event()生成事件對(duì)象e e.wait()動(dòng)態(tài)給程序加阻塞 , 程序當(dāng)中是否加阻塞完全取決于該對(duì)象中的is_set() [默認(rèn)返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞 # 控制這個(gè)屬性的值 # set()方法 將這個(gè)屬性的值改成True # clear()方法 將這個(gè)屬性的值改成False # is_set()方法 判斷當(dāng)前的屬性是否為T(mén)rue (默認(rèn)上來(lái)是False)
3. queue進(jìn)程隊(duì)列
# ### 進(jìn)程隊(duì)列(進(jìn)程與子進(jìn)程是相互隔離的,如果兩者想要進(jìn)行通信,可以利用隊(duì)列實(shí)現(xiàn)) from multiprocessing import Process,Queue # 引入線程模塊; 為了捕捉queue.Empty異常; import queue # 1.基本語(yǔ)法 """順序: 先進(jìn)先出,后進(jìn)后出""" # 創(chuàng)建進(jìn)程隊(duì)列 q = Queue() # put() 存放 q.put(1) q.put(2) q.put(3) # get() 獲取 """在獲取不到任何數(shù)據(jù)時(shí),會(huì)出現(xiàn)阻塞""" # print( q.get() ) # print( q.get() ) # print( q.get() ) # print( q.get() ) # get_nowait() 拿不到數(shù)據(jù)報(bào)異常 """[windows]效果正常 [linux]不兼容""" try: print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) except : #queue.Empty pass # put_nowait() 非阻塞版本的put # 設(shè)置當(dāng)前隊(duì)列最大長(zhǎng)度為3 ( 元素個(gè)數(shù)最多是3個(gè) ) """在指定隊(duì)列長(zhǎng)度的情況下,如果塞入過(guò)多的數(shù)據(jù),會(huì)導(dǎo)致阻塞""" # q2 = Queue(3) # q2.put(111) # q2.put(222) # q2.put(333) # q2.put(444) """使用put_nowait 在隊(duì)列已滿的情況下,塞入數(shù)據(jù)會(huì)直接報(bào)錯(cuò)""" q2 = Queue(3) try: q2.put_nowait(111) q2.put_nowait(222) q2.put_nowait(333) q2.put_nowait(444) except: pass # 2.進(jìn)程間的通信IPC def func(q): # 2.子進(jìn)程獲取主進(jìn)程存放的數(shù)據(jù) res = q.get() print(res,"<22>") # 3.子進(jìn)程中存放數(shù)據(jù) q.put("劉一縫") if __name__ == "__main__": q3 = Queue() p = Process(target=func,args=(q3,)) p.start() # 1.主進(jìn)程存入數(shù)據(jù) q3.put("趙鳳勇") # 為了等待子進(jìn)程把數(shù)據(jù)存放隊(duì)列后,主進(jìn)程在獲取數(shù)據(jù); p.join() # 4.主進(jìn)程獲取子進(jìn)程存放的數(shù)據(jù) print(q3.get() , "<33>")
小提示: 一般主進(jìn)程比子進(jìn)程執(zhí)行的快一些
隊(duì)列知識(shí)點(diǎn):
# 進(jìn)程間通信 IPC # IPC Inter-Process Communication # 實(shí)現(xiàn)進(jìn)程之間通信的兩種機(jī)制: # 管道 Pipe # 隊(duì)列 Queue # put() 存放 # get() 獲取 # get_nowait() 拿不到報(bào)異常 # put_nowait() 非阻塞版本的put q.empty() 檢測(cè)是否為空 (了解) q.full() 檢測(cè)是否已經(jīng)存滿 (了解)
4. 生產(chǎn)者消費(fèi)者模型
# ### 生產(chǎn)者和消費(fèi)者模型 """ # 爬蟲(chóng)案例 1號(hào)進(jìn)程負(fù)責(zé)抓取其他多個(gè)網(wǎng)站中相關(guān)的關(guān)鍵字信息,正則匹配到隊(duì)列中存儲(chǔ)(mysql) 2號(hào)進(jìn)程負(fù)責(zé)把隊(duì)列中的內(nèi)容拿取出來(lái),將經(jīng)過(guò)修飾后的內(nèi)容布局到自個(gè)的網(wǎng)站中 1號(hào)進(jìn)程可以理解成生產(chǎn)者 2號(hào)進(jìn)程可以理解成消費(fèi)者 從程序上來(lái)看 生產(chǎn)者負(fù)責(zé)存儲(chǔ)數(shù)據(jù) (put) 消費(fèi)者負(fù)責(zé)獲取數(shù)據(jù) (get) 生產(chǎn)者和消費(fèi)者比較理想的模型: 生產(chǎn)多少,消費(fèi)多少 . 生產(chǎn)數(shù)據(jù)的速度 和 消費(fèi)數(shù)據(jù)的速度 相對(duì)一致 """ # 1.基礎(chǔ)版生產(chǎn)著消費(fèi)者模型 """問(wèn)題 : 當(dāng)前模型,程序不能正常終止 """ """ from multiprocessing import Process,Queue import time,random # 消費(fèi)者模型 def consumer(q,name): while True: # 獲取隊(duì)列中的數(shù)據(jù) food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{(lán)}".format(name,food)) # 生產(chǎn)者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產(chǎn)的數(shù)據(jù) print( "{}生產(chǎn)了{(lán)}".format( name , food+str(i) ) ) # 存儲(chǔ)生產(chǎn)的數(shù)據(jù)在隊(duì)列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬(wàn)里") ) p2 = Process( target=producer,args=(q , "趙沈陽(yáng)" , "香蕉" ) ) p1.start() p2.start() p2.join() """ # 2.優(yōu)化模型 """特點(diǎn) : 手動(dòng)在隊(duì)列的最后,加入標(biāo)識(shí)None, 終止消費(fèi)者模型""" """ from multiprocessing import Process,Queue import time,random # 消費(fèi)者模型 def consumer(q,name): while True: # 獲取隊(duì)列中的數(shù)據(jù) food = q.get() # 如果最后一次獲取的數(shù)據(jù)是None , 代表隊(duì)列已經(jīng)沒(méi)有更多數(shù)據(jù)可以獲取了,終止循環(huán); if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{(lán)}".format(name,food)) # 生產(chǎn)者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產(chǎn)的數(shù)據(jù) print( "{}生產(chǎn)了{(lán)}".format( name , food+str(i) ) ) # 存儲(chǔ)生產(chǎn)的數(shù)據(jù)在隊(duì)列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬(wàn)里") ) p2 = Process( target=producer,args=(q , "趙沈陽(yáng)" , "香蕉" ) ) p1.start() p2.start() p2.join() q.put(None) # 香蕉0 香蕉1 香蕉2 香蕉3 香蕉4 None """ # 3.多個(gè)生產(chǎn)者和消費(fèi)者 """ 問(wèn)題 : 雖然可以解決問(wèn)題 , 但是需要加入多個(gè)None , 代碼冗余""" from multiprocessing import Process,Queue import time,random # 消費(fèi)者模型 def consumer(q,name): while True: # 獲取隊(duì)列中的數(shù)據(jù) food = q.get() # 如果最后一次獲取的數(shù)據(jù)是None , 代表隊(duì)列已經(jīng)沒(méi)有更多數(shù)據(jù)可以獲取了,終止循環(huán); if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{(lán)}".format(name,food)) # 生產(chǎn)者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產(chǎn)的數(shù)據(jù) print( "{}生產(chǎn)了{(lán)}".format( name , food+str(i) ) ) # 存儲(chǔ)生產(chǎn)的數(shù)據(jù)在隊(duì)列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬(wàn)里") ) p1_1 = Process( target=consumer,args=(q , "趙世超") ) p2 = Process( target=producer,args=(q , "趙沈陽(yáng)" , "香蕉" ) ) p2_2 = Process( target=producer,args=(q , "趙鳳勇" , "大蒜" ) ) p1.start() p1_1.start() p2.start() p2_2.start() # 等待所有數(shù)據(jù)填充完畢 p2.join() p2_2.join() # 把None 關(guān)鍵字放在整個(gè)隊(duì)列的最后,作為跳出消費(fèi)者循環(huán)的標(biāo)識(shí)符; q.put(None) # 給第一個(gè)消費(fèi)者加一個(gè)None , 用來(lái)終止 q.put(None) # 給第二個(gè)消費(fèi)者加一個(gè)None , 用來(lái)終止 # ...
5. joinablequeue隊(duì)列使用
# ### JoinableQueue 隊(duì)列 """ put 存放 get 獲取 task_done 計(jì)算器屬性值-1 join 配合task_done來(lái)使用 , 阻塞 put 一次數(shù)據(jù), 隊(duì)列的內(nèi)置計(jì)數(shù)器屬性值+1 get 一次數(shù)據(jù), 通過(guò)task_done讓隊(duì)列的內(nèi)置計(jì)數(shù)器屬性值-1 join: 會(huì)根據(jù)隊(duì)列計(jì)數(shù)器的屬性值來(lái)判斷是否阻塞或者放行 隊(duì)列計(jì)數(shù)器屬性是 等于 0 , 代碼不阻塞放行 隊(duì)列計(jì)數(shù)器屬性是 不等 0 , 意味著代碼阻塞 """ from multiprocessing import JoinableQueue jq = JoinableQueue() jq.put("王同培") # +1 jq.put("王偉") # +2 print(jq.get()) print(jq.get()) # print(jq.get()) 阻塞 jq.task_done() # -1 jq.task_done() # -1 jq.join() print(" 代碼執(zhí)行結(jié)束 .... ") # ### 2.使用JoinableQueue 改造生產(chǎn)著消費(fèi)者模型 from multiprocessing import Process,Queue import time,random # 消費(fèi)者模型 def consumer(q,name): while True: # 獲取隊(duì)列中的數(shù)據(jù) food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{(lán)}".format(name,food)) # 讓隊(duì)列的內(nèi)置計(jì)數(shù)器屬性-1 q.task_done() # 生產(chǎn)者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產(chǎn)的數(shù)據(jù) print( "{}生產(chǎn)了{(lán)}".format( name , food+str(i) ) ) # 存儲(chǔ)生產(chǎn)的數(shù)據(jù)在隊(duì)列中 q.put(food+str(i)) if __name__ == "__main__": q = JoinableQueue() p1 = Process( target=consumer,args=(q , "趙萬(wàn)里") ) p2 = Process( target=producer,args=(q , "趙沈陽(yáng)" , "香蕉" ) ) p1.daemon = True p1.start() p2.start() p2.join() # 必須等待隊(duì)列中的所有數(shù)據(jù)全部消費(fèi)完畢,再放行 q.join() print("程序結(jié)束 ... ")
6. 總結(jié)
ipc可以讓進(jìn)程之間進(jìn)行通信 lock其實(shí)也讓進(jìn)程之間進(jìn)行通信了,多個(gè)進(jìn)程去搶一把鎖,一個(gè)進(jìn)程搶到 這 把鎖了,其他的進(jìn)程就搶不到這把鎖了,進(jìn)程通過(guò)socket底層互相發(fā) 消息,告訴其他進(jìn)程當(dāng)前狀態(tài)已經(jīng)被鎖定了,不能再?gòu)?qiáng)了。 進(jìn)程之間默認(rèn)是隔離的,不能通信的,如果想要通信,必須通過(guò)ipc的 方式(lock、joinablequeue、Manager)
本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
python中溫度單位轉(zhuǎn)換的實(shí)例方法
在本篇文章里小編給大家整理關(guān)于python中溫度單位轉(zhuǎn)換的實(shí)例方法,有需要的朋友們可以學(xué)習(xí)參考下。2020-12-12python多進(jìn)程提取處理大量文本的關(guān)鍵詞方法
今天小編就為大家分享一篇python多進(jìn)程提取處理大量文本的關(guān)鍵詞方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06python爬蟲(chóng) urllib模塊發(fā)起post請(qǐng)求過(guò)程解析
這篇文章主要介紹了python爬蟲(chóng) urllib模塊發(fā)起post請(qǐng)求過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08Python實(shí)現(xiàn)迪杰斯特拉算法并生成最短路徑的示例代碼
這篇文章主要介紹了Python實(shí)現(xiàn)迪杰斯特拉算法并生成最短路徑的示例代碼,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-12-12python pandas合并Sheet,處理列亂序和出現(xiàn)Unnamed列的解決
這篇文章主要介紹了python pandas合并Sheet,處理列亂序和出現(xiàn)Unnamed列的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03解決Shell執(zhí)行python文件,傳參空格引起的問(wèn)題
今天小編就為大家分享一篇解決Shell執(zhí)行python文件,傳參空格引起的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-10-10詳解python中return和print的區(qū)別和用途
在 Python 中,return 和 print 是兩種常見(jiàn)的語(yǔ)句,用于在函數(shù)中輸出信息或返回值,盡管它們看起來(lái)相似,但它們有不同的作用和用法,本文將詳細(xì)介紹 return 和 print 在函數(shù)中的區(qū)別,并提供豐富的示例代碼,以幫助你更好地理解它們的用途2023-11-11