亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Django中celery的使用項(xiàng)目實(shí)例

 更新時(shí)間:2022年07月06日 16:01:55   作者:寵乖儀  
Celery是?個(gè) 基于python開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理,下面這篇文章主要給大家介紹了關(guān)于Django中celery使用的相關(guān)資料,需要的朋友可以參考下

1、django應(yīng)用Celery

django框架請求/響應(yīng)的過程是同步的,框架本身無法實(shí)現(xiàn)異步響應(yīng)。

但是我們在項(xiàng)目過程中會經(jīng)常會遇到一些耗時(shí)的任務(wù), 比如:發(fā)送郵件、發(fā)送短信、大數(shù)據(jù)統(tǒng)計(jì)等等,這些操作耗時(shí)長,同步執(zhí)行對用戶體驗(yàn)非常不友好,那么在這種情況下就需要實(shí)現(xiàn)異步執(zhí)行。

異步執(zhí)行前端一般使用ajax,后端使用Celery。

2 、項(xiàng)目應(yīng)用

django項(xiàng)目應(yīng)用celery,主要有兩種任務(wù)方式,一是異步任務(wù)(發(fā)布者任務(wù)),一般是web請求,二是定時(shí)任務(wù)。

celery組成

Celery是由Python開發(fā)、簡單、靈活、可靠的分布式任務(wù)隊(duì)列,是一個(gè)處理異步任務(wù)的框架,其本質(zhì)是生產(chǎn)者消費(fèi)者模型,生產(chǎn)者發(fā)送任務(wù)到消息隊(duì)列,消費(fèi)者負(fù)責(zé)處理任務(wù)。Celery側(cè)重于實(shí)時(shí)操作,但對調(diào)度支持也很好,其每天可以處理數(shù)以百萬計(jì)的任務(wù)。特點(diǎn):

簡單:熟悉celery的工作流程后,配置使用簡單

高可用:當(dāng)任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery會自動(dòng)嘗試重新執(zhí)行任務(wù)

快速:一個(gè)單進(jìn)程的celery每分鐘可處理上百萬個(gè)任務(wù)

靈活:幾乎celery的各個(gè)組件都可以被擴(kuò)展及自定制

Celery由三部分構(gòu)成:

消息中間件(Broker):官方提供了很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ

任務(wù)執(zhí)行單元(Worker):任務(wù)執(zhí)行單元,負(fù)責(zé)從消息隊(duì)列中取出任務(wù)執(zhí)行,它可以啟動(dòng)一個(gè)或者多個(gè),也可以啟動(dòng)在不同的機(jī)器節(jié)點(diǎn),這就是其實(shí)現(xiàn)分布式的核心

結(jié)果存儲(Backend):官方提供了諸多的存儲方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等

架構(gòu)如下:

工作原理:

任務(wù)模塊Task包含異步任務(wù)和定時(shí)任務(wù)。其中,異步任務(wù)通常在業(yè)務(wù)邏輯中被觸發(fā)并發(fā)往消息隊(duì)列,而定時(shí)任務(wù)由Celery Beat進(jìn)程周期性地將任務(wù)發(fā)往消息隊(duì)列;

任務(wù)執(zhí)行單元Worker實(shí)時(shí)監(jiān)視消息隊(duì)列獲取隊(duì)列中的任務(wù)執(zhí)行;

Woker執(zhí)行完任務(wù)后將結(jié)果保存在Backend中;

本文使用的是redis數(shù)據(jù)庫作為消息中間件和結(jié)果存儲數(shù)據(jù)庫

1.異步任務(wù)redis

1.安裝庫

pip install celery
pip install redis

2.celery.py

在主項(xiàng)目目錄下,新建 celery.py 文件:

import os
import django
from celery import Celery
from django.conf import settings
 
# 設(shè)置系統(tǒng)環(huán)境變量,安裝django,必須設(shè)置,否則在啟動(dòng)celery時(shí)會報(bào)錯(cuò)
# celery_study 是當(dāng)前項(xiàng)目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()
 
celery_app = Celery('celery_study')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

注意:是和settings.py文件同目錄,一定不能建立在項(xiàng)目根目錄,不然會引起 celery 這個(gè)模塊名的命名沖突  

同時(shí),在主項(xiàng)目的init.py中,添加如下代碼:

from .celery import celery_app
 
