Django中使用Celery的方法示例
起步
在 《分布式任務(wù)隊列Celery使用說明》 中介紹了在 Python 中使用 Celery 來實驗異步任務(wù)和定時任務(wù)功能。本文介紹如何在 Django 中使用 Celery。
安裝
pip install django-celery
這個命令使用的依賴是 Celery 3.x 的版本,所以會把我之前安裝的 4.x 卸載,不過對功能上并沒有什么影響。我們也完全可以僅用Celery在django中使用,但使用 django-celery 模塊能更好的管理 celery。
使用
可以把有關(guān) Celery 的配置放到 settings.py 里去,但我比較習(xí)慣單獨(dú)一個文件來放,然后在 settings.py 引入進(jìn)來:
# celery_config.py import djcelery import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2' # UTC CELERY_ENABLE_UTC = True CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_IMPORTS = ( 'app.tasks', ) # 有些情況可以防止死鎖 CELERY_FORCE_EXECV = True # 設(shè)置并發(fā)的worker數(shù)量 CELERYD_CONCURRENCY = 4 # 任務(wù)發(fā)送完成是否需要確認(rèn),這一項對性能有一點(diǎn)影響 CELERY_ACKS_LATE = True # 每個worker執(zhí)行了多少任務(wù)就會銷毀,防止內(nèi)存泄露,默認(rèn)是無限的 CELERYD_MAX_TASKS_PER_CHILD = 40 # 規(guī)定完成任務(wù)的時間 CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分鐘內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進(jìn)程 # 設(shè)置默認(rèn)的隊列名稱,如果一個消息不符合其他的隊列就會放在默認(rèn)隊列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會發(fā)送到默認(rèn)的隊列中 CELERY_DEFAULT_QUEUE = "default" # 設(shè)置詳細(xì)的隊列 CELERY_QUEUES = { "default": { # 這是上面指定的默認(rèn)隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "beat_queue": { "exchange": "beat_queue", "exchange_type": "direct", "routing_key": "beat_queue" } }
配置文件中設(shè)置了 CELERY_IMPORTS 導(dǎo)入的任務(wù),所以在django app中創(chuàng)建相應(yīng)的任務(wù)文件:
# app/tasks.py from celery.task import Task import time class TestTask(Task): name = 'test-task' # 給任務(wù)設(shè)置個自定義名稱 def run(self, *args, **kwargs): print('start test task') time.sleep(4) print('args={}, kwargs={}'.format(args, kwargs)) print('end test task')
在 settings.py 添加:
INSTALLED_APPS = [ # ... 'djcelery', ] # Celery from learn_django.celery_config import *
觸發(fā)任務(wù)或提交任務(wù)可以在view中來調(diào)用:
# views.py from django.http import HttpResponse from app.tasks import TestTask def test_task(request): # 執(zhí)行異步任務(wù) print('start do request') t = TestTask() t.delay() print('end do request') return HttpResponse('ok')
啟動 woker 的命令是:
python manage.py celery worker -l info
再啟動django,訪問該view,可以看到任務(wù)在worker中被消費(fèi)了。
定時任務(wù)
在celery的配置文件 celery_config.py 文件中添加:
CELERYBEAT_SCHEDULE = { 'task1-every-1-min': { # 自定義名稱 'task': 'test-task', # 與任務(wù)中name名稱一致 'schedule': datetime.timedelta(seconds=5), 'args': (2, 15), 'options': { 'queue': 'beat_queue', # 指定要使用的隊列 } }, }
通過 options 的 queque 來指定要使用的隊列,這里需要單獨(dú)的隊列是因為,如果所有任務(wù)都使用同一隊列,對于定時任務(wù)來說,任務(wù)提交后會位于隊列尾部,任務(wù)的執(zhí)行時間會靠后,所以對于定時任務(wù)來說,使用單獨(dú)的隊列。
啟動 beat:
python manage.py celery beat -l info
監(jiān)控工具 flower
如果celery中的任務(wù)執(zhí)行失敗了,有些場景是需要對這些任務(wù)進(jìn)行監(jiān)控, flower 是基于 Tornado 開發(fā)的web應(yīng)用。安裝用 pip install flower ;啟動它可以是:
python manage.py celery flower # python manage.py celery flower --basic_auth=admin:admin
用瀏覽器訪問 http://localhost:5555 即可查看:
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python編程pytorch深度卷積神經(jīng)網(wǎng)絡(luò)AlexNet詳解
AlexNet和LeNet的架構(gòu)非常相似。這里我們提供了一個稍微精簡版本的AlexNet,去除了當(dāng)年需要兩個小型GPU同時運(yùn)算的設(shè)計特點(diǎn)2021-10-10PyQT5 QTableView顯示綁定數(shù)據(jù)的實例詳解
今天小編就為大家分享一篇PyQT5 QTableView顯示綁定數(shù)據(jù)的實例詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06Python數(shù)據(jù)結(jié)構(gòu)與算法中的隊列詳解(2)
這篇文章主要為大家詳細(xì)介紹了Python中的隊列,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03PyQt5?python?數(shù)據(jù)庫?表格動態(tài)增刪改詳情
這篇文章主要介紹了PyQt5?python?數(shù)據(jù)庫?表格動態(tài)增刪改詳情,首先手動連接數(shù)據(jù)庫與下一個的程序連接數(shù)據(jù)庫是獨(dú)立的2個部分,下面來看看文章的詳細(xì)介紹2022-01-01python實現(xiàn)微信每日一句自動發(fā)送給喜歡的人
這篇文章主要為大家詳細(xì)介紹了python實現(xiàn)微信每日一句自動發(fā)送給喜歡的人,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04python實現(xiàn)撲克牌交互式界面發(fā)牌程序
這篇文章主要介紹了python實現(xiàn)撲克牌交互式界面發(fā)牌程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-04-04