django-celery-beat搭建定時(shí)任務(wù)的實(shí)現(xiàn)
一、創(chuàng)建django項(xiàng)目和app
1、安裝定時(shí)任務(wù)第三方包
pip install django-celery-beat # 插件用來(lái)動(dòng)態(tài)配置定時(shí)任務(wù),一般會(huì)配合 django_celery_results 一起使用,所以一起安裝 django_celery_results
pip install django_celery_results pip install eventlet # windows下運(yùn)行celery 4以后版本,還需額外安裝eventlet庫(kù)
2、創(chuàng)建django項(xiàng)目并創(chuàng)建一個(gè)使用定時(shí)任務(wù)的app
1.1創(chuàng)建django項(xiàng)目并創(chuàng)建app
創(chuàng)建的過(guò)程省略,不在這里展開(kāi),需要注意的是setting文件注冊(cè)app的配置如下:
INSTALLED_APPS = [ ...... 'myapp', # 剛創(chuàng)建的使用定時(shí)任務(wù)的app 'django_celery_beat', # 插件用來(lái)動(dòng)態(tài)配置定時(shí)任務(wù),只要進(jìn)行了第一步pip安裝就可以直接注冊(cè)了 'django_celery_results', ]
1.2 創(chuàng)建定時(shí)任務(wù)數(shù)據(jù)庫(kù)
依次執(zhí)行: python manage.py makemigrations 和 python manage.py migrate
打開(kāi)數(shù)據(jù)庫(kù)發(fā)現(xiàn),自動(dòng)創(chuàng)建了一些表如下:
這些都是定時(shí)任務(wù)需要的表格,自動(dòng)創(chuàng)建不需要手動(dòng)管理
django_celery_beat.models.ClockedSchedule # 特定時(shí)刻任務(wù) django_celery_beat.models.CrontabSchedule # 特定時(shí)間表任務(wù),例如每周1運(yùn)行的計(jì)劃 django_celery_beat.models.IntervalSchedule # 以特定間隔(例如,每5秒)運(yùn)行的計(jì)劃。 django_celery_beat.models.PeriodicTask # 此模型定義要運(yùn)行的單個(gè)周期性任務(wù)。 django_celery_beat.models.PeriodicTasks # 此模型僅用作索引以跟蹤計(jì)劃何時(shí)更改 django_celery_beat.models.SolarSchedule # 定制任務(wù)
如果安裝注冊(cè)了django_celery_results 還會(huì)有另外三個(gè)表:
2、新建一個(gè)celery.py文件
文件的作用是指定django環(huán)境、創(chuàng)建Celery app和指定Celery配置文件的啟動(dòng)位置,類似與wsgi.py或asgi.py,因此也建議文件創(chuàng)建位置與wsgi.py或asgi.py同級(jí)。
文件內(nèi)容如下:
from __future__ import absolute_import, unicode_literals import os from celery import Celery, platforms # 設(shè)置django環(huán)境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lc_manage.settings.settings') # 創(chuàng)建一個(gè)Celery app app = Celery('djangotask') platforms.C_FORCE_ROOT = True ? # 如果配置沒(méi)有生效需要在啟動(dòng)時(shí)設(shè)置 ?export C_FORCE_ROOT="true" # ?使用CELERY_ 作為前綴,在celeryconfig.py中寫(xiě)配置 app.config_from_object('lc_manage.celeryconfig', namespace="CELERY") # app.config_from_object('lc_manage.celeryconfig') # 發(fā)現(xiàn)任務(wù)文件每個(gè)app下的tasks.py app.autodiscover_tasks()
3、創(chuàng)建配置文件config.py
這個(gè)文件里的內(nèi)容可以寫(xiě)到項(xiàng)目的 settings.py里面,因?yàn)樯厦娴腸elery.py 中可以指定配置文件位置;不過(guò)內(nèi)容比較多,還是建議單獨(dú)創(chuàng)建一個(gè)配置文件celeryconfig.py,可以與celery.py 同級(jí)目錄,文件內(nèi)容如下:
from __future__ import absolute_import # broker 設(shè)置 指定中間代理人將任務(wù)存到哪里,這里是redis的11號(hào)庫(kù) CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/11' # 指定 Backend 儲(chǔ)存結(jié)果的地方,可以使用django數(shù)據(jù)庫(kù)(django-db),也可以使用redis, # 使用django數(shù)據(jù)庫(kù)(django-db),以后運(yùn)行worker就會(huì)保存到數(shù)據(jù)庫(kù)中,可以通過(guò)ORM進(jìn)行訪問(wèn) # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'django-db' # 使用django_celery_beat插件用來(lái)動(dòng)態(tài)配置任務(wù) CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定時(shí)區(qū),默認(rèn)是 UTC CELERY_TIMEZONE = 'Asia/Shanghai' # celery 序列化與反序列化配置 CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_ACCEPT_CONTENT = ['pickle', 'json'] CELERY_TASK_IGNORE_RESULT = True # 有些情況下可以防止死鎖 ?非常重要! CELERYD_FORCE_EXECV = True # 為存儲(chǔ)結(jié)果設(shè)置過(guò)期日期,默認(rèn)1天過(guò)期。如果beat開(kāi)啟,Celery每天會(huì)自動(dòng)清除。 # 設(shè)為0,存儲(chǔ)結(jié)果永不過(guò)期 # CELERY_RESULT_EXPIRES = xx # CELERY_TASK_RESULT_EXPIRES = 60*60*24 ?# 后端存儲(chǔ)的任務(wù)超過(guò)一天時(shí),自動(dòng)刪除數(shù)據(jù)庫(kù)中的任務(wù)數(shù)據(jù),單位秒 CELERY_MAX_TASKS_PER_CHILD = 1000 ?# 每個(gè)worker執(zhí)行1000次任務(wù)后,自動(dòng)重啟worker,防止任務(wù)占用太多內(nèi)存導(dǎo)致內(nèi)存泄漏 # 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開(kāi)足馬力。 CELERY_DISABLE_RATE_LIMITS = True # celery beat配置(周期性任務(wù)設(shè)置) CELERY_ENABLE_UTC = False # 官方用來(lái)修復(fù)CELERY_ENABLE_UTC=False and USE_TZ = False 時(shí)時(shí)間比較錯(cuò)誤的問(wèn)題; # 詳情見(jiàn):https://github.com/celery/django-celery-beat/pull/216/files DJANGO_CELERY_BEAT_TZ_AWARE = False
這里需要注意的是,如果在celery.py中配置指定了confiig配置文件使用CELERY前綴:app.config_from_object(‘lc_manage.celeryconfig’, namespace=“CELERY”),那么celeryconfig.py配置文件的參數(shù)都應(yīng)加:CELERY_,當(dāng)然你也可以不用第二個(gè)參數(shù),namespace=“CELERY"寫(xiě)成app.config_from_object(‘lc_manage.celeryconfig’”), 那么celeryconfig.py中就不需要加CELERY_ 前綴,注意一定要統(tǒng)一!?。》駝t可能 會(huì)報(bào)錯(cuò):
consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
4、加載celery.py
我們自己創(chuàng)建的celery.py雖然與wsgi.py 或者 asgi.py等同級(jí),但是 不會(huì)像他們一樣自動(dòng)加載,需要我們通過(guò)本級(jí)文件下的__init__.py 把celery.py 加載進(jìn)來(lái),打開(kāi)__init__.py文件,添加如下內(nèi)容:
from __future__ import absolute_import, unicode_literals from .celery import app as celery_app # 使得django啟動(dòng)時(shí)加載celery的app __all__ = ('celery_app',)
5、創(chuàng)建定時(shí)任務(wù)執(zhí)行內(nèi)容
經(jīng)過(guò)上面的配置,django-celery_beta 會(huì)自動(dòng)去掃描每個(gè)app目錄下是否有 tasks.py 文件,需要?jiǎng)?chuàng)建定時(shí)任務(wù)的app下我們可以手動(dòng)創(chuàng)建tasks.py,定時(shí)任務(wù)就寫(xiě)在這個(gè)文件上:
from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): ? ? print("x + y ?= ?", x + y) ? ? return x + y @shared_task def mul(x, y): ? ? print("x * y ?= ?", x * y) ? ? return x * y
二、定時(shí)器創(chuàng)建和定時(shí)任務(wù)添加
1、時(shí)間和周期控制:IntervalSchedule、ClockedSchedule和CrontabSchedule
其他帖子都把上面三個(gè)稱為定時(shí)任務(wù)與PeriodicTask放一起結(jié)束,但是個(gè)人理解以上三個(gè)都是定時(shí)任務(wù)時(shí)控制時(shí)間和周期執(zhí)行的控制器,并非創(chuàng)建定時(shí)任務(wù),真正創(chuàng)建定時(shí)任務(wù)的只有PeriodicTask,所以這里個(gè)人把這三個(gè)稱為定時(shí)任務(wù)的“時(shí)間和頻率控制器”。
- IntervalSchedule 按時(shí)間間隔頻率執(zhí)行定時(shí)任務(wù)的控制器,(例:每間隔1H/1M/…執(zhí)行一次)
- ClockedSchedule 指定某個(gè)時(shí)刻執(zhí)行定時(shí)任務(wù)的控制器, (例:2018年8月8號(hào) 8:00這個(gè)時(shí)刻執(zhí)行)
- CrontabSchedule 指定某個(gè)時(shí)間執(zhí)行定時(shí)任務(wù)的控制器 (例:每年的12月星期一的8:30)
導(dǎo)入這四個(gè)模塊:
from django_celery_beat.models import CrontabSchedule, PeriodicTask, IntervalSchedule,ClockedSchedule
1.1 IntervalSchedule 時(shí)間間隔控制器
參數(shù):every 間隔數(shù),period 間隔單位
schedule, created = IntervalSchedule.objects.update_or_create( every=1, period=IntervalSchedule.MINUTES) # 按分鐘間隔執(zhí)行
第二個(gè)參數(shù)可選
- IntervalSchedule.DAYS 固定間隔天數(shù)
- IntervalSchedule.HOURS 固定間隔小時(shí)數(shù)
- IntervalSchedule.MINUTES 固定間隔分鐘數(shù)
- IntervalSchedule.SECONDS 固定間隔秒數(shù)
- IntervalSchedule.MICROSECONDS 固定間隔微秒
返回值可直接解包,其實(shí)只有第一個(gè)參數(shù)schedule有用,在PeriodicTask創(chuàng)建定時(shí)任務(wù)時(shí)作為時(shí)間和周期控制參數(shù)傳入。解包獲得的第二個(gè)數(shù)據(jù)created 可以直接用下劃線取代
1.2 ClockedSchedule特定時(shí)刻定時(shí)器
參數(shù):clocked_time 指定時(shí)間
clocked, _ = ClockedSchedule.objects.update_or_create( clocked_time =datetime.strptime("2020-08-18 16:58:46","%Y-%m-%d %H:%M:%S")) # 按指定時(shí)間執(zhí)行
這里第二個(gè)參數(shù)直接用下劃線取代。特定時(shí)刻控制一般時(shí)執(zhí)行一次,適合在view中調(diào)用執(zhí)行一次性計(jì)劃。
1.3、周期性任務(wù) CrontabSchedule
參數(shù):
- month_of_year # 幾月執(zhí)行
- day_of_month # 幾號(hào)執(zhí)行
- day_of_week # 周幾執(zhí)行
- hour # 幾點(diǎn)執(zhí)行
- minute # 幾分執(zhí)行
- timezone # 時(shí)區(qū)
crontab, _ = CrontabSchedule.objects.update_or_create( minute="00", hour="23", day_of_week="*", # 周幾執(zhí)行 day_of_month='1', # 幾點(diǎn)執(zhí)行 month_of_year='*', timezone=pytz.timezone("Asia/Shanghai"), )
上面的星號(hào)代表不使用,上面的配置即: 每月1日的23點(diǎn)執(zhí)行一次。如果day_of_month=“*”則代表每天23點(diǎn)執(zhí)行。
2、動(dòng)態(tài)添加任務(wù) PeriodicTask
參數(shù):
- name:任務(wù)名
- task:指定的任務(wù)
- interval:時(shí)間間隔
- crontab:時(shí)間控制器
- clocked:指定時(shí)刻控制器
- expires:有效日期
- one_off:?jiǎn)⒂脿顟B(tài)(如果為True,調(diào)度將只運(yùn)行該任務(wù)一次)
- start_time:開(kāi)始時(shí)間
- enabled:?jiǎn)⒂?/li>
- last_run_at:最后運(yùn)行時(shí)間
- total_run_count:運(yùn)行總次數(shù)
- date_changed:最后的更改時(shí)間
- description:描述
- args: 傳參
注意:
1、interval、crontab、clocked三選一,不可同時(shí)使用。參數(shù)值分別對(duì)應(yīng)interval:schedule(IntervalSchedule的第一個(gè)返回值);crontab:crontab(CrontabSchedule的第一個(gè)返回值);clocked:clocked(ClockedSchedule返回值)
2、name :任務(wù)名,確保其唯一性?。。?!
3、task:后面是以字符串形式調(diào)用定時(shí)任務(wù)的具體工作內(nèi)容函數(shù),即第一項(xiàng)的第5條用@shared_task裝飾器創(chuàng)建的方法。
4、enabled:是否啟用,這里動(dòng)態(tài)創(chuàng)建的話一般設(shè)為true
5、args: 傳參(task中指定的任務(wù)需要傳參時(shí)使用),注意需要json序列化 json.dumps(…)
其他參數(shù)均為非必填項(xiàng)。
PeriodicTask.objects.update_or_create( name=working_point_record_id + 'working_time_1', defaults={ "task": "apps.tasks.add", "crontab": crontab, "enabled": True, "args": json.dumps([working_point_record_id]) },
4、封裝方法
一般情況下,可以把IntervalSchedule、ClockedSchedule和CrontabSchedule根據(jù)業(yè)務(wù)需求單獨(dú)封裝一個(gè)文件,而PeriodicTask.objects直接在對(duì)應(yīng)view視圖中調(diào)用即可。
三、啟動(dòng)定時(shí)任務(wù)beat
定時(shí)任務(wù)是獨(dú)立與django項(xiàng)目運(yùn)行的,django只是定時(shí)任務(wù)的入口和操作數(shù)據(jù)庫(kù)的入口,而這前提是django-celery-beat 已經(jīng)獨(dú)立啟動(dòng),django-celery-beat的啟動(dòng)分兩步,一是生產(chǎn)者單獨(dú)啟動(dòng)(beat),而是工作者單獨(dú)啟動(dòng)(worker),這里啟動(dòng)順序需要注意一點(diǎn),建議先啟動(dòng)worker 再啟動(dòng)beat ,曾經(jīng)遇到先啟動(dòng)beat有可能beat會(huì)啟動(dòng)失敗。
1、啟動(dòng)工作者worker
此時(shí)仍然需要重新打開(kāi)一個(gè)命令窗口,進(jìn)入django項(xiàng)目的根目錄(manage.py同級(jí)目錄)下:
# Linux下測(cè)試,啟動(dòng)Celery Celery -A 【項(xiàng)目名稱】worker -l info ? # Windows下測(cè)試,啟動(dòng)Celery Celery -A 【項(xiàng)目名稱】worker -l info -P eventlet ? # 如果Windows下Celery不工作,輸入如下命令 Celery -A 【項(xiàng)目名稱】worker -l info --pool=solo
2、啟動(dòng)生產(chǎn)者beat
beat 是一個(gè)生產(chǎn)者角色,是單獨(dú)運(yùn)行,生產(chǎn)者完全不依賴django,需要與django在一個(gè)不同的命令窗口運(yùn)行,但啟動(dòng)時(shí)需要先進(jìn)入django項(xiàng)目的根目錄,即與manage.py在同一目錄下:
celery -A 【項(xiàng)目名稱】 beat -l info
四、定時(shí)任務(wù)創(chuàng)建
這里要?jiǎng)澲攸c(diǎn)了,其實(shí)看完上面的已經(jīng)可以使用了,下面的屬于優(yōu)化選擇性理解,看個(gè)人理解能力和需求:
一般情況下,定時(shí)器創(chuàng)建和定時(shí)任務(wù)添加 PeriodicTask會(huì)在view視圖中創(chuàng)建,但是如果view視圖比較頻繁,那么每次執(zhí)行view都創(chuàng)建一個(gè)定時(shí)任務(wù)的話會(huì)有無(wú)數(shù)個(gè),會(huì)比較占用內(nèi)存資源,當(dāng)然可以在celeryconfig.py設(shè)置執(zhí)行多少次后重啟任務(wù),高并發(fā)下view會(huì)不會(huì)導(dǎo)致頻繁重啟,這里需要根據(jù)業(yè)務(wù)邏輯把@shared_task包裹下的任務(wù)處理方法做成批量處理放到單獨(dú)文件中(不需要在view視圖中調(diào)用,定時(shí)調(diào)用批處理,那么就是一個(gè)任務(wù)批量處理數(shù)據(jù)而已),在間隔固定時(shí)間下執(zhí)行,單純語(yǔ)言難以表達(dá)清楚,理解的就理解了,不理解的就慢慢體會(huì)吧。這里需要畫(huà)重點(diǎn)的是:如果你理解了單個(gè)文件寫(xiě)@shared_task批處理方法,那么你的問(wèn)題重點(diǎn)就是:我如何啟動(dòng)它呢?難道每次在啟動(dòng)beat后在命令行啟動(dòng)嗎?雖然也可以,但是如果項(xiàng)目需要?jiǎng)?chuàng)建的定時(shí)任務(wù)很多怎么辦,不用多,幾百行可以了,每次在命令行復(fù)制粘貼執(zhí)行命令 嗎?當(dāng)然不要,其實(shí)直接創(chuàng)建一個(gè)單獨(dú)的python文件,然后再根目錄下的url中導(dǎo)入即可,因?yàn)閡rl在項(xiàng)目啟動(dòng)的時(shí)候會(huì)自動(dòng)加載一次,也就相當(dāng)于啟動(dòng)django項(xiàng)目的時(shí)候就自動(dòng)創(chuàng)建了一次定時(shí)任務(wù)!
到此這篇關(guān)于django-celery-beat搭建定時(shí)任務(wù)的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)django-celery-beat 定時(shí)任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- django中celery的定時(shí)任務(wù)使用
- Django初步使用Celery處理耗時(shí)任務(wù)和定時(shí)任務(wù)問(wèn)題
- 關(guān)于Django使用 django-celery-beat動(dòng)態(tài)添加定時(shí)任務(wù)的方法
- Django+Celery實(shí)現(xiàn)定時(shí)任務(wù)的示例
- Django+Celery實(shí)現(xiàn)動(dòng)態(tài)配置定時(shí)任務(wù)的方法示例
- Django實(shí)現(xiàn)celery定時(shí)任務(wù)過(guò)程解析
- Django中使用Celery執(zhí)行定時(shí)任務(wù)問(wèn)題
相關(guān)文章
python 使用正則表達(dá)式按照多個(gè)空格分割字符的實(shí)例
今天小編就為大家分享一篇python 使用正則表達(dá)式按照多個(gè)空格分割字符的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12Kears+Opencv實(shí)現(xiàn)簡(jiǎn)單人臉識(shí)別
這篇文章主要為大家詳細(xì)介紹了Kears+Opencv實(shí)現(xiàn)簡(jiǎn)單人臉識(shí)別,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-08-08celery異步定時(shí)任務(wù)訂單定時(shí)回滾
這篇文章主要為大家介紹了celery異步定時(shí)任務(wù)訂單定時(shí)回滾的實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04解決tensorflow1.x版本加載saver.restore目錄報(bào)錯(cuò)的問(wèn)題
今天小編就為大家分享一篇解決tensorflow1.x版本加載saver.restore目錄報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-07-07使用Python和Scrapy實(shí)現(xiàn)抓取網(wǎng)站數(shù)據(jù)
Scrapy是一個(gè)功能強(qiáng)大的網(wǎng)絡(luò)爬蟲(chóng)框架,允許開(kāi)發(fā)者輕松地抓取和解析網(wǎng)站內(nèi)容,這篇文章主要為大家介紹了如何使用Python的Scrapy庫(kù)進(jìn)行網(wǎng)站數(shù)據(jù)抓取,需要的可以參考一下2023-05-05利用pyshp包給shapefile文件添加字段的實(shí)例
今天小編就為大家分享一篇利用pyshp包給shapefile文件添加字段的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12cmd運(yùn)行python文件時(shí)對(duì)結(jié)果進(jìn)行保存的方法
今天小編就為大家分享一篇cmd運(yùn)行python文件時(shí)對(duì)結(jié)果進(jìn)行保存的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05