Python高效定時任務(wù)處理APScheduler庫深入學(xué)習(xí)
介紹
APScheduler是Python中一個強大的第三方庫,用于在后臺執(zhí)行定時任務(wù)。它允許我們根據(jù)設(shè)定的時間間隔、日期規(guī)則或特定時間來執(zhí)行任務(wù),適用于定時執(zhí)行腳本、定時發(fā)送郵件、定時處理數(shù)據(jù)等場景。APScheduler的功能使得在Python中實現(xiàn)定時任務(wù)變得非常簡單和高效。本文將從入門到精通地介紹APScheduler庫的使用方法,帶你掌握在Python中實現(xiàn)定時任務(wù)的技巧。
- 安裝和導(dǎo)入
- 創(chuàng)建定時任務(wù)
- 定時任務(wù)觸發(fā)器
- 任務(wù)存儲
- 并發(fā)執(zhí)行
- 阻塞和非阻塞
- 錯誤處理
- 立即執(zhí)行任務(wù)
- 調(diào)度器持久化
- 任務(wù)監(jiān)聽器
- 移除定時任務(wù)
1. 安裝和導(dǎo)入
首先,我們需要安裝APScheduler庫??梢允褂胮ip命令進行安裝:
pip install apscheduler
安裝完成后,我們可以在Python代碼中導(dǎo)入APScheduler:
from apscheduler.schedulers.background import BackgroundScheduler
2. 創(chuàng)建定時任務(wù)
APScheduler提供了BackgroundScheduler和BlockingScheduler兩種類型的調(diào)度器,用于創(chuàng)建定時任務(wù)。BackgroundScheduler在后臺運行,不會阻塞主線程;而BlockingScheduler會阻塞主線程直到所有任務(wù)完成。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們首先創(chuàng)建了一個后臺調(diào)度器scheduler,然后定義了一個名為job的任務(wù)函數(shù),在其中打印當(dāng)前時間。使用scheduler.add_job()添加了一個定時任務(wù),設(shè)置為每隔5秒執(zhí)行一次。然后,我們啟動了調(diào)度器scheduler,讓定時任務(wù)在后臺執(zhí)行。主線程等待20秒后結(jié)束,并調(diào)用scheduler.shutdown()關(guān)閉調(diào)度器。
3. 定時任務(wù)觸發(fā)器
APScheduler提供了多種觸發(fā)器類型,用于設(shè)置定時任務(wù)的觸發(fā)條件。
interval觸發(fā)器: 按照設(shè)定的時間間隔來觸發(fā)任務(wù)。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用'interval'觸發(fā)器,設(shè)置任務(wù)每隔5秒執(zhí)行一次。
cron觸發(fā)器: 使用類似于Linux中cron表達式的規(guī)則來觸發(fā)任務(wù),可以精確到秒。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每天的13點30分觸發(fā)任務(wù) scheduler.add_job(job, 'cron', hour=13, minute=30) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(60) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用'cron'觸發(fā)器,設(shè)置任務(wù)每天的13點30分觸發(fā)。
date觸發(fā)器: 在指定的時間點觸發(fā)任務(wù)。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),設(shè)置任務(wù)在2023年7月31日10點30分觸發(fā) scheduler.add_job(job, 'date', run_date='2023-07-31 10:30:00') # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(60) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用'date'觸發(fā)器,設(shè)置任務(wù)在2023年7月31日10點30分觸發(fā)。
4. 任務(wù)存儲
APScheduler支持將任務(wù)存儲在不同的后端存儲中,如內(nèi)存、數(shù)據(jù)庫等。默認(rèn)情況下,任務(wù)是存儲在內(nèi)存中的。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用默認(rèn)的內(nèi)存存儲來存儲任務(wù)。
如果需要將任務(wù)存儲在數(shù)據(jù)庫中,可以使用jobstores參數(shù)來設(shè)置。
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 創(chuàng)建數(shù)據(jù)庫存儲 jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用了SQLAlchemyJobStore來將任務(wù)存儲在SQLite數(shù)據(jù)庫中。
5. 并發(fā)執(zhí)行
默認(rèn)情況下,APScheduler會將任務(wù)串行執(zhí)行,也就是說一個任務(wù)結(jié)束后才會執(zhí)行下一個任務(wù)。如果希望并發(fā)執(zhí)行多個任務(wù),可以使用max_instances參數(shù)來設(shè)置。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(index): print(f"定時任務(wù){(diào)index}執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次,最多并發(fā)3個任務(wù) scheduler.add_job(job, 'interval', seconds=5, args=[1], max_instances=3) scheduler.add_job(job, 'interval', seconds=5, args=[2], max_instances=3) scheduler.add_job(job, 'interval', seconds=5, args=[3], max_instances=3) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用了args參數(shù)傳遞參數(shù)給任務(wù)函數(shù),并使用max_instances參數(shù)設(shè)置最多并發(fā)3個任務(wù)。
6. 阻塞和非阻塞
APScheduler提供了阻塞和非阻塞兩種調(diào)度器類型。
阻塞調(diào)度器: 在調(diào)度器啟動后,會阻塞主線程直到所有任務(wù)完成。
from apscheduler.schedulers.blocking import BlockingScheduler import time # 創(chuàng)建阻塞調(diào)度器 scheduler = BlockingScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() print("主線程結(jié)束")
非阻塞調(diào)度器: 在調(diào)度器啟動后,不會阻塞主線程。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們分別使用BlockingScheduler和BackgroundScheduler創(chuàng)建了阻塞和非阻塞調(diào)度器。
7. 錯誤處理
在任務(wù)執(zhí)行過程中,可能會出現(xiàn)異常。APScheduler提供了異常處理機制,我們可以通過try...except...捕獲任務(wù)函數(shù)中的異常,并進行相應(yīng)的處理。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): try: print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 拋出一個異常 raise ValueError("任務(wù)出現(xiàn)異常") except Exception as e: print("任務(wù)執(zhí)行過程中發(fā)生異常:", str(e)) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們在任務(wù)函數(shù)中拋出了一個ValueError異常,并通過try...except...捕獲并輸出了異常信息。
8. 立即執(zhí)行任務(wù)
有時候我們可能需要立即執(zhí)行一個任務(wù),而不是等到下次觸發(fā)時間。APScheduler提供了run_job方法來立即執(zhí)行任務(wù)。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 立即執(zhí)行任務(wù) scheduler.run_job(job) # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們使用scheduler.run_job(job)方法立即執(zhí)行了任務(wù)。
9. 調(diào)度器持久化
在實際應(yīng)用中,我們可能需要將調(diào)度器的配置保存到文件中,以便在下次啟動時恢復(fù)。
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore import time # 創(chuàng)建數(shù)據(jù)庫存儲 jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } # 創(chuàng)建后臺調(diào)度器,并指定jobstores參數(shù) scheduler = BackgroundScheduler(jobstores=jobstores) # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們創(chuàng)建了一個數(shù)據(jù)庫存儲jobstores,并在創(chuàng)建后臺調(diào)度器時指定了jobstores參數(shù)。這樣,在調(diào)度器運行過程中,任務(wù)的配置將會被持久化到數(shù)據(jù)庫中。
10. 任務(wù)監(jiān)聽器
APScheduler提供了任務(wù)監(jiān)聽器,用于監(jiān)聽任務(wù)的狀態(tài)變化。我們可以通過add_listener方法添加監(jiān)聽器,并在任務(wù)狀態(tài)發(fā)生變化時進行相應(yīng)的處理。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次 scheduler.add_job(job, 'interval', seconds=5) # 定義任務(wù)監(jiān)聽器 def my_listener(event): if event.exception: print("任務(wù)執(zhí)行過程中發(fā)生異常:", str(event.exception)) else: print("任務(wù)執(zhí)行成功") # 添加任務(wù)監(jiān)聽器 scheduler.add_listener(my_listener, mask='all') # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后結(jié)束 time.sleep(20) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們創(chuàng)建了一個任務(wù)監(jiān)聽器my_listener,并在任務(wù)執(zhí)行過程中通過if...else...判斷是否出現(xiàn)異常。然后通過scheduler.add_listener(my_listener, mask='all')方法添加了監(jiān)聽器。
11. 移除定時任務(wù)
如果我們希望在調(diào)度器運行過程中移除某個定時任務(wù),可以使用scheduler.remove_job(job_id)方法。
from apscheduler.schedulers.background import BackgroundScheduler import time # 創(chuàng)建后臺調(diào)度器 scheduler = BackgroundScheduler() # 定義任務(wù)函數(shù) def job(): print("定時任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S")) # 添加定時任務(wù),每隔5秒執(zhí)行一次,并獲取任務(wù)ID job_id = scheduler.add_job(job, 'interval', seconds=5).id # 啟動調(diào)度器 scheduler.start() # 主線程等待一段時間后移除定時任務(wù) time.sleep(10) scheduler.remove_job(job_id) # 主線程等待一段時間后結(jié)束 time.sleep(10) # 關(guān)閉調(diào)度器 scheduler.shutdown() print("主線程結(jié)束")
在上述代碼中,我們通過scheduler.add_job(job, 'interval', seconds=5).id獲取了定時任務(wù)的ID,并使用scheduler.remove_job(job_id)移除了定時任務(wù)。
總結(jié)
通過本文的介紹,我們學(xué)習(xí)了APScheduler庫的基本用法,包括創(chuàng)建定時任務(wù)、定時任務(wù)觸發(fā)器、任務(wù)存儲、并發(fā)執(zhí)行、阻塞和非阻塞調(diào)度器、錯誤處理、立即執(zhí)行任務(wù)、調(diào)度器持久化、任務(wù)監(jiān)聽器和移除定時任務(wù)等。APScheduler為Python開發(fā)者提供了一個強大的定時任務(wù)調(diào)度框架,使得在Python中實現(xiàn)定時任務(wù)變得非常簡單和高效。掌握APScheduler的使用將為我們的項目和程序帶來很大的便利。
以上就是Python高效定時任務(wù)處理APScheduler庫深入學(xué)習(xí)的詳細內(nèi)容,更多關(guān)于Python APScheduler定時任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
- python自動定時任務(wù)schedule庫的使用方法
- Python apscheduler實現(xiàn)定時任務(wù)的方法詳解
- Python?Apschedule定時任務(wù)框架的用法詳解
- Python第三方模塊apscheduler安裝和基本使用
- python定時任務(wù)schedule庫用法詳細講解
- Python flask框架定時任務(wù)apscheduler應(yīng)用介紹
- Python中schedule模塊關(guān)于定時任務(wù)使用方法
- Python定時任務(wù)框架APScheduler安裝使用詳解
- 最新Python?APScheduler?定時任務(wù)詳解
- Python中schedule擴展的具體使用
相關(guān)文章
Python實現(xiàn)平行坐標(biāo)圖的兩種方法小結(jié)
今天小編就為大家分享一篇Python實現(xiàn)平行坐標(biāo)圖的兩種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07Python操作數(shù)據(jù)庫之?dāng)?shù)據(jù)庫編程接口
這篇文章主要介紹了Python操作數(shù)據(jù)庫之?dāng)?shù)據(jù)庫編程接口,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下2022-06-06Python CSS選擇器爬取京東網(wǎng)商品信息過程解析
這篇文章主要介紹了Python CSS選擇器爬取京東網(wǎng)商品信息過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06解決python中畫圖時x,y軸名稱出現(xiàn)中文亂碼的問題
今天小編就為大家分享一篇解決python中畫圖時x,y軸名稱出現(xiàn)中文亂碼的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01Python簡單實現(xiàn)網(wǎng)頁內(nèi)容抓取功能示例
這篇文章主要介紹了Python簡單實現(xiàn)網(wǎng)頁內(nèi)容抓取功能,結(jié)合實例形式分析了Python基于urllib模塊的網(wǎng)頁請求、內(nèi)容讀取等相關(guān)操作技巧,需要的朋友可以參考下2018-06-06