詳解分布式任務(wù)隊列Celery使用說明
起步
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護這樣一個系統(tǒng)的必需工具。它是一個專注于實時處理的任務(wù)隊列,同時也支持任務(wù)調(diào)度。
運行模式是生產(chǎn)者消費者模式:

任務(wù)隊列:任務(wù)隊列是一種在線程或機器間分發(fā)任務(wù)的機制。
消息隊列:消息隊列的輸入是工作的一個單元,稱為任務(wù),獨立的職程(Worker)進程持續(xù)監(jiān)視隊列中是否有需要處理的新任務(wù)。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。
Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中
任務(wù)結(jié)果存儲:Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務(wù)執(zhí)行結(jié)果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡單的demo
為了運行一個簡單的任務(wù),從中說明 celery 的使用方式。在項目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來定義任務(wù):
# tasks.py
import time
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('my_tasks', broker=broker, backend=backend)
@app.task
def add(x, y):
print('enter task')
time.sleep(3)
return x + y
這些代碼做了什么事。 broker 指定任務(wù)隊列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲。app 就是我們創(chuàng)建的 Celery 對象。通過 app.task 修飾器將 add 函數(shù)變成一個一部的任務(wù)。
# app.py
from tasks import add
if __name__ == '__main__':
print('start task')
result = add.delay(2, 18)
print('end task')
print(result)
add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個任務(wù)的唯一識別:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
這個只是將任務(wù)推送到 redis,任務(wù)還沒被消費,任務(wù)會在 celery 隊列中。
開啟 celery woker 可以將任務(wù)進行消費:
celery worker -A tasks -l info # -A 后是模塊名
A 參數(shù)指定了celery 對象的位置,l 參數(shù)指定woker的日志級別。
如果此命令在終端報錯:
File "e:\workspace\.env\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
這是win 10 在使用 Celery 4.x 的時候會有這個問題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴展:
pip install eventlet celery -A <mymodule> worker -l info -P eventlet
另一種方式是添加個 FORKED_BY_MULTIPROCESSING = 1 的環(huán)境變量(推薦這種方式):
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
如果一切順利,woker 正常啟動,就能在終端看到任務(wù)被消費了:
[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
說明我們的demo已經(jīng)成功了。
使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項有:
- CELERY_DEFAULT_QUEUE:默認隊列
- BROKER_URL : 代理人的網(wǎng)址
- CELERY_RESULT_BACKEND:結(jié)果存儲地址
- CELERY_TASK_SERIALIZER:任務(wù)序列化方式
- CELERY_RESULT_SERIALIZER:任務(wù)執(zhí)行結(jié)果序列化方式
- CELERY_TASK_RESULT_EXPIRES:任務(wù)過期時間
- CELERY_ACCEPT_CONTENT:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個列表;
整理一下目錄結(jié)構(gòu),將我們的任務(wù)封裝成包:

內(nèi)容如下:
# __init__.py
import os
from celery import Celery
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('demo')
# 通過 Celery 實例加載配置模塊
app.config_from_object('celery_app.celery_config')
# celery_config.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
# 導入指定的任務(wù)模塊
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2',
)
# task1.py
import time
from celery_app import app
@app.task
def add(x, y):
print('enter task')
time.sleep(3)
return x + y
# task2.py
import time
from celery_app import app
@app.task
def mul(x, y):
print('enter task')
time.sleep(4)
return x * y
# app.py
from celery_app import task1
if __name__ == '__main__':
pass
print('start task')
result = task1.add.delay(2, 18)
print('end task')
print(result)
提交任務(wù)與啟動worker:
$ python app.py $ celery worker -A celery_app -l info
result = task1.add.delay(2, 18) 返回的是一個任務(wù)對象,通過 delay 函數(shù)的方式可以發(fā)現(xiàn)這個過程是非阻塞的,這個任務(wù)對象有一個方法:
r.ready() # 查看任務(wù)狀態(tài),返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False. r.wait() # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果,很少使用; r.get(timeout=1) # 獲取任務(wù)執(zhí)行結(jié)果,可以設(shè)置等待時間 r.result # 任務(wù)執(zhí)行結(jié)果. r.state # PENDING, START, SUCCESS,任務(wù)當前的狀態(tài) r.status # PENDING, START, SUCCESS,任務(wù)當前的狀態(tài) r.successful # 任務(wù)成功返回true r.traceback # 如果任務(wù)拋出了一個異常,你也可以獲取原始的回溯信息
定時任務(wù)
定時任務(wù)的功能類似 crontab,可以完成每日統(tǒng)計任務(wù)等。首先我們需要配置一下 schedule,通過改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:
import datetime
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'task1-every-1-min': {
'task': 'celery_app.task1.add',
'schedule': datetime.timedelta(seconds=60),
'args': (2, 15),
},
'task2-once-a-day': {
'task': 'celery_app.task2.mul',
'schedule': crontab(hour=15, minute=23),
'args': (3, 6),
}
}
task 指定要執(zhí)行的任務(wù);schedule 表示計劃的時間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實也可以是 crontab(minute='*/1') 來替換;args 表示要傳遞的參數(shù)。
啟動 celery beat:
$ celery worker -A celery_app -l info

