Python隊列、進程間通信、線程案例
進程互斥鎖
多進程同時搶購余票
# 并發(fā)運行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂
# data.json文件內(nèi)容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process
def search(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...')
def buy(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket_num'] > 0:
dic['ticket_num'] -= 1
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print(f'用戶{user}搶票成功!')
else:
print(f'用戶{user}搶票失敗')
def run(user):
search(user)
buy(user)
if __name__ == '__main__':
for i in range(10): # 模擬10個用戶搶票
p = Process(target=run, args=(f'用戶{i}', ))
p.start()
使用鎖來保證數(shù)據(jù)安全
# data.json文件內(nèi)容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process, Lock
def search(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...')
def buy(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(0.2)
if dic['ticket_num'] > 0:
dic['ticket_num'] -= 1
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print(f'用戶{user}搶票成功!')
else:
print(f'用戶{user}搶票失敗')
def run(user, mutex):
search(user)
mutex.acquire() # 加鎖
buy(user)
mutex.release() # 釋放鎖
if __name__ == '__main__':
# 調(diào)用Lock()類得到一個鎖對象
mutex = Lock()
for i in range(10): # 模擬10個用戶搶票
p = Process(target=run, args=(f'用戶{i}', mutex))
p.start()
進程互斥鎖:
讓并發(fā)變成串行,犧牲了執(zhí)行效率,保證了數(shù)據(jù)安全
在程序并發(fā)時,需要修改數(shù)據(jù)使用
隊列
隊列遵循的是先進先出
隊列:相當(dāng)于內(nèi)存中一個隊列空間,可以存放多個數(shù)據(jù),但數(shù)據(jù)的順序是由先進去的排在前面。
q.put() 添加數(shù)據(jù)
q.get() 取數(shù)據(jù),遵循隊列先進先出
q.get_nowait() 獲取隊列數(shù)據(jù), 隊列中沒有就會報錯
q.put_nowait 添加數(shù)據(jù),若隊列滿了也會報錯
q.full() 查看隊列是否滿了
q.empty() 查看隊列是否為空
from multiprocessing import Queue
# 調(diào)用隊列類,實例化隊列對象
q = Queue(5) # 隊列中存放5個數(shù)據(jù)
# put添加數(shù)據(jù),若隊列里的數(shù)據(jù)滿了就會卡住
q.put(1)
print('進入數(shù)據(jù)1')
q.put(2)
print('進入數(shù)據(jù)2')
q.put(3)
print('進入數(shù)據(jù)3')
q.put(4)
print('進入數(shù)據(jù)4')
q.put(5)
print('進入數(shù)據(jù)5')
# 查看隊列是否滿了
print(q.full())
# 添加數(shù)據(jù), 若隊列滿了也會報錯
q.put_nowait(6)
# q.get() 獲取的數(shù)據(jù)遵循先進先出
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.get_nowait()) # 獲取隊列數(shù)據(jù), 隊列中沒有就會報錯
# 判斷隊列是否為空
print(q.empty())
q.put(6)
print('進入數(shù)據(jù)6')
進程間通信
IPC(Inter-Process Communication)
進程間數(shù)據(jù)是相互隔離的,若想實現(xiàn)進程間通信,可以利用隊列
from multiprocessing import Process, Queue
def task1(q):
data = 'hello 你好'
q.put(data)
print('進程1添加數(shù)據(jù)到隊列')
def task2(q):
print(q.get())
print('進程2從隊列中獲取數(shù)據(jù)')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=task1, args=(q, ))
p2 = Process(target=task2, args=(q, ))
p1.start()
p2.start()
print('主進程')
生產(chǎn)者與消費者
在程序中,通過隊列生產(chǎn)者把數(shù)據(jù)添加到隊列中,消費者從隊列中獲取數(shù)據(jù)
from multiprocessing import Process, Queue
import time
# 生產(chǎn)者
def producer(name, food, q):
for i in range(10):
data = food, i
msg = f'用戶{name}開始制作{data}'
print(msg)
q.put(data)
time.sleep(0.1)
# 消費者
def consumer(name, q):
while True:
data = q.get()
if not data:
break
print(f'用戶{name}開始吃{data}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=('neo', '煎餅', q))
p2 = Process(target=producer, args=('wick', '肉包', q))
c1 = Process(target=consumer, args=('cwz', q))
c2 = Process(target=consumer, args=('woods', q))
p1.start()
p2.start()
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
print('主')
線程
線程的概念
進程與線程都是虛擬單位
進程:資源單位
線程:執(zhí)行單位
開啟一個進程,一定會有一個線程,線程才是真正執(zhí)行者
開啟進程:
- 開辟一個名稱空間,每開啟一個進程都會占用一份內(nèi)存資源
- 會自帶一個線程
開啟線程:
- 一個進程可以開啟多個線程
- 線程的開銷遠小于進程
注意:線程不能實現(xiàn)并行,線程只能實現(xiàn)并發(fā),進程可以實現(xiàn)并行
線程的兩種創(chuàng)建方式
from threading import Thread
import time
# 創(chuàng)建線程方式1
def task():
print('線程開啟')
time.sleep(1)
print('線程結(jié)束')
if __name__ == '__main__':
t = Thread(target=task)
t.start()
# 創(chuàng)建線程方式2
class MyThread(Thread):
def run(self):
print('線程開啟...')
time.sleep(1)
print('線程結(jié)束...')
if __name__ == '__main__':
t = MyThread()
t.start()
線程對象的方法
from threading import Thread
from threading import current_thread
import time
def task():
print(f'線程開啟{current_thread().name}')
time.sleep(1)
print(f'線程結(jié)束{current_thread().name}')
if __name__ == '__main__':
t = Thread(target=task)
print(t.isAlive())
# t.daemon = True
t.start()
print(t.isAlive())
線程互斥鎖
線程之間數(shù)據(jù)是共享的
from threading import Thread
from threading import Lock
import time
mutex = Lock()
n = 100
def task(i):
print(f'線程{i}啟動')
global n
mutex.acquire()
temp = n
time.sleep(0.1)
n = temp - 1
print(n)
mutex.release()
if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=task, args=(i, ))
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python實現(xiàn)對一個函數(shù)應(yīng)用多個裝飾器的方法示例
這篇文章主要介紹了Python實現(xiàn)對一個函數(shù)應(yīng)用多個裝飾器的方法,結(jié)合實例形式分析了Python編程中一個函數(shù)使用多個裝飾器的簡單操作技巧,需要的朋友可以參考下2018-02-02
selenium查找網(wǎng)頁出現(xiàn)加載卡頓或失敗的解決方法
這篇文章主要為大家詳細介紹了selenium查找網(wǎng)頁時如何處理網(wǎng)站資源一直加載非??D或者失敗的情況,文中的示例代碼講解詳細,感興趣的小伙伴可以了解一下2023-10-10
使用python構(gòu)建WebSocket客戶端的教程詳解
WebSocket是一種在客戶端和服務(wù)器之間實現(xiàn)雙向通信的協(xié)議,常用于實時聊天、實時數(shù)據(jù)更新等場景,Python提供了許多庫來實現(xiàn) WebSocket客戶端,本教程將介紹如何使用Python構(gòu)建WebSocket客戶端,文中通過代碼示例給大家介紹的非常詳細,需要的朋友可以參考下2023-12-12
pandas如何使用列表和字典創(chuàng)建?Series
這篇文章主要介紹了pandas如何使用列表和字典創(chuàng)建?Series,pandas 是基于NumPy的一種工具,該工具是為解決數(shù)據(jù)分析任務(wù)而創(chuàng)建的,下文我們就來看看文章是怎樣介紹pandas,需要的朋友也可以參考一下2021-12-12

