亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

django celery定時(shí)任務(wù)實(shí)戰(zhàn)詳解

 更新時(shí)間:2025年07月01日 10:53:59   作者:cleargy  
本文介紹了在Django項(xiàng)目中配置Celery定時(shí)任務(wù)的步驟,包括安裝、參數(shù)設(shè)置、創(chuàng)建任務(wù)文件、啟動(dòng)任務(wù)及后臺(tái)配置,最后整理項(xiàng)目結(jié)構(gòu),本文給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧

一、celery依賴安裝

# python 3.11版本
pip install celery redis django-celery-beat django_celery_results eventlet

二、celery 參數(shù)配置

django項(xiàng)目的settings.py中新增如下celery配置

##
INSTALLED_APPS = [
    ......
    'firewall_app',
    'django_celery_beat',    # 添加Celery應(yīng)用
    'django_celery_results', # 添加Celery結(jié)果展示應(yīng)用
]
# Celery Configuration Options
# 使用 Redis 作為消息代理
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 或 'amqp://guest:guest@localhost:5672//' 如果使用 RabbitMQ
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai' # 設(shè)置時(shí)區(qū)
CELERY_ENABLE_UTC = True
# Celery Beat Settings (如果使用定時(shí)任務(wù))
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 如果希望在 Django Admin 中管理定時(shí)任務(wù),需要安裝 django-celery-beat
# 或者使用默認(rèn)的本地調(diào)度器:
# CELERY_BEAT_SCHEDULER = 'celery.beat:PersistentScheduler'
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
#日志輸出配置
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'celery.log',  # 日志文件路徑
        },
    },
    'loggers': {
        'firewall_app.tasks': {  # 匹配您的任務(wù)模塊
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}

三、celery定時(shí)任務(wù)

1、新建tasks.py

import logging
import time
from celery import shared_task
logger = logging.getLogger(__name__)
def run_crawler_logic():
    print("執(zhí)行爬蟲任務(wù)...")
    # 在這里調(diào)用 FortinetCrawler 或相關(guān)爬蟲函數(shù)
    # crawler = FortinetCrawler()
    # crawler.run()
    time.sleep(10) # 模擬任務(wù)執(zhí)行
    print("爬蟲任務(wù)完成.")
def run_mapping_logic():
    print("執(zhí)行漏洞映射任務(wù)...")
    # 在這里調(diào)用 map_vulnerabilities_for_all_firewalls 或相關(guān)函數(shù)
    # map_vulnerabilities_for_all_firewalls()
    # 推遲導(dǎo)入爬蟲函數(shù),避免循環(huán)引用
    time.sleep(5) # 模擬任務(wù)執(zhí)行
    print("漏洞映射任務(wù)完成.")
@shared_task
def run_crawler_task():
    """Celery task for running the web crawler."""
    # 確保 Django 環(huán)境已設(shè)置 (如果任務(wù)需要訪問 Django 模型)
    # 如果 Celery worker 和 Django 運(yùn)行在同一環(huán)境,通常不需要手動(dòng)設(shè)置
    # 但為了保險(xiǎn)起見,可以加上
    # if not django.apps.apps.ready:
    #     os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
    #     django.setup()
    logger.info("漏洞爬蟲任務(wù)開始執(zhí)行")
    run_crawler_logic()
    logger.info("漏洞爬蟲任務(wù)完成")
    return "漏洞爬蟲任務(wù)成功執(zhí)行"
@shared_task
def run_firewall_mapping_task():
    """Celery task for running the firewall vulnerability mapping."""
    # 同上,確保 Django 環(huán)境
    # if not django.apps.apps.ready:
    #     os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
    #     django.setup()
    logger.info("防火墻漏洞映射任務(wù)開始執(zhí)行")
    run_mapping_logic() 
    logger.info("防火墻漏洞映射任務(wù)完成")
    return "防火墻漏洞映射任務(wù)成功執(zhí)行" 

2、新建celery.py

import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery('firewall_monitor')
# 使用 Django settings.py 中的配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自動(dòng)發(fā)現(xiàn)應(yīng)用中的任務(wù)(最好指定tasks路徑)
app.autodiscover_tasks(['mysite.tasks'])
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

(3) __init__.py添加

from .celery import app as celery_app
__all__ = ('celery_app',)

四、celery任務(wù)啟動(dòng)

# 1、Redis-x64-5.0.14.1 window版本
# redis啟動(dòng)
redis-server.exe redis.windows.conf
# 2、啟動(dòng)djiango項(xiàng)目
python manage.py runserver 8000
# 3、celery beat啟動(dòng)
celery -A mysite beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 4、celery 任務(wù)啟動(dòng)
celery -A mysite worker --pool=solo -l info  -P eventlet

五、celery任務(wù)配置

啟動(dòng)djiango項(xiàng)目后,在Django-Admin后臺(tái)配置定時(shí)任務(wù)。

六、celery項(xiàng)目結(jié)構(gòu)

到此這篇關(guān)于django celery定時(shí)任務(wù)的文章就介紹到這了,更多相關(guān)django celery定時(shí)任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python ORM框架SQLAlchemy學(xué)習(xí)筆記之安裝和簡單查詢實(shí)例

    Python ORM框架SQLAlchemy學(xué)習(xí)筆記之安裝和簡單查詢實(shí)例

    這篇文章主要介紹了Python ORM框架SQLAlchemy學(xué)習(xí)筆記之安裝和簡單查詢實(shí)例,簡明入門教程,需要的朋友可以參考下
    2014-06-06
  • Python3中的算術(shù)運(yùn)算符詳解

    Python3中的算術(shù)運(yùn)算符詳解

    這篇文章主要介紹了Python3中的算術(shù)運(yùn)算符詳解,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-09-09
  • flask結(jié)合jinja2使用詳解

    flask結(jié)合jinja2使用詳解

    本文主要介紹了flask結(jié)合jinja2使用詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • python輸出后面多一個(gè)None問題

    python輸出后面多一個(gè)None問題

    在Python中,函數(shù)如果沒有顯式指定返回值,會(huì)默認(rèn)返回`None`,例如,計(jì)算一個(gè)數(shù)的平方根并輸出,如果沒有處理`None`,會(huì)輸出結(jié)果后跟`None`
    2024-11-11
  • Python中的CURL PycURL使用例子

    Python中的CURL PycURL使用例子

    這篇文章主要介紹了Python中的CURL PycURL使用例子,需要的朋友可以參考下
    2014-06-06
  • 教你用Python為二年級(jí)的學(xué)生批量生成數(shù)學(xué)題

    教你用Python為二年級(jí)的學(xué)生批量生成數(shù)學(xué)題

    這兩天在學(xué)習(xí)pthon,正好遇到老師布置的暑假作業(yè),需要家長給還在出試卷,下面這篇文章主要給大家介紹了關(guān)于如何用Python為二年級(jí)的學(xué)生批量生成數(shù)學(xué)題的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • python3 enum模塊的應(yīng)用實(shí)例詳解

    python3 enum模塊的應(yīng)用實(shí)例詳解

    這篇文章主要介紹了python3 enum模塊的應(yīng)用 ,文中提到了字典類型的缺點(diǎn)及特點(diǎn),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-08-08
  • Python解決IndexError: list index out of range問題的三種方法

    Python解決IndexError: list index out of&nb

    IndexError是一種常見的異常類型,它通常發(fā)生在嘗試訪問列表(list)中不存在的索引時(shí),錯(cuò)誤信息“IndexError: list index out of range”意味著你試圖訪問的列表索引超出了列表的實(shí)際范圍,所以本文給大家介紹了Python成功解決IndexError: list index out of range
    2024-05-05
  • Python爬取網(wǎng)易云音樂熱門評(píng)論

    Python爬取網(wǎng)易云音樂熱門評(píng)論

    本文將詳細(xì)介紹了Python獲取網(wǎng)易云音樂熱門評(píng)論的實(shí)例。具有很好的參考價(jià)值,下面跟著小編一起來看下吧
    2017-03-03
  • Python中用max()方法求最大值的介紹

    Python中用max()方法求最大值的介紹

    這篇文章主要介紹了Python中用max()方法求最大值的介紹,是Python入門中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-05-05

最新評(píng)論