__all__ = ['celery_app']

3.settings.py

在配置文件中配置對應(yīng)的redis配置:

# Broker配置,使用Redis作為消息中間件
BROKER_URL = 'redis://127.0.0.1:6379/0' 
 
# BACKEND配置,這里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' 
 
# 結(jié)果序列化方案
CELERY_RESULT_SERIALIZER = 'json' 
 
# 任務(wù)結(jié)果過期時(shí)間,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 
 
# 時(shí)區(qū)配置
CELERY_TIMEZONE='Asia/Shanghai'   
 
# 指定導(dǎo)入的任務(wù)模塊,可以指定多個(gè)
#CELERY_IMPORTS = (     
#    'other_dir.tasks',
#)

注意:所有配置的官方文檔Configuration and defaults — Celery 5.2.0b3 documentation  

4.tasks.py

在子應(yīng)用下建立各自對應(yīng)的任務(wù)文件tasks.py(必須是tasks.py這個(gè)名字,不允許修改)

from celery import shared_task
 
@shared_task
def add(x, y):
    return x + y
 
@shared_task
def mul(x, y):
    return x * y
 
@shared_task
def xsum(numbers):
    return sum(numbers)

5.調(diào)用任務(wù)

from .tasks import *
# Create your views here.
 
def task_add_view(request):
    add.delay(100,200)
    return HttpResponse(f'調(diào)用函數(shù)結(jié)果')

6.啟動(dòng)celery

pip install eventlet
celery  -A celery_study worker  -l debug -P eventlet

注意 :celery_study是項(xiàng)目名

使用redis時(shí),有可能會出現(xiàn)如下類似的異常

AttributeError: 'str' object has no attribute 'items'

這是由于版本差異,需要卸載已經(jīng)安裝的python環(huán)境中的 redis 庫,重新指定安裝特定版本(celery4.x以下適用 redis2.10.6, celery4.3以上使用redis3.2.0以上):

xxxxxxxxxx pip install redis==2.10.6

7.獲取任務(wù)結(jié)果

在 views.py 中,通過 AsyncResult.get() 獲取結(jié)果

from celery import result
def get_result_by_taskid(request):
    task_id = request.GET.get('task_id')
	# 異步執(zhí)行
    ar = result.AsyncResult(task_id)
 
    if ar.ready():
        return JsonResponse({'status': ar.state, 'result': ar.get()})
    else:
        return JsonResponse({'status': ar.state, 'result': ''})

AsyncResult類的常用的屬性和方法:

  • state: 返回任務(wù)狀態(tài),等同status;
  • task_id: 返回任務(wù)id;
  • result: 返回任務(wù)結(jié)果,同get()方法;
  • ready(): 判斷任務(wù)是否執(zhí)行以及有結(jié)果,有結(jié)果為True,否則False;
  • info(): 獲取任務(wù)信息,默認(rèn)為結(jié)果;
  • wait(t): 等待t秒后獲取結(jié)果,若任務(wù)執(zhí)行完畢,則不等待直接獲取結(jié)果,若任務(wù)在執(zhí)行中,則wait期間一直阻塞,直到超時(shí)報(bào)錯(cuò);
  • successful(): 判斷任務(wù)是否成功,成功為True,否則為False;

2.定時(shí)任務(wù)

在第一步的異步任務(wù)的基礎(chǔ)上,進(jìn)行部分修改即可

1.settings.py

from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'mul_every_30_seconds': {
         # 任務(wù)路徑
        'task': 'celery_app.tasks.mul',
         # 每30秒執(zhí)行一次
        'schedule': 5,
        'args': (14, 5)
    }
}

說明(更多內(nèi)容見文檔:Periodic Tasks — Celery 5.2.0b3 documentation):

  • task:任務(wù)函數(shù)
  • schedule:執(zhí)行頻率,可以是整型(秒數(shù)),也可以是timedelta對象,也可以是crontab對象,也可以是自定義類(繼承celery.schedules.schedule)
  • args:位置參數(shù),列表或元組
  • kwargs:關(guān)鍵字參數(shù),字典
  • options:可選參數(shù),字典,任何 apply_async() 支持的參數(shù)
  • relative:默認(rèn)是False,取相對于beat的開始時(shí)間;設(shè)置為True,則取設(shè)置的timedelta時(shí)間

在task.py中設(shè)置了日志

