Django異步任務(wù)線程池實(shí)現(xiàn)原理
這篇文章主要介紹了Django異步任務(wù)線程池實(shí)現(xiàn)原理,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
當(dāng)數(shù)據(jù)庫(kù)數(shù)據(jù)量很大時(shí)(百萬(wàn)級(jí)),許多批量數(shù)據(jù)修改請(qǐng)求的響應(yīng)會(huì)非常慢,一些不需要即時(shí)響應(yīng)的任務(wù)可以放到后臺(tái)的異步線程中完成,發(fā)起異步任務(wù)的請(qǐng)求就可以立即響應(yīng)
選擇用線程池的原因是:線程比進(jìn)程更為可控。不像子進(jìn)程,子線程會(huì)在所屬進(jìn)程結(jié)束時(shí)立即結(jié)束。線程可共享內(nèi)存。
請(qǐng)求任務(wù)異步處理的原理
使用python manage.py runserver模式啟動(dòng)的Django應(yīng)用只有一個(gè)進(jìn)程,對(duì)于每個(gè)請(qǐng)求,主線程會(huì)開啟一個(gè)子線程來(lái)處理請(qǐng)求。請(qǐng)求子線程向主線程申請(qǐng)一個(gè)新線程,然后把耗時(shí)的任務(wù)交給新線程,自身立即響應(yīng),這就是請(qǐng)求任務(wù)異步處理的原理。
可視化線程池
如果想要管理這批異步線程,知道他們是否在運(yùn)行中,可以使用線程池(ThreadPoolExecutor)。
線程池會(huì)先啟動(dòng)若干數(shù)量的線程,并讓這些線程都處于睡眠狀態(tài),當(dāng)向線程池submit一個(gè)任務(wù)后,會(huì)喚醒線程池中的某一個(gè)睡眠線程,讓它來(lái)處理這個(gè)任務(wù),當(dāng)處理完這個(gè)任務(wù),線程又處于睡眠狀態(tài)。
submit任務(wù)后會(huì)返回一個(gè)期程(future),這個(gè)對(duì)象可以查看線程池中執(zhí)行此任務(wù)的線程是否仍在處理中
因此可以構(gòu)建一個(gè)全局可視化線程池:
from concurrent.futures.thread import ThreadPoolExecutor class ThreadPool(object): def __init__(self): # 線程池 self.executor = ThreadPoolExecutor(20) # 用于存儲(chǔ)每個(gè)項(xiàng)目批量任務(wù)的期程 self.future_dict = {} # 檢查某個(gè)項(xiàng)目是否有正在運(yùn)行的批量任務(wù) def is_project_thread_running(self, project_id): future = self.future_dict.get(project_id, None) if future and future.running(): # 存在正在運(yùn)行的批量任務(wù) return True return False # 展示所有的異步任務(wù) def check_future(self): data = {} for project_id, future in self.future_dict.items(): data[project_id] = future.running() return data def __del__(self): self.executor.shutdown() # 主線程中的全局線程池 # global_thread_pool的生命周期是Django主線程運(yùn)行的生命周期 global_thread_pool = ThreadPool()
使用:
# 檢查異步任務(wù) if global_thread_pool.is_project_thread_running(project_id): raise exceptions.ValidationError(detail='存在正在處理的批量任務(wù),請(qǐng)稍后重試') # 提交一個(gè)異步任務(wù) future = global_thread_pool.executor.submit(self.batch_thread, project_id) global_thread_pool.future_dict[project_id] = future # 查看所有異步任務(wù) @login_required def check_future(request): data = global_thread_pool.check_future() return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))
串行執(zhí)行
使用線程鎖
在全局線程池中初始化線程鎖
class ThreadPool(object): def __init__(self): self.executor = ThreadPoolExecutor(20) self.future_dict = {} self.lock = threading.Lock()
然后執(zhí)行線程前需要獲取鎖并再執(zhí)行結(jié)束后釋放鎖
def batch_thread(self): global_thread_pool.lock.acquire() try: ... global_thread_pool.lock.release() except Exception: trace_log = traceback.format_exc() logger.error('異步任務(wù)執(zhí)行失敗:\n %s' % trace_log) global_thread_pool.lock.release()
需要捕捉異常預(yù)防子線程出錯(cuò)而無(wú)法釋放鎖的情況
異步線程任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接
由于django的數(shù)據(jù)庫(kù)連接是保存到線程本地變量中的,通過(guò)ThreadPoolExecutor創(chuàng)建的線程會(huì)保存各自的數(shù)據(jù)庫(kù)連接。
當(dāng)連接被保存的時(shí)間超過(guò)mysql連接的最大超時(shí)時(shí)間,連接失效,但不會(huì)被線程釋放。
之后再調(diào)起線程執(zhí)行涉及到數(shù)據(jù)庫(kù)操作的異步任務(wù)時(shí),會(huì)用到失效的數(shù)據(jù)庫(kù)連接,導(dǎo)致報(bào)錯(cuò)“MySQL server has gone away”。
解決方案是在線程池的所有異步任務(wù)執(zhí)行前先檢查數(shù)據(jù)庫(kù)連接是否可用,然后關(guān)掉不可用連接
def batch_thread(self): for conn in connections.all(): conn.close_if_unusable_or_obsolete() ...
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python 圖像處理之顏色遷移(reinhard VS welsh)
這篇文章主要介紹了分別利用reinhard算法和welsh算法實(shí)現(xiàn)圖像的顏色遷移,并對(duì)二者算法的效果進(jìn)行了對(duì)比,感興趣的小伙伴可以了解一下2021-12-12Python標(biāo)準(zhǔn)庫(kù)之time庫(kù)的使用教程詳解
這篇文章主要介紹了Python的time庫(kù)的使用教程,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2022-04-04Python中l(wèi)ist列表添加元素的3種方法總結(jié)
這篇文章主要介紹了Python中l(wèi)ist列表添加元素的3種方法總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01Python中bytes和str的區(qū)別與聯(lián)系詳解
Python3最重要的新特性之一是對(duì)字符串和二進(jìn)制數(shù)據(jù)流做了明確的區(qū),下面這篇文章主要給大家介紹了關(guān)于Python中bytes和str區(qū)別與聯(lián)系的相關(guān)資料,需要的朋友可以參考下2022-05-05python學(xué)生信息管理系統(tǒng)實(shí)現(xiàn)代碼
這篇文章主要為大家詳細(xì)介紹了python學(xué)生信息管理系統(tǒng)的實(shí)現(xiàn)代碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-06-06