Python進程池log死鎖問題分析及解決
背景
最近線上運行的一個python任務(wù)負責(zé)處理一批數(shù)據(jù),為提高處理效率,使用了python進程池,并會打印log。最近發(fā)現(xiàn),任務(wù)時常會出現(xiàn)夯住的情況,當(dāng)查看現(xiàn)場時發(fā)現(xiàn),夯住時通常會有幾個子進程打印了相關(guān)錯誤日志,然后整個任務(wù)就停滯在那里了。
原因
夯住的原因正是由于一行不起眼的log導(dǎo)致,簡而言之,Python的logging模塊在寫文件模式下,是不支持多進程的,強行使用可能會導(dǎo)致死鎖。
問題復(fù)現(xiàn)
可以用下面的代碼來描述我們遇到的問題
import logging from threading import Thread from queue import Queue from logging.handlers import QueueListener, QueueHandler from multiprocessing import Pool ? def setup_logging(): # log的時候會寫到一個隊列里,然后有一個單獨的線程從這個隊列里去獲取日志信息并寫到文件里 _log_queue = Queue() QueueListener( _log_queue, logging.FileHandler("out.log")).start() logging.getLogger().addHandler(QueueHandler(_log_queue)) ? # 父進程里起一個單獨的線程來寫日志 def write_logs(): while True: logging.info("hello, I just did something") Thread(target=write_logs).start() ? def runs_in_subprocess(): print("About to log...") logging.info("hello, I did something") print("...logged") ? if __name__ == '__main__': setup_logging() ? # 讓一個進程池在死循環(huán)里執(zhí)行,增加觸發(fā)死鎖的幾率 while True: with Pool() as pool: pool.apply(runs_in_subprocess)
我們在linux上執(zhí)行該代碼:
About to log... ...logged About to log... ...logged About to log...
發(fā)現(xiàn)程序輸出幾行之后就卡住了。
問題出在了哪里
python的進程池是基于fork
實現(xiàn)的,當(dāng)我們只使用fork()
創(chuàng)建子進程而不是用execve()
來替換進程上下時,需要注意一個問題:fork()
出來的子進程會和父進程共享內(nèi)存空間,除了父進程所擁有的線程。
對于代碼
from threading import Thread, enumerate from os import fork from time import sleep ? # Start a thread: Thread(target=lambda: sleep(60)).start() ? if fork(): print("The parent process has {} threads".format( len(enumerate()))) else: print("The child process has {} threads".format( len(enumerate())))
輸出:
The parent process has 2 threads
The child process has 1 threads
可以發(fā)現(xiàn),父進程中的子線程并沒有被fork到子進程中,而這正是導(dǎo)致死鎖的原因:
- 當(dāng)父進程中的線程要向隊列中寫log時,它需要獲取鎖
- 如果恰好在獲取鎖后進行了
fork
操作,那這個鎖也會被帶到子進程中,同時這個鎖的狀態(tài)是占用中 - 這時候子進程要寫日志的話,也需要獲取鎖,但是由于鎖是占用狀態(tài),導(dǎo)致永遠也無法獲取,至此,死鎖產(chǎn)生。
如何解決
使用多進程共享隊列
出現(xiàn)上述死鎖的原因之一在于在fork子進程的時候,把隊列和鎖的狀態(tài)都給fork
過來了,那要避免死鎖,一種方案就是使用進程共享的隊列。
import logging import multiprocessing from logging.handlers import QueueListener from time import sleep ? ? def listener_configurer(): root = logging.getLogger() h = logging.handlers.RotatingFileHandler('out.log', 'a', 300, 10) f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') h.setFormatter(f) root.addHandler(h) ? # 從隊列獲取元素,并寫日志 def listener_process(queue, configurer): configurer() while False: try: record = queue.get() if record is None: break logger = logging.getLogger(record.name) logger.handle(record) except Exception: import sys, traceback print('Whoops! Problem:', file=sys.stderr) traceback.print_exc(file=sys.stderr) ? # 業(yè)務(wù)進程的日志配置,使用queueHandler, 將要寫的日志塞入隊列 def worker_configurer(queue): h = logging.handlers.QueueHandler(queue) root = logging.getLogger() root.addHandler(h) root.setLevel(logging.DEBUG) ? ? def runs_in_subprocess(queue, configurer): configurer(queue) print("About to log...") logging.debug("hello, I did something: %s", multiprocessing.current_process().name) print("...logged, %s",queue.qsize()) ? ? if __name__ == '__main__': queue = multiprocessing.Queue(-1) listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer)) listener.start() #父進程也持續(xù)寫日志 worker_configurer(queue) def write_logs(): while True: logging.debug("in main process, I just did something") Thread(target=write_logs).start() ? while True: multiprocessing.Process(target=runs_in_subprocess, args=(queue, worker_configurer)).start() sleep(2) ?
在上面代碼中,我們設(shè)置了一個進程間共享的隊列,將每個子進程的寫日志操作轉(zhuǎn)換為向隊列添加元素,然后由單獨的另一個進程將日志寫入文件。和文章開始處的問題代碼相比,雖然都使用了隊列,但此處用的是進程共享隊列,不會隨著fork
子進程而出現(xiàn)多個拷貝,更不會出現(xiàn)給子進程拷貝了一個已經(jīng)占用了的鎖的情況。
spawn
出現(xiàn)死鎖的另外一層原因是我們只進行了fork
, 但是沒有進行execve
, 即子進程仍然和父進程享有同樣的內(nèi)存空間導(dǎo)致,因此另一種解決方法是在fork后緊跟著執(zhí)行execve
調(diào)用,對應(yīng)于python中的spawn
操作,修改后的代碼如下:
if __name__ == '__main__': setup_logging() ? while True: # 使用spawn類型的啟動 with get_context("spawn").Pool() as pool: pool.apply(runs_in_subprocess)
使用spawn
方法時,父進程會啟動一個新的 Python 解釋器進程。 子進程將只繼承那些運行進程對象的 run()
方法所必須的資源,來自父進程的非必需文件描述符和句柄將不會被繼承,因此使用此方法啟動進程會比較慢,但是安全。
以上就是Python進程池log死鎖問題分析及解決的詳細內(nèi)容,更多關(guān)于Python進程池log死鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Flask交互基礎(chǔ)(GET、 POST 、PUT、 DELETE)的使用
這篇文章主要介紹了Flask交互基礎(chǔ)(GET、 POST 、PUT、 DELETE)的使用,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04Python+drawpad實現(xiàn)CPU監(jiān)控小程序
這篇文章主要為大家詳細介紹了如何利用Python+drawpad實現(xiàn)一個簡單的CPU監(jiān)控小程序,文中示例代碼講解詳細,感興趣的小伙伴可以嘗試一下2022-08-08Python Pyqt5多線程更新UI代碼實例(防止界面卡死)
這篇文章通過代碼實例給大家介紹了Python Pyqt5多線程更新UI防止界面卡死的問題,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-12-12Python中用post、get方式提交數(shù)據(jù)的方法示例
最近在學(xué)習(xí)使用Python,發(fā)現(xiàn)網(wǎng)上很少提到如何使用post,所以下面這篇文章主要給大家介紹了關(guān)于Python中用post、get方式提交數(shù)據(jù)的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起看看吧。2017-09-09python使用redis實現(xiàn)消息隊列(異步)的實現(xiàn)完整例程
本文主要介紹了python使用redis實現(xiàn)消息隊列(異步)的實現(xiàn)完整例程,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01