python基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù)
這篇文章主要介紹了python基于celery實(shí)現(xiàn)異步任務(wù)周期任務(wù)定時(shí)任務(wù),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
hello, 小伙伴們, 好久不更新了,這一次帶來的是celery在python中的應(yīng)用以及設(shè)置異步任務(wù)周期任務(wù)和定時(shí)任務(wù)的步驟,希望能給入坑的你帶來些許幫助.
首先是對(duì)celery的介紹,Celery其實(shí)是一個(gè)專注于實(shí)時(shí)處理和調(diào)度任務(wù)的分布式任務(wù)隊(duì)列,同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需要的全部數(shù)據(jù), 因此可以用它提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列,它本身不是任務(wù)隊(duì)列,它是封裝了操作常見任務(wù)隊(duì)列的各種操作, 可以使用它快速進(jìn)行任務(wù)隊(duì)列的使用與管理.在Python中的組成部分是 1.用戶任務(wù) app 2.管道 broker 用于存儲(chǔ)任務(wù) 官方推薦的是 redis rabbitMQ / backend 用于存儲(chǔ)任務(wù)執(zhí)行結(jié)果的 3, 員工 worker 大致流程入下:
最左邊的是用戶, 用戶發(fā)起1個(gè)請(qǐng)求給服務(wù)器, 要服務(wù)器執(zhí)行10個(gè)任務(wù),將這10個(gè)任務(wù)分給10個(gè)調(diào)度器,即開啟10個(gè)線程進(jìn)行任務(wù)處理,worker會(huì)一直監(jiān)聽調(diào)度器是否有任務(wù), 一旦發(fā)現(xiàn)有新的任務(wù), 就會(huì)立即執(zhí)行新任務(wù),一旦執(zhí)行完就會(huì)返回給調(diào)度器, 即backend, backend會(huì)將請(qǐng)求發(fā)送給服務(wù)器, 服務(wù)器將結(jié)果返回給用戶, 表現(xiàn)的結(jié)果就是,這10個(gè)任務(wù)同時(shí)完成,同時(shí)返回,,這就是Celery的整個(gè)工作流程, 其中的角色分別為,任務(wù)(app_work), 調(diào)度器(broker + backend), 將任務(wù)緩存的部分, 即將所有任務(wù)暫時(shí)存在的地方,相當(dāng)于生產(chǎn)者, 消費(fèi)者(worker 可以指定數(shù)量, 即在創(chuàng)建worker命令的時(shí)候可以指定數(shù)量), 在worker拿到任務(wù)后,人就控制不了了, 除非把worker殺死, 不然肯定會(huì)執(zhí)行完.
也即 任務(wù)來了以后, 調(diào)度器(broker)去緩存任務(wù), worker去執(zhí)行任務(wù), 完成后返回backend,接著返回,
還有就是關(guān)于定時(shí)任務(wù)和周期任務(wù)在linux上為什么不用自身所帶著的去做,是因?yàn)閘inux周期定時(shí)任務(wù)是不可控的, 不好管理, 返回值保存也是個(gè)麻煩事, 而celery只要開啟著調(diào)度器, 就可以隨時(shí)把人物結(jié)果獲取到,即使用celery控制起來是非常方便的.
接下來就是實(shí)例代碼:
workers.py
from celery import Celery import time # 創(chuàng)建一個(gè)Celery實(shí)例, 就是用戶的應(yīng)用app 第一個(gè)參數(shù)是任務(wù)名稱, 可以隨意起 后面的就是配置的broker和backend diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379") # 接下來是為應(yīng)用創(chuàng)建任務(wù) ab @diaoduqi.task def ab(a,b): time.sleep(15) return a+b
brokers.py
from worker import ab # 將任務(wù)交給Celery的Worker執(zhí)行 res = ab.delay(2,4) #返回任務(wù)ID print(res.id)
backends.py
from celery.result import AsyncResult from worker import diaoduqi # 異步獲取任務(wù)返回值 async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi) # 判斷異步任務(wù)是否執(zhí)行成功 if async_task.successful(): #獲取異步任務(wù)的返回值 result = async_task.get() print(result) else: print("任務(wù)還未執(zhí)行完成")
為了方便,現(xiàn)在直接將三個(gè)文件代表的部分命名在文件名稱中.首先是啟動(dòng)workers.py
啟動(dòng)方式是依據(jù)系統(tǒng)的不同來啟動(dòng)的, 對(duì)于linux下 celery worker -A workers -l INFO 也可以指定開啟的worker數(shù)量 即在后面添加的參數(shù)是 -c 5 表示指定5個(gè)worker 理論上指定的worker是無上限的,
在windows下需要安裝一個(gè)eventlet模塊進(jìn)行運(yùn)行, 不然不會(huì)運(yùn)行成功 pip install eventlet 可以開啟線程 不指定數(shù)量是默認(rèn)6個(gè)worker, 理論上worker的數(shù)量可以開啟無限個(gè),但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 開啟5個(gè)worker 執(zhí)行
該命令后 處于就緒狀態(tài), 需要發(fā)布任務(wù), 即brokers.py進(jìn)行任務(wù)發(fā)布, 方法是使用delay的方式執(zhí)行異步任務(wù), 返回了一個(gè)任務(wù)id, 接著去backends.py中取這個(gè)任務(wù)id, 去查詢?nèi)蝿?wù)是否完成,判定條件即任務(wù).successful 判斷是否執(zhí)行完, 上面就是celery異步執(zhí)行任務(wù)的用法與解釋
接下來就是celery在項(xiàng)目中的應(yīng)用
在實(shí)際項(xiàng)目中應(yīng)用celery是有一定規(guī)則的, 即目錄結(jié)構(gòu)應(yīng)該如下.
結(jié)構(gòu)說明 首先是創(chuàng)建一個(gè)CeleryTask的包,接著是在里面創(chuàng)建一個(gè)celery.py,必須是這個(gè)文件 關(guān)于重名的問題, 找尋模塊的順序是先從當(dāng)前目錄中去尋找, 根本找不到,接著是從內(nèi)置模塊中去找, 根本就找不到寫的這個(gè)celery這個(gè)文件,
celery.py
from celery import Celery DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])
TaskOne.py
import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"
taskTwo.py
import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"
getR.py
from CeleryTask.TaskOne import one1 as one # one.delay(10,10) # two.delay(20,20) # 定時(shí)任務(wù)我們不在使用delay這個(gè)方法了,delay是立即交給task 去執(zhí)行 # 現(xiàn)在我們使用apply_async定時(shí)執(zhí)行 # 首先我們要先給task一個(gè)執(zhí)行任務(wù)的時(shí)間 import datetime, time # 獲取當(dāng)前時(shí)間 此時(shí)間為東八區(qū)時(shí)間 ctime = time.time() # 將當(dāng)前的東八區(qū)時(shí)間改為 UTC時(shí)間 注意這里一定是UTC時(shí)間,沒有其他說法 utc_time = datetime.datetime.utcfromtimestamp(ctime) # 為當(dāng)前時(shí)間增加 10 秒 add_time = datetime.timedelta(seconds=10) action_time = utc_time + add_time # action_time 就是當(dāng)前時(shí)間未來10秒之后的時(shí)間 # 現(xiàn)在我們使用apply_async定時(shí)執(zhí)行 res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) print(res.id) # 這樣原本延遲5秒執(zhí)行的One函數(shù)現(xiàn)在就要在10秒鐘以后執(zhí)行了
接著是在命令行cd到與CeleryTask同級(jí)目錄下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 這樣 就開啟了worker 接著去 發(fā)布任務(wù), 在定時(shí)任務(wù)中不再使用delay這個(gè)方法了,
delay是立即交給ttask去執(zhí)行, 在這里使用 apply_async定時(shí)執(zhí)行 指的是調(diào)度的時(shí)候去定時(shí)執(zhí)行
需要設(shè)置的是UTC時(shí)間, 以及定時(shí)的時(shí)間(多長(zhǎng)時(shí)間以后執(zhí)行) 之后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令開啟worker, 之后運(yùn)行 getR.py文件發(fā)布任務(wù), 可以看到在定義的時(shí)間以后執(zhí)行該任務(wù)
周期任務(wù)
周期任務(wù) 指的是在指定時(shí)間去執(zhí)行任務(wù) 需要導(dǎo)入的一個(gè)模塊有 crontab
文件結(jié)構(gòu)如下
結(jié)構(gòu)同定時(shí)任務(wù)差不多,只不過需要變動(dòng)一下文件內(nèi)容 GetR文件已經(jīng)不需要了,可以刪除.
celery.py
from celery import Celery from celery.schedules import crontab DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"]) # 我要要對(duì)beat任務(wù)生產(chǎn)做一個(gè)配置,這個(gè)配置的意思就是每10秒執(zhí)行一次Celery_task.task_one任務(wù)參數(shù)是(10,10) DDQ.conf.beat_schedule = { "each10s_task": { "task": "CeleryTask.TaskOne.one1", "schedule": 10, # 每10秒鐘執(zhí)行一次 "args": (10, 10) }, "each1m_task": { "task": "CeleryTask.TaskOne.one2", "schedule": crontab(minute=1) # 每1分鐘執(zhí)行一次 也可以替換成 60 即 "schedule": 60 } }
TaskOne.py
import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"
taskTwo.py
import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"
以上配置完成以后,這時(shí)候就不能直接創(chuàng)建worker了,因?yàn)橐獔?zhí)行周期任務(wù),需要首先有一個(gè)任務(wù)的生產(chǎn)方, 即 celery beat -A CeleryTask, 用來產(chǎn)生創(chuàng)建者, 接著是創(chuàng)建worker worker的創(chuàng)建命令還是原來的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 創(chuàng)建完worker之后, 每10秒就會(huì)由beat創(chuàng)建一個(gè)任務(wù)給 worker去執(zhí)行.至此, celery創(chuàng)建異步任務(wù), 周期任務(wù),定時(shí)任務(wù)完畢, 伙伴們自己拿去測(cè)試吧.
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python實(shí)現(xiàn)截取PDF文件中的幾頁(yè)代碼實(shí)例
今天小編就為大家分享一篇關(guān)于Python實(shí)現(xiàn)截取PDF文件中的幾頁(yè)代碼實(shí)例,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-03-03python?memory_profiler庫(kù)生成器和迭代器內(nèi)存占用的時(shí)間分析
這篇文章主要介紹了python?memory_profiler庫(kù)生成器和迭代器內(nèi)存占用的時(shí)間分析,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,感興趣的小伙伴可以參考一下2022-06-06協(xié)程Python 中實(shí)現(xiàn)多任務(wù)耗資源最小的方式
協(xié)程是 Python 中另外一種實(shí)現(xiàn)多任務(wù)的方式,只不過比線程更小,占用更小執(zhí)行單元(理解為需要的資源)。這篇文章主要介紹了協(xié)程Python 中實(shí)現(xiàn)多任務(wù)耗資源最小的方式,需要的朋友可以參考下2020-10-10pycharm 使用心得(八)如何調(diào)用另一文件中的函數(shù)
事件環(huán)境: pycharm 編寫了函數(shù)do() 保存在make.py 如何在另一個(gè)file里調(diào)用do函數(shù)?2014-06-06關(guān)于python中不同函數(shù)讀取圖片格式的區(qū)別淺析
這篇文章主要給大家介紹了關(guān)于python中不同函數(shù)讀取圖片格式的區(qū)別,文中通過實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-03-03python中print()函數(shù)的“,”與java中System.out.print()函數(shù)中的“+”功能詳解
python中的print()函數(shù)和java中的System.out.print()函數(shù)都有著打印字符串的功能。接下來通過本文給大家分享python中print()函數(shù)的“,”與java中System.out.print()函數(shù)中的“+”功能,需要的朋友參考下吧2017-11-11詳解Python odoo中嵌入html簡(jiǎn)單的分頁(yè)功能
在odoo中,通過iframe嵌入 html,頁(yè)面數(shù)據(jù)則通過controllers獲取,使用jinja2模板傳值渲染。這篇文章主要介紹了Python odoo中嵌入html簡(jiǎn)單的分頁(yè)功能 ,需要的朋友可以參考下2019-05-05