我們目前是用兩個窗口來執(zhí)行 woker 和 beat 。當然也可以只使用一個窗口來運行(僅限linux系統(tǒng)):
$ celery -B -A celery_app worker -l info
celery.task 裝飾器
@celery.task() def name(): pass
task() 方法將任務(wù)修飾成異步, name 可以顯示指定的任務(wù)名字;serializer 指定序列化的方式;bind 一個bool值,若為True,則task實例會作為第一個參數(shù)傳遞到任務(wù)方法中,可以訪問task實例的所有的屬性,即前面反序列化中那些屬性。
@task(bind=True) # 第一個參數(shù)是self,使用self.request訪問相關(guān)的屬性 def add(self, x, y): logger.info(self.request.id)
base 可以指定任務(wù)積累,可以用來定義回調(diào)函數(shù):
import celery
class MyTask(celery.Task):
# 任務(wù)失敗時執(zhí)行
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 任務(wù)成功時執(zhí)行
def on_success(self, retval, task_id, args, kwargs):
pass
# 任務(wù)重試時執(zhí)行
def on_retry(self, exc, task_id, args, kwargs, einfo):
pass
@task(base=MyTask)
def add(x, y):
raise KeyError()
exc:失敗時的錯誤的類型;
task_id:任務(wù)的id;
args:任務(wù)函數(shù)的參數(shù);
kwargs:參數(shù);
einfo:失敗時的異常詳細信息;
retval:任務(wù)成功執(zhí)行的返回值;
總結(jié)
網(wǎng)上找了一份比較常用的配置文件,需要的時候可以參考下:
# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'
# 指定結(jié)果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務(wù)序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
# 指定結(jié)果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務(wù)過期時間,celery任務(wù)執(zhí)行結(jié)果的超時時間
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# 指定任務(wù)接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]
# 任務(wù)發(fā)送完成是否需要確認,這一項對性能有一點影響
CELERY_ACKS_LATE = True
# 壓縮方案選擇,可以是zlib, bzip2,默認是發(fā)送沒有壓縮的數(shù)據(jù)
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 規(guī)定完成任務(wù)的時間
CELERYD_TASK_TIME_LIMIT = 5 # 在5s內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進程
# celery worker的并發(fā)數(shù),默認是服務(wù)器的內(nèi)核數(shù)目,也是命令行-c參數(shù)指定的數(shù)目
CELERYD_CONCURRENCY = 4
# celery worker 每次去rabbitmq預取任務(wù)的數(shù)量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每個worker執(zhí)行了多少任務(wù)就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 這是使用了django-celery默認的數(shù)據(jù)庫調(diào)度模型,任務(wù)執(zhí)行周期都被存在你指定的orm數(shù)據(jù)庫中
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# 設(shè)置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會發(fā)送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default"
# 設(shè)置詳細的隊列
CELERY_QUEUES = {
"default": { # 這是上面指定的默認隊列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 設(shè)置扇形交換機
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
},
}
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python網(wǎng)絡(luò)爬蟲四大選擇器用法原理總結(jié)
這篇文章主要介紹了Python網(wǎng)絡(luò)爬蟲四大選擇器用法原理總結(jié),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06
Python3實現(xiàn)轉(zhuǎn)換Image圖片格式
本篇文章給大家分享了Python3實現(xiàn)在線轉(zhuǎn)換Image圖片格式的功能以及相關(guān)實例代碼,有興趣的朋友參考下。2018-06-06
Pytorch中關(guān)于model.eval()的作用及分析
這篇文章主要介紹了Pytorch中關(guān)于model.eval()的作用及分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
python獲取一組數(shù)據(jù)里最大值max函數(shù)用法實例
這篇文章主要介紹了python獲取一組數(shù)據(jù)里最大值max函數(shù)用法,實例分析了max函數(shù)的使用技巧,需要的朋友可以參考下2015-05-05
Python中Scipy庫在信號處理中的應(yīng)用詳解
信號處理作為數(shù)字信號處理領(lǐng)域的關(guān)鍵技術(shù),涵蓋了從信號獲取、傳輸、存儲到最終應(yīng)用的一系列處理步驟,在這篇博客中,我們將深入探討Python中Scipy庫在信號處理領(lǐng)域的應(yīng)用,需要的朋友可以參考下2023-12-12

