python使用celery實(shí)現(xiàn)訂單超時取消
本文實(shí)例為大家分享了celery實(shí)現(xiàn)訂單超時取消的具體代碼,供大家參考,具體內(nèi)容如下
Celery官方文檔中關(guān)于定時任務(wù)使用的說明
項(xiàng)目目錄結(jié)構(gòu)
我們需要新增一個任務(wù)目錄,例如order:
celey_tasks/
├── config.py
├── __init__.py
├── main.py
├── order/
├── __init__.py
└── tasks.py
在main.py中,注冊任務(wù)目錄【注意,接下來后面我們使用django的模型處理,所以必須對django的配置進(jìn)行引入】
import os from celery import Celery # 1. 創(chuàng)建示例對象 app = Celery("luffy") # 2. 加載配置 app.config_from_object("celery_tasks.config") # 3. 注冊任務(wù)[自動搜索并加載任務(wù)] # 參數(shù)必須必須是一個列表,里面的每一個任務(wù)都是任務(wù)的路徑名稱 # app.autodiscover_tasks(["任務(wù)1","任務(wù)2"]) app.autodiscover_tasks(["celery_tasks.order"]) # 4. 在終端下面運(yùn)行celery命令啟動celery # celery -A 主程序 worker --loglevel=info # celery -A celery_tasks.main worker --loglevel=info
接下來,在order任務(wù)目錄下, 創(chuàng)建固定名字的任務(wù)文件tasks.py,代碼:
from celery_tasks.main import app @app.task(name="check_order") def check_order(): print("檢查訂單是否過期!!!")
接下來,我們需要把這個任務(wù)設(shè)置定時任務(wù),所以需要借助Celery本身提供的Crontab模塊。
在配置文件中,對定時任務(wù)進(jìn)行注冊:
# 任務(wù)隊(duì)列的鏈接地址 broker_url = 'redis://127.0.0.1:6379/15' # 結(jié)果隊(duì)列的鏈接地址 result_backend = 'redis://127.0.0.1:6379/14' from celery.schedules import crontab from .main import app # 定時任務(wù)的調(diào)度列表,用于注冊定時任務(wù) app.conf.beat_schedule = { # Executes every Monday morning at 7:30 a.m. 'check_order_outtime': { # 本次調(diào)度的任務(wù) 'task': 'check_order', # 這里的任務(wù)名稱必須先到main.py中注冊 # 定時任務(wù)的調(diào)度周期 # 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00 'schedule': crontab(), # 每分鐘 # 'args': (16, 16), # 注意:任務(wù)就是一個函數(shù),所以如果有參數(shù)則需要傳遞 }, }
接下來,我們就可以重啟Celery并啟用Celery的定時任務(wù)調(diào)度器
先在終端下,運(yùn)行celery的定時任務(wù)程序,以下命令:
celery -A celery_tasks.main beat # ycelery.main 是celery的主應(yīng)用文件
然后再新建一個終端,運(yùn)行以下命令,上面的命令必須先指定:
celery -A celery_tasks.main worker --loglevel=info
定時任務(wù)
經(jīng)過上面的測試以后,我們接下來只需改造上面的任務(wù)函數(shù),用于判斷修改訂單是否超時。
要完成訂單的任務(wù)功能,如果需要調(diào)用django框架的模型操作,那么必須針對django框架進(jìn)行配置加載和初始化。
main.py,代碼
import os from celery import Celery # 1. 創(chuàng)建示例對象 app = Celery("luffy") # 把celery和django進(jìn)行組合,識別和加載django的配置文件 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev') # 在當(dāng)前clery中啟動django框架,對django框架進(jìn)行進(jìn)行初始化 import django django.setup() # 2. 加載配置 app.config_from_object("celery_tasks.config") # 3. 注冊任務(wù)[自動搜索并加載任務(wù)] # 參數(shù)必須必須是一個列表,里面的每一個任務(wù)都是任務(wù)的路徑名稱 # app.autodiscover_tasks(["任務(wù)1","任務(wù)2"]) app.autodiscover_tasks(["celery_tasks.sms","celery_tasks.order"]) # 4. 在終端下面運(yùn)行celery命令啟動celery # celery -A 主程序 worker --loglevel=info # celery -A celery_tasks.main worker --loglevel=info
注意,因?yàn)樵赿jango中是有時區(qū)配置的,所以,我們在django框架配置中也要修改時區(qū)配置。
任務(wù)代碼tasks.py的實(shí)現(xiàn):
from celery_tasks.main import app from orders.models import Order from datetime import datetime from django.conf import settings @app.task(name="check_order") def check_order(): # 查詢出所有已經(jīng)超時的訂單 # 超時條件: 當(dāng)前時間 > (訂單生成時間 + 超時時間) =====>>>> (當(dāng)前時間 - 超時時間) > 訂單生成時間 now = datetime.now().timestamp() timeout_number = now - settings.ORDER_TIMEOUT timeout = datetime.fromtimestamp(timeout_number) timeout_order_list = Order.objects.filter(order_status=0, created_time__lte=timeout) for order in timeout_order_list: order.order_status = 3 order.save()
配置文件,settings/dev.py,代碼:
# 設(shè)置訂單超時超時的時間[單位: 秒] ORDER_TIMEOUT = 12 * 60 * 60
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Keras 中Leaky ReLU等高級激活函數(shù)的用法
這篇文章主要介紹了Keras 中Leaky ReLU等高級激活函數(shù)的用法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-07-07Python新手入門之單引號、雙引號與三引號的差異與應(yīng)用示例
在Python當(dāng)中表達(dá)字符串既可以使用單引號,也可以使用雙引號,那兩者有什么區(qū)別嗎?下面這篇文章主要給大家介紹了關(guān)于Python新手入門之單引號、雙引號與三引號的差異與應(yīng)用示例,需要的朋友可以參考下2024-03-03Python 中使用 argparse 解析命令行參數(shù)
這篇文章主要介紹了Python 中使用 argparse 解析命令行參數(shù),argparse 模塊是一個強(qiáng)大的命令行參數(shù)解析器,還有很多功能沒能在這里介紹。下面文化在哪個詳細(xì)介紹該內(nèi)容,需要的朋友可以參考一下2021-11-11python3中sorted函數(shù)里cmp參數(shù)改變詳解
在本篇文章里小編給大家整理的是關(guān)于python3中sorted函數(shù)里關(guān)于cmp這一參數(shù)的改變相關(guān)內(nèi)容,需要的朋友們可以學(xué)習(xí)下。2020-03-03