亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python實現(xiàn)定時任務利器之a(chǎn)pscheduler使用詳解

 更新時間:2022年10月11日 14:52:01   作者:芥末拌飯  
在Python中,還可以用第三方包來管理定時任務,比如celery、apscheduler。相對來說apscheduler使用起來更簡單一些,這里來介紹一下apscheduler的使用方法

前言

之前有介紹了用Linux crontab的方式來實現(xiàn)定時任務,這是使用Linux內(nèi)置模塊來實現(xiàn)的。而在Python中,還可以用第三方包來管理定時任務,比如celery、apscheduler。相對來說apscheduler使用起來更簡單一些,這里來介紹一下apscheduler的使用方法。

首先安裝起來很簡單,運行pip install apscheduler即可。

初識apscheduler

來個簡單的例子看看apscheduler是如何使用的。

#encoding:utf-8

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, 測試apscheduler'.format(now))

task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

上述例子很簡單,我們首先要定義一個apscheduler的對象,然后add_job添加任務,最后start開啟任務就行了。

例子是每隔10秒運行一次sch_test任務,運行結果如下:

時間:2022-10-08 15:16:30, 測試apscheduler
時間:2022-10-08 15:16:40, 測試apscheduler
時間:2022-10-08 15:16:50, 測試apscheduler
時間:2022-10-08 15:17:00, 測試apscheduler

如果我們要在執(zhí)行任務函數(shù)時攜帶參數(shù),只要在add_job函數(shù)中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。

apscheduler有哪些模塊

上面例子中我們初步了解到如何使用apschedulerl了,接下來需要知道apscheduler的設計框架。apscheduler有四個主要模塊,分別是:觸發(fā)器triggers任務存儲器job_stores、執(zhí)行器executors、調(diào)度器schedulers。

1. 觸發(fā)器triggers:

觸發(fā)器指的是任務指定的觸發(fā)方式,例子中我們用的是“cron”方式。我們可以選擇cron、date、interval中的一個。

1.cron表示的是定時任務,類似linux crontab,在指定的時間觸發(fā)。

可用參數(shù)如下:

參數(shù)釋義
year年份(4位數(shù),如2022)
month月份(1-12)
day一個月的第幾天(1-31)
week一年的第幾周(1-53)
day_of_week一星期的第幾天(0-6)
hour小時
minute分鐘
second
start_date開始時間
end_date結束時間
timezone時區(qū)
jitter觸發(fā)的誤差時間

除此之外,我們還可用表達式類型去設置cron。比如常用的有:

表達式釋義
*每個值都觸發(fā)
*/n每隔n觸發(fā)一次
a-b在a-b內(nèi)任何時間都觸發(fā)
a,b,c分別在a,b,c時間觸發(fā)

使用方法示例,在每天7點20分執(zhí)行一次:

task.add_job(func=sch_test, args=('定時任務',), trigger='cron',

hour='7', minute='20')

2.date表示具體到某個時間的一次性任務;

使用方法示例:

# 使用run_date指定運行時間
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

3.interval表示的是循環(huán)任務,指定一個間隔時間,每過間隔時間執(zhí)行一次。

interval可設置如下的參數(shù):

參數(shù)釋義
weeks
days一個月的第幾天
hours小時
minutes分鐘
seconds
start_date間隔觸發(fā)的開始時間
end_date間隔觸發(fā)的結束時間
jitter觸發(fā)的時間誤差

使用方法示例,每隔3秒執(zhí)行一次sch_test任務:

task.add_job(func=sch_test, args=('循環(huán)任務',), trigger='interval', seconds=3)。

來個例子把3種觸發(fā)器都使用一遍:

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))

task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任務',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循環(huán)任務',), trigger='interval', seconds=3)
task.start()

打印部分結果:

時間:2022-10-08 15:45:49, 一次性任務測試apscheduler
時間:2022-10-08 15:45:49, 循環(huán)任務測試apscheduler
時間:2022-10-08 15:45:50, 定時任務測試apscheduler
時間:2022-10-08 15:45:52, 循環(huán)任務測試apscheduler
時間:2022-10-08 15:45:55, 定時任務測試apscheduler
時間:2022-10-08 15:45:55, 循環(huán)任務測試apscheduler
時間:2022-10-08 15:45:58, 循環(huán)任務測試apscheduler

通過代碼示例和結果展示,我們可清晰的知道不同觸發(fā)器的使用區(qū)別。

2. 任務存儲器job_stores

顧名思義,任務存儲器是存儲任務的地方,默認都是存儲在內(nèi)存中。我們也可自定義存儲方式,比如將任務存到mysql中。這里有以下幾種選擇:

存儲器類型釋義
MemoryJobStore任務存儲在內(nèi)存中
SQLAlchemyJobStore使用sqlalchemy作為存儲方式,存儲在數(shù)據(jù)庫
MongoDBJobStore存儲在mongodb中
RedisJobStore存儲在redis中

通常默認存儲在內(nèi)存即可,但若程序故障重啟的話,會重新拉取任務運行了,如果你對任務的執(zhí)行要求高,那么可以選擇其他的存儲器。

使用SQLAlchemyJobStore存儲器示例:

from apscheduler.schedulers.blocking import BlockingScheduler

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))

sched = BlockingScheduler()
# 使用mysql存儲任務
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
sched.start()

3. 執(zhí)行器executors

執(zhí)行器的功能就是將任務放到線程池或進程池中運行。有以下幾種選擇:

執(zhí)行器類型釋義
ThreadPoolExecutor線程池執(zhí)行器
ProcessPoolExecutor進程池執(zhí)行器
GeventExecutorGevent 程序執(zhí)行器
TornadoExecutorTornado 程序執(zhí)行器
TwistedExecutorTwisted 程序執(zhí)行器
AsyncIOExecutorasyncio 程序執(zhí)行器

默認是ThreadPoolExecutor, 常用的也就是第線程和進程池執(zhí)行器。如果應用是CPU密集型操作,可用ProcessPoolExecutor來執(zhí)行。

4. 調(diào)度器schedulers

調(diào)度器屬于apscheduler的核心,它扮演著統(tǒng)籌整個apscheduler系統(tǒng)的角色,存儲器、執(zhí)行器、觸發(fā)器在它的調(diào)度下正常運行。調(diào)度器有以下幾個:

調(diào)度器使用場景
BlockingScheduler當調(diào)度器是你應用中唯一要運行的,start開啟后會阻塞
BackgroundScheduler適用于調(diào)度程序在應用程序的后臺運行,start開啟后不會阻塞
AsyncIOScheduler當程序使用了asyncio的異步框架時使用。
GeventScheduler當程序用了Tornado的時候用
TwistedScheduler當程序用了Twisted的時候用
QtScheduler當應用是QT應用的時候用

不是特定場景下,我們最常用的是BlockingScheduler調(diào)度器。

異常監(jiān)聽

定時任務在運行時,若出現(xiàn)錯誤,需要設置監(jiān)聽機制,我們通常結合logging模塊記錄錯誤信息。

使用示例:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging

# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')

def log_listen(event):
	if event.exception :
		print ( '任務出錯,報錯信息:{}'.format(event.exception))
	else:
		print ( '任務正常運行...' )

def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
    print(1/0)

sched = BlockingScheduler()
# 使用mysql存儲任務
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')

# 配置任務執(zhí)行完成及錯誤時的監(jiān)聽
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志監(jiān)聽
sched._logger = logging

sched.start()

apscheduler的封裝使用

上面介紹了apscheduler框架的主要模塊,我們基本能掌握怎樣使用apscheduler了。下面就來封裝一下apscheduler吧,以后要用直接在這份代碼上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime


class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 設置log存儲位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 設置日志按照時間分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么維度切割, S:秒,M:分,H:小時,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留幾天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj


class Scheduler(LoggerUtils):
    def __init__(self):
        # 執(zhí)行器設置
        executors = {
            'default': ThreadPoolExecutor(10),  # 設置一個名為“default”的ThreadPoolExecutor,其worker值為10
            'processpool': ProcessPoolExecutor(5)  # 設置一個名為“processpool”的ProcessPoolExecutor,其worker值為5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存儲器設置
        # 這里使用sqlalchemy存儲器,將任務存儲在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)

        def log_listen(event):
            if event.exception:
                # 日志記錄
                self.scheduler._logger.error(event.traceback)
    
        # 配置任務執(zhí)行完成及錯誤時的監(jiān)聽
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志監(jiān)聽
        self.scheduler._logger = self.init_logger('sche_test')

    def add_job(self, *args, **kwargs):
        """添加任務"""
        self.scheduler.add_job(*args, **kwargs)

    def start(self):
        """開啟任務"""
        self.scheduler.start()

# 測試任務
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
    print(1/0)


# 添加任務,開啟任務
sched = Scheduler()
# 添加任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
# 開啟任務
sched.start()

小結

這篇文章介紹了Python實現(xiàn)定時任務的又一利器apscheduler,通過簡單例子及apscheduler框架的主要模塊分解,我們可以根據(jù)實際需求配置好模塊信息,再結合logging模塊,我們可以實時監(jiān)控到定時任務的運行情況。

以上就是Python實現(xiàn)定時任務利器之a(chǎn)pscheduler使用詳解的詳細內(nèi)容,更多關于Python apscheduler定時任務的資料請關注腳本之家其它相關文章!

相關文章

最新評論