聊聊通過celery_one避免Celery定時(shí)任務(wù)重復(fù)執(zhí)行的問題
在使用Celery統(tǒng)計(jì)每日訪問數(shù)量的時(shí)候,發(fā)現(xiàn)一個(gè)任務(wù)會同時(shí)執(zhí)行兩次,發(fā)現(xiàn)同一時(shí)間內(nèi)(1s內(nèi))竟然同時(shí)發(fā)送了兩次任務(wù),也就是同時(shí)產(chǎn)生了兩個(gè)worker,造成統(tǒng)計(jì)兩次,一直找不到原因。
參考:http://chabaoo.cn/article/226849.htm
有人使用 Redis 實(shí)現(xiàn)了分布式鎖,然后也有人使用了 Celery Once。
Celery Once 也是利用 Redis 加鎖來實(shí)現(xiàn), Celery Once 在 Task 類基礎(chǔ)上實(shí)現(xiàn)了 QueueOnce 類,該類提供了任務(wù)去重的功能,所以在使用時(shí),我們自己實(shí)現(xiàn)的方法需要將 QueueOnce 設(shè)置為 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 參數(shù)表示,在遇到重復(fù)方法時(shí)的處理方式,默認(rèn) graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設(shè)置為 True,則靜默處理。
另外如果要手動設(shè)置任務(wù)的 key,可以指定 keys 參數(shù)
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
解決步驟
Celery One允許你將Celery任務(wù)排隊(duì),防止多次執(zhí)行
安裝
pip install -U celery_once
要求,需要Celery4.0,老版本可能運(yùn)行,但不是官方支持的。
使用celery_once,tasks需要繼承一個(gè)名為QueueOnce的抽象base tasks
Once安裝完成后,需要配置一些關(guān)于ONCE的選項(xiàng)在Celery配置中
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
# 一般之前的配置沒有這個(gè),需要添加上
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# 在原本沒有參數(shù)的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
sleep(30)
return "Done!"
要確定配置,需要取決于使用哪個(gè)backend進(jìn)行鎖定,查看Backends
在后端,這將覆蓋apply_async和delay。它不影響直接調(diào)用任務(wù)。
在運(yùn)行任務(wù)時(shí),celery_once檢查是否沒有鎖定(針對Redis鍵)。否則,任務(wù)將正常運(yùn)行。一旦任務(wù)完成(或由于異常而結(jié)束),鎖將被清除。如果在任務(wù)完成之前嘗試再次運(yùn)行該任務(wù),將會引發(fā)AlreadyQueued異常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
..
AlreadyQueued()
graceful:如果在任務(wù)的選項(xiàng)中設(shè)置了once={'graceful': True},或者在運(yùn)行時(shí)設(shè)置了apply_async,則任務(wù)可以返回None,而不是引發(fā)AlreadyQueued異常。
from celery_once import AlreadyQueued
# Either catch the exception,
try:
example.delay(10)
except AlreadyQueued:
pass
# Or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
sleep(30)
return "Done!"
其他功能請?jiān)L問:https://pypi.org/project/celery_once/
到此這篇關(guān)于通過celery_one避免Celery定時(shí)任務(wù)重復(fù)執(zhí)行的文章就介紹到這了,更多相關(guān)Celery定時(shí)任務(wù)重復(fù)執(zhí)行內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pycharm配置python 設(shè)置pip安裝源為豆瓣源
這篇文章主要介紹了pycharm配置python 設(shè)置pip安裝源為豆瓣源,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
Python Dask庫處理大規(guī)模數(shù)據(jù)集的強(qiáng)大功能實(shí)戰(zhàn)
Dask是一個(gè)靈活、開源的Python庫,專為處理大規(guī)模數(shù)據(jù)集而設(shè)計(jì),與傳統(tǒng)的單機(jī)計(jì)算相比,Dask能夠在分布式系統(tǒng)上運(yùn)行,有效利用集群的計(jì)算資源,本文將深入介紹Dask的核心概念、功能和實(shí)際應(yīng)用,通過豐富的示例代碼展示其在大數(shù)據(jù)處理領(lǐng)域的強(qiáng)大能力2023-12-12
python腳本內(nèi)運(yùn)行l(wèi)inux命令的方法
這篇文章主要介紹了python腳本內(nèi)運(yùn)行l(wèi)inux命令的方法,實(shí)例分析了Python基于subprocess模塊操作Linux命令的相關(guān)技巧,需要的朋友可以參考下2015-07-07
Python time模塊詳解(常用函數(shù)實(shí)例講解,非常好)
在平常的代碼中,我們常常需要與時(shí)間打交道。在Python中,與時(shí)間處理有關(guān)的模塊就包括:time,datetime以及calendar。這篇文章,主要講解time模塊。2014-04-04