from celery import shared_task
import logging  
logger = logging.getLogger(__name__))
 
 
@shared_task
def mul(x, y):
    logger.info('___mul__'*10)
    return x * y

2.啟動(dòng)celery

(兩個(gè)cmd)分別啟動(dòng)worker和beat

celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug

3.任務(wù)綁定

Celery可通過task綁定到實(shí)例獲取到task的上下文,這樣我們可以在task運(yùn)行時(shí)候獲取到task的狀態(tài),記錄相關(guān)日志等

方法:

  • 在裝飾器中加入?yún)?shù) bind=True
  • 在task函數(shù)中的第一個(gè)參數(shù)設(shè)置為self

在task.py 里面寫

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務(wù)綁定
@shared_task(bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

其中:self對象是celery.app.task.Task的實(shí)例,可以用于實(shí)現(xiàn)重試等多種功能

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務(wù)綁定
@shared_task(bind=True)
def add(self,x, y):
    try:
        logger.info('add__-----'*10)
        logger.info('name:',self.name)
        logger.info('dir(self)',dir(self))
        raise Exception
    except Exception as e:
        # 出錯(cuò)每4秒嘗試一次,總共嘗試4次
        self.retry(exc=e, countdown=4, max_retries=4)    
    return x + y

啟動(dòng)celery

celery -A worker celery_study -l debug -P eventlet

4.任務(wù)鉤子

Celery在執(zhí)行任務(wù)時(shí),提供了鉤子方法用于在任務(wù)執(zhí)行完成時(shí)候進(jìn)行對應(yīng)的操作,在Task源碼中提供了很多狀態(tài)鉤子函數(shù)如:on_success(成功后執(zhí)行)、on_failure(失敗時(shí)候執(zhí)行)、on_retry(任務(wù)重試時(shí)候執(zhí)行)、after_return(任務(wù)返回時(shí)候執(zhí)行)

方法:通過繼承Task類,重寫對應(yīng)方法即可,

from celery import Task
 
class MyHookTask(Task):
 
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'task id:{task_id} , arg:{args} , successful !')
 
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
 
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')
 
# 在對應(yīng)的task函數(shù)的裝飾器中,通過 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

啟動(dòng)celery

celery -A worker celery_study -l debug -P eventlet

5.任務(wù)編排

在很多情況下,一個(gè)任務(wù)需要由多個(gè)子任務(wù)或者一個(gè)任務(wù)需要很多步驟才能完成,Celery也能實(shí)現(xiàn)這樣的任務(wù),完成這類型的任務(wù)通過以下模塊完成:

  • group: 并行調(diào)度任務(wù)
  • chain: 鏈?zhǔn)饺蝿?wù)調(diào)度
  • chord: 類似group,但分header和body2個(gè)部分,header可以是一個(gè)group任務(wù),執(zhí)行完成后調(diào)用body的任務(wù)
  • map: 映射調(diào)度,通過輸入多個(gè)入?yún)矶啻握{(diào)度同一個(gè)任務(wù)
  • starmap: 類似map,入?yún)㈩愃疲猘rgs
  • chunks: 將任務(wù)按照一定數(shù)量進(jìn)行分組

文檔:Next Steps — Celery 5.2.0b3 documentation

1.group

urls.py:

path('primitive/', views.test_primitive),

views.py:

from .tasks import *
from celery import group
 
def test_primitive(request):
    # 創(chuàng)建10個(gè)并列的任務(wù)
    lazy_group = group(add.s(i, i) for i in range(10))
    promise = lazy_group()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

說明:

通過task函數(shù)的 s 方法傳入?yún)?shù),啟動(dòng)任務(wù)

上面這種方法需要進(jìn)行等待,如果依然想實(shí)現(xiàn)異步的方式,那么就必須在tasks.py中新建一個(gè)task方法,調(diào)用group,示例如下:

tasks.py:

@shared_task
def group_task(num):
    return group(add.s(i, i) for i in range(num))().get()

urls.py:

path('first_group/', views.first_group),

views.py:

def first_group(request):
    ar = tasks.group_task.delay(10)
 
    return HttpResponse('返回first_group任務(wù),task_id:' + ar.task_id)

2.chain

默認(rèn)上一個(gè)任務(wù)的結(jié)果作為下一個(gè)任務(wù)的第一個(gè)參數(shù)

def test_primitive(request):
    # 等同調(diào)用  mul(add(add(2, 2), 5), 8)
    promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()
    #  72
    result = promise.get()  
    return JsonResponse({'function': 'test_primitive', 'result': result})

3.chord

任務(wù)分割,分為header和body兩部分,hearder任務(wù)執(zhí)行完在執(zhí)行body,其中hearder返回結(jié)果作為參數(shù)傳遞給body

def test_primitive(request):
    # header:  [3, 12] 
    # body: xsum([3, 12])
    promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

6、celery管理和監(jiān)控

celery通過flower組件實(shí)現(xiàn)管理和監(jiān)控功能 ,flower組件不僅僅提供監(jiān)控功能,還提供HTTP API可實(shí)現(xiàn)對woker和task的管理

官網(wǎng):flower · PyPI

文檔:Flower - Celery monitoring tool — Flower 1.0.1 documentation

安裝flower

pip install flower

啟動(dòng)flower

flower -A celery_study --port=5555   

說明:

  • -A:項(xiàng)目名
  • --port: 端口號

訪問

在瀏覽器輸入:http://127.0.0.1:5555

通過api操作

curl http://127.0.0.1:5555/api/workers

總結(jié)

到此這篇關(guān)于Django中celery使用項(xiàng)目的文章就介紹到這了,更多相關(guān)Django中celery使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python中的type,元類,類,對象用法

    python中的type,元類,類,對象用法

    這篇文章主要介紹了python中的type,元類,類,對象用法,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-05-05
  • Python文件操作函數(shù)用法實(shí)例詳解

    Python文件操作函數(shù)用法實(shí)例詳解

    這篇文章主要介紹了Python文件操作函數(shù)用法實(shí)例詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • Django中session登錄驗(yàn)證操作指南

    Django中session登錄驗(yàn)證操作指南

    本文介紹了如何使用Django中的session登錄驗(yàn)證來保護(hù)網(wǎng)站的安全性。在此過程中,我們首先介紹了Django的認(rèn)證架構(gòu)和基本概念,然后我們深入探討了如何使用session實(shí)現(xiàn)登錄驗(yàn)證功能。最后,我們解釋了如何創(chuàng)建一個(gè)Custom?User?Model,以及如何使用它來自定義用戶對象。
    2023-04-04
  • python time.sleep()是睡眠線程還是進(jìn)程

    python time.sleep()是睡眠線程還是進(jìn)程

    這篇文章主要介紹了python time.sleep()是睡眠線程還是進(jìn)程,通過實(shí)例代碼給大家介紹了Python Sleep休眠函數(shù) ,需要的朋友可以參考下
    2019-07-07
  • 淺談如何使用Python控制手機(jī)(一)

    淺談如何使用Python控制手機(jī)(一)

    這篇文章主要為大家介紹了如何使用Python控制手機(jī),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2021-11-11
  • Django ModelForm操作及驗(yàn)證方式

    Django ModelForm操作及驗(yàn)證方式

    這篇文章主要介紹了Django ModelForm操作及驗(yàn)證方式,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-03-03
  • Python中Matplotlib的簡單使用

    Python中Matplotlib的簡單使用

    這篇文章主要介紹了Python中Matplotlib的簡單使用,Matplotlib是一個(gè)用于繪制數(shù)據(jù)可視化圖形的Python庫,支持繪制各種靜態(tài),動(dòng)態(tài),交互式的圖表,它是數(shù)據(jù)科學(xué)和機(jī)器學(xué)習(xí)領(lǐng)域最流行的可視化庫之一,需要的朋友可以參考下
    2023-07-07
  • python過濾字符串中不屬于指定集合中字符的類實(shí)例

    python過濾字符串中不屬于指定集合中字符的類實(shí)例

    這篇文章主要介紹了python過濾字符串中不屬于指定集合中字符的類,涉及Python針對字符串與集合的相關(guān)操作技巧,需要的朋友可以參考下
    2015-06-06
  • 基于Python的關(guān)鍵字監(jiān)控及告警

    基于Python的關(guān)鍵字監(jiān)控及告警

    這篇文章主要為大家詳細(xì)介紹了基于Python的關(guān)鍵字監(jiān)控及告警,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-07-07
  • 詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎

    詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎

    本篇文章主要介紹了詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-08-08

最新評論