Python?Apschedule定時(shí)任務(wù)框架的用法詳解
一: 安裝
pip install apscheduler
二: 基本概念
1: 觸發(fā)器: 調(diào)度邏輯,描述任務(wù)何時(shí)被觸發(fā)。(日期觸發(fā),時(shí)間間隔,cronjob表達(dá)式)
2: 作業(yè)存儲器:指定作業(yè)存儲的位置,默認(rèn)是保存在內(nèi)存中,
3: 執(zhí)行器:將任務(wù)(函數(shù))提交到線程池或者進(jìn)程持中運(yùn)行,當(dāng)任務(wù)完成時(shí),通知調(diào)度器發(fā)生相應(yīng)的事件。
4:調(diào)度器:任務(wù)調(diào)度器,屬于控制角色,通過它配置作業(yè)存儲器、執(zhí)行器和觸發(fā)器,添加、修改和刪除任務(wù)。調(diào)度器協(xié)調(diào)觸發(fā)器、作業(yè)存儲器、執(zhí)行器的運(yùn)行,通
常只有一個(gè)調(diào)度程序運(yùn)行在應(yīng)用程序中,開發(fā)人員通常不需要直接處理作業(yè)存儲器、執(zhí)行器或觸發(fā)器,配置作業(yè)存儲器和執(zhí)行器是通過調(diào)度器來完成的。
三:基本案例
任務(wù)一: 每隔三秒鐘執(zhí)行一次。
from datetime import datetime import os # 1: 導(dǎo)入這個(gè)最簡單的調(diào)度器 from apscheduler.schedulers.blocking import BlockingScheduler # 2: 定義我們的job def tick(): print("Tick! The time is: %s" % datetime.now()) if __name__ == '__main__': # 3: 實(shí)例化BlockingScheduler調(diào)度器,沒有參數(shù)表示存儲器是:內(nèi)存 # 執(zhí)行器是線程池,默認(rèn)的線程并發(fā)數(shù)是10個(gè)。 scheduler = BlockingScheduler() # 4:調(diào)度器綁定任務(wù),并指定觸發(fā)器 # 觸發(fā)器:‘interval'表示間隔執(zhí)行, ‘date', 表示指定時(shí)間觸發(fā), ‘cron'表示固定時(shí)間間隔觸發(fā)。 scheduler.add_job(tick, 'interval', seconds=3) try: # 5:執(zhí)行任務(wù) scheduler.start() except Exception as e: print(e)
任務(wù)二: 每天固定時(shí)間執(zhí)行:
from datetime import datetime import os # 1: 導(dǎo)入這個(gè)最簡單的調(diào)度器 from apscheduler.schedulers.blocking import BlockingScheduler # 2: 定義我們的job def tick(): print("Tick! The time is: %s" % datetime.now()) if __name__ == '__main__': # 3: 實(shí)例化BlockingScheduler調(diào)度器,沒有參數(shù)表示存儲器是:內(nèi)存 # 執(zhí)行器是線程池,默認(rèn)的線程并發(fā)數(shù)是10個(gè)。 scheduler = BlockingScheduler() # 4:調(diào)度器綁定任務(wù),并指定觸發(fā)器 # 每天10點(diǎn)10分執(zhí)行 # scheduler.add_job(tick, trigger='cron', hour=10, minute=10) # 每天 10點(diǎn)10分,11點(diǎn)10分,12點(diǎn)10分執(zhí)行 scheduler.add_job(tick, trigger='cron', hour='10-12', minute=10) try: # 5:執(zhí)行任務(wù) scheduler.start() except Exception as e: print(e)
四: 調(diào)度器
1:調(diào)度器的執(zhí)行原理:
循環(huán)詢問作業(yè)存儲器,有沒有到期要執(zhí)行的任務(wù),如果有則計(jì)算運(yùn)行的時(shí)間點(diǎn)。
交給執(zhí)行器按照時(shí)間點(diǎn)運(yùn)行。
2:調(diào)度器的分類:
1: BlockingScheduler: 調(diào)用start后會(huì)阻塞主線程。
2:BackgroundScheduler: 調(diào)用start后默認(rèn)開啟守護(hù)線程,不會(huì)阻塞主線程。
3:AsyncIOScheduler: 與AsyncIO配合使用
4: GeventScheduler: 與Gevent配合使用
5: TwistedScheduler: 與Twisted配合使用
6: QtScheduler: 與Qt配合使用
五: 作業(yè)存儲器
內(nèi)存:程序崩潰,則重啟時(shí),重新加入任務(wù)。
數(shù)據(jù)庫:程序崩潰,重啟時(shí),恢復(fù)中斷的狀態(tài)。推薦使用:PostgreSQL
六: 執(zhí)行器
1: 線程池執(zhí)行器:默認(rèn)
2:進(jìn)程池執(zhí)行器:CPU密集型
3:線程池+進(jìn)程池執(zhí)行器:
七:多執(zhí)行器,存儲器,單調(diào)度器案例:
案例: 配置兩個(gè)存儲器:一個(gè)mongodb,一個(gè)sqlite。配置一個(gè)線程池執(zhí)行器和一個(gè)進(jìn)程池執(zhí)行器。
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstorage = { "mongodb": MongoDBJobStore(), "default": SQLAlchemyJobStore(url='mongodb數(shù)據(jù)庫地址') } executes = { "default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5) } job_default = { # coalesce默認(rèn)情況下關(guān)閉 'coalesce': False, # 作業(yè)的默認(rèn)最大運(yùn)行實(shí)例限制為3 "max_instances": 3 } scheduler = BackgroundScheduler(jobstores=jobstorage, executors=executes, job_defaults=job_default, timezone=utc)
八: 不同存儲器執(zhí)行案例
1: 內(nèi)存存儲
from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def my_job(id= 'my_job'): print(id, '--->', datetime.now()) # 1: 定義任務(wù)存儲器為內(nèi)存,其實(shí)默認(rèn)的也是這個(gè) jobstorage = { "default": MemoryJobStore() } # 2:定義執(zhí)行器, 10進(jìn)程20線程執(zhí)行 execytors = { "default": ThreadPoolExecutor(20), "processpoll": ProcessPoolExecutor(10) } # 3:定義任務(wù)設(shè)置 job_defaults = { 'coalesce': False, 'max_instances': 3 } # 4: 實(shí)例化調(diào)度器 scheduler = BlockingScheduler(jobstorage=jobstorage, execytors=execytors, job_defaults=job_defaults) # 5: 給調(diào)度器增加任務(wù) # 每5分鐘執(zhí)行一次 scheduler.add_job(my_job, args=['job_interval'], id='job_interval', trigger='interval', seconds=5, replace_existing=True) # 截止到2021-10-25日前,每個(gè)4月到8月的每天7點(diǎn)到11點(diǎn),每10分鐘執(zhí)行一次 scheduler.add_job(my_job, args=['job_cron'], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10', end_date='2021-10-25') # 默認(rèn)的配置:立刻執(zhí)行一次 scheduler.add_job(my_job, args=['job_once_now'], id='job_once_now') # 某個(gè)具體節(jié)點(diǎn),執(zhí)行一次 scheduler.add_job(my_job, args=['job_date_once'], id='job_date_once', trigger='date', run_date='2021-08-05 07:48:05')
數(shù)據(jù)庫存儲:
上面代碼只需要修改存儲器即可:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def my_job(id= 'my_job'): print(id, '--->', datetime.now()) # 1: 定義任務(wù)存儲器為內(nèi)存,其實(shí)默認(rèn)的也是這個(gè) jobstorage = { "default": SQLAlchemyJobStore(url="數(shù)據(jù)庫地址") }
數(shù)據(jù)庫存儲存在的問題:
1:假如程序中斷了,則再次開啟的時(shí)候,調(diào)度器會(huì)將數(shù)據(jù)庫中沒有執(zhí)行的任務(wù)再次添加進(jìn)來。如果我們此時(shí)再次運(yùn)行程序,則優(yōu)惠追加進(jìn)來相同的任務(wù)。如何讓他不再加進(jìn)來呢?
在追加任務(wù)的時(shí)候增加配置項(xiàng):replace_existing=True
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
2:如果程序錯(cuò)過了我們指定的時(shí)間,我們就不讓他運(yùn)行了則可以增加配置項(xiàng):misfire_grace_time
scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')
這里是設(shè)置如果時(shí)間相差超過30秒了,則這個(gè)任務(wù)就不執(zhí)行了。
九: 調(diào)度器的其他操作
scheduler.remove_job(job_id, jobstore=None) # 刪除作業(yè) scheduler.remove_all_jobs(jobstore=None) # 刪除所有作業(yè) scheduler.pause_job(job_id, jobstore=None) # 暫停作業(yè) scheduler.resume_job(job_id, jobstore=None) # 恢復(fù)作業(yè) scheduler.modify_job(job_id, jobstore=None, **changes) # 修改單個(gè)作業(yè)屬性信息 scheduler.reschedule_job(job_id, jobstore=None, trigger=None, **trigger_args) # 修改單個(gè)作業(yè)的觸發(fā)器并更新下次運(yùn)行時(shí)間 scheduler.print_jobs(jobstore=None, out=sys.stdout) # 輸出作業(yè)信息
十: 調(diào)度事件監(jiān)聽
問題1: 如果程序出現(xiàn)異常,會(huì)影響整個(gè)調(diào)度任務(wù)嗎?
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(1/0) print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5') scheduler.start()
運(yùn)行這個(gè)程序會(huì)發(fā)現(xiàn)每5分鐘報(bào)一次錯(cuò)。
問題2:如果程序的一個(gè)任務(wù)出現(xiàn)異常,其余的任務(wù)能正常執(zhí)行嗎?
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(1/0) print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_test2(x): print('哈哈哈哈') scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test2, args=('定時(shí)任務(wù)2',), trigger='cron', second='*/5') scheduler.start()
運(yùn)行發(fā)現(xiàn),任務(wù)一報(bào)錯(cuò),任務(wù)二仍然可以運(yùn)行。
設(shè)置日志記錄和事件監(jiān)聽:
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging # 1: 定義日志格式: # %(levelno)s 打印日志級別的數(shù)值 # %(levelname)s 打印日志級別名稱 # %(pathname)s 打印當(dāng)前執(zhí)行程序的路徑,其實(shí)就是sys.argv[0] # %(filename)s 打印當(dāng)前執(zhí)行程序名 # %(funcName)s 打印日志的當(dāng)前函數(shù) # %(lineno)d 打印日志的當(dāng)前行號 # %(asctime)s 打印日志的記錄時(shí)間 # %(thread)d 打印線程ID # %(threadName)s 打印線程的名稱 # %(process)d 打印進(jìn)程的ID # %(message)s 打印日志的信息 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') # 正確的任務(wù) def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) # 出錯(cuò)的任務(wù) def date_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print(1 / 0) # 2: 設(shè)置監(jiān)聽器 def my_listener(event): if event.exception: print('任務(wù)出錯(cuò)了?。。。。。?) else: print('任務(wù)照常運(yùn)行...') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一次性任務(wù),會(huì)出錯(cuò)',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') # 每3秒執(zhí)行一次 scheduler.add_job(func=aps_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
以上就是Python Apschedule定時(shí)任務(wù)框架的用法詳解的詳細(xì)內(nèi)容,更多關(guān)于Python Apschedule定時(shí)任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
- python自動(dòng)定時(shí)任務(wù)schedule庫的使用方法
- Python apscheduler實(shí)現(xiàn)定時(shí)任務(wù)的方法詳解
- Python高效定時(shí)任務(wù)處理APScheduler庫深入學(xué)習(xí)
- Python第三方模塊apscheduler安裝和基本使用
- python定時(shí)任務(wù)schedule庫用法詳細(xì)講解
- Python flask框架定時(shí)任務(wù)apscheduler應(yīng)用介紹
- Python中schedule模塊關(guān)于定時(shí)任務(wù)使用方法
- Python定時(shí)任務(wù)框架APScheduler安裝使用詳解
- 最新Python?APScheduler?定時(shí)任務(wù)詳解
- Python中schedule擴(kuò)展的具體使用
相關(guān)文章
python進(jìn)程管理工具supervisor安裝使用
supervisor是一個(gè)用python語言編寫的進(jìn)程管理工具,它可以很方便的監(jiān)聽、啟動(dòng)、停止、重啟一個(gè)或多個(gè)進(jìn)程,本文給大家介紹python進(jìn)程管理工具supervisor安裝使用配置教程,感興趣的朋友一起看看吧2023-08-08一篇文章弄懂Python關(guān)鍵字、標(biāo)識符和變量
這篇文章主要給大家介紹了關(guān)于Python關(guān)鍵字、標(biāo)識符和變量的相關(guān)資料,Python關(guān)鍵詞是Python保留的具有特定含義的特殊詞語,用于執(zhí)行某些操作,Python標(biāo)識符是用戶定義的名稱,而變量是計(jì)算機(jī)內(nèi)存中的一塊區(qū)域,存儲對象的內(nèi)存地址,以便引用對象的值,需要的朋友可以參考下2021-07-07python-tornado的接口用swagger進(jìn)行包裝的實(shí)例
今天小編就為大家分享一篇python-tornado的接口用swagger進(jìn)行包裝的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08nginx搭建基于python的web環(huán)境的實(shí)現(xiàn)步驟
這篇文章主要介紹了nginx搭建基于python的web環(huán)境的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01Python3的高階函數(shù)map,reduce,filter的示例詳解
這篇文章主要介紹了Python3的高階函數(shù)map,reduce,filter的示例代碼,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-07-07Python 安裝和配置flask, flask_cors的圖文教程
這篇文章主要介紹了Python 安裝和配置flask, flask_cors的圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2025-04-04推薦10款最受Python開發(fā)者歡迎的Python IDE
這篇文章收集了一些對開發(fā)者非常有幫助的,最好的10款Python IDE,包括Vim,Eclipse with PyDev,Sublime Text,PyCharm等知明Python開發(fā)工具2018-09-09