python實現(xiàn)定時任務(wù)的八種方式總結(jié)
前言
在日常工作中,常常會用到需要周期性執(zhí)行的任務(wù),一種方式是采用 Linux 系統(tǒng)自帶的 crond 結(jié)合命令行實現(xiàn)。另外一種方式是直接使用Python。
當(dāng)每隔一段時間就要執(zhí)行一段程序,或者往復(fù)循環(huán)執(zhí)行某一個任務(wù),這就需要使用定時任務(wù)來執(zhí)行程序。比如在實現(xiàn)對某個目標進行爬蟲的話,需要用到實時任務(wù)。
python中常用的定時任務(wù)主要有以下8種方法:
- while True:+sleep()
- threading.Timer定時器
- Timeloop庫執(zhí)行定時任務(wù)
- 調(diào)度模塊sched
- 調(diào)度模塊schedule
- 任務(wù)框架APScheduler
- 分布式消息系統(tǒng)celery執(zhí)行定時任務(wù)
- 使用windows自帶的定時任務(wù)
接下來分別用上述8中方式來完成下面定義的Task()任務(wù),示例代碼如下:
from datetime import datetime def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts)
1、利用while True:+sleep()實現(xiàn)定時任務(wù)
最簡單的方式應(yīng)該就是使用time模塊來實現(xiàn)定時任務(wù),在循環(huán)里面放入要執(zhí)行的任務(wù),然后sleep一段時間再執(zhí)行。實現(xiàn)令當(dāng)前執(zhí)行的線程暫停 n秒后再繼續(xù)執(zhí)行。所謂暫停,即令當(dāng)前線程進入阻塞狀態(tài),當(dāng)達到 sleep() 函數(shù)規(guī)定的時間后,再由阻塞狀態(tài)轉(zhuǎn)為就緒狀態(tài),等待 CPU 調(diào)度。
示例代碼:
from datetime import datetime import time def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) def func(): while True: task() time.sleep(3) func()
運行結(jié)果:
優(yōu)缺點:只能實現(xiàn)同步任務(wù),無法執(zhí)行異步任務(wù)。執(zhí)行起來雖然是比較簡單,但不容易控制,而且sleep是個阻塞函數(shù)。只能設(shè)定間隔,不能指定具體的時間點。
2、利用threading.Timer()定時器實現(xiàn)定時任務(wù)
timer最基本理解就是定時器,可以啟動多個定時任務(wù),這些定時器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。
Timer方法 | 說明 |
---|---|
Timer(interval, function, args=None, kwargs=None) | 創(chuàng)建定時器 |
cancel() | 取消定時器 |
start() | 使用線程方式執(zhí)行 |
join(self, timeout=None) | 等待線程執(zhí)行結(jié)束 |
示例代碼:
from datetime import datetime from threading import Timer def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) def func(): task() t = Timer(3, func) t.start() func()
運行結(jié)果:
優(yōu)缺點:可以實現(xiàn)異步任務(wù),是非阻塞的,但當(dāng)運行次數(shù)過多時,會出現(xiàn)報錯:Pyinstaller maximum recursion depth exceeded Error Resolution 達到最大遞歸深度,然后想到的是修改最大遞歸深度,
sys.setrecursionlimit(100000000)
但是運行到達到最大CPU時,python會直接銷毀程序。
3、使用Timeloop庫執(zhí)行定時任務(wù)
Timeloop是一個庫,可用于運行多周期任務(wù)。這是一個簡單的庫,使用decorator模式在線程中運行標記函數(shù)。
示例代碼:
from datetime import datetime, timedelta from timeloop import Timeloop tl = Timeloop() def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + '333!') def task2(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + "555555!") @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s(): task() @tl.job(interval=timedelta(seconds=5)) def sample_job_every_5s(): task2()
關(guān)于更多timeloop用法,詳見博文: python中定時任務(wù)timeloop庫用法詳解
4、利用調(diào)度模塊sched實現(xiàn)定時任務(wù)
sched是一種調(diào)度(延時處理機制)。sched模塊實現(xiàn)了一個通用事件調(diào)度器,在調(diào)度器類使用一個延遲函數(shù)等待特定的時間,執(zhí)行任務(wù)。同時支持多線程應(yīng)用程序,在每個任務(wù)執(zhí)行后會立刻調(diào)用延時函數(shù),以確保其他線程也能執(zhí)行。
scheduler對象主要方法:
- enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。
- cancel(event):從隊列中刪除事件。如果事件不是當(dāng)前隊列中的事件,則該方法將跑出一個ValueError。
- run():運行所有預(yù)定的事件。這個函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的delayfunc()函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。
示例代碼:
import sched import time from datetime import datetime # 初始化sched模塊的scheduler類 # 第一個參數(shù)是一個可以返回時間戳的函數(shù),第二個參數(shù)可以在定時未到達之前阻塞。 schedule = sched.scheduler(time.time, time.sleep) def task(inc): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) schedule.enter(inc, 0, task, (inc,)) def func(inc=3): # enter四個參數(shù)分別為: # 間隔事件、優(yōu)先級(用于同時間到達的兩個事件同時執(zhí)行時定序)、被調(diào)用觸發(fā)的函數(shù)、給該觸發(fā)函數(shù)的參數(shù)(tuple形式) schedule.enter(0, 0, task, (inc,)) schedule.run() func()
運行結(jié)果:
關(guān)于更多sched用法,詳見博文:http://chabaoo.cn/article/272340.htm
5、利用調(diào)度模塊schedule實現(xiàn)定時任務(wù)
schedule是一個第三方輕量級的任務(wù)調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間。
如果想執(zhí)行多個任務(wù),也可以添加多個task。
示例代碼:
import schedule from datetime import datetime def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) def task2(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + '666!') def func(): # 清空任務(wù) schedule.clear() # 創(chuàng)建一個按3秒間隔執(zhí)行任務(wù) schedule.every(3).seconds.do(task) # 創(chuàng)建一個按2秒間隔執(zhí)行任務(wù) schedule.every(2).seconds.do(task2) while True: schedule.run_pending() func()
運行結(jié)果:
優(yōu)缺點:需要和while Ture配合使用,而且占用的CPU也比其他幾種多的多,占用內(nèi)存也是較大。
關(guān)于更多schedule用法,詳見博文: http://chabaoo.cn/article/272345.htm
6、利用任務(wù)框架ASPcheduler實現(xiàn)定時任務(wù)
APScheduler是Python的一個定時任務(wù)框架,用于執(zhí)行周期或者定時任務(wù),該框架不僅可以添加、刪除定時任務(wù),還可以將任務(wù)存儲到數(shù)據(jù)庫中,實現(xiàn)任務(wù)的持久化,使用起來非常方便。
示例代碼:
from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) def task2(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + '666!') def func(): # 創(chuàng)建調(diào)度器BlockingScheduler() scheduler = BlockingScheduler() scheduler.add_job(task, 'interval', seconds=3, id='test_job1') # 添加任務(wù),時間間隔為5秒 scheduler.add_job(task2, 'interval', seconds=5, id='test_job2') scheduler.start() func()
運行結(jié)果:
關(guān)于更多apschedule用法,詳見博文:python中定時任務(wù)apscheduler庫用法詳解
7、使用分布式消息系統(tǒng)celery執(zhí)行定時任務(wù)
Celery是一個簡單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。
Celery 是一個強大的分布式任務(wù)隊列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務(wù)(async task)和定時任務(wù)(crontab)。 異步任務(wù)比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務(wù)是需要在特定時間執(zhí)行的任務(wù)。
注意:celery本身并不具備任務(wù)的存儲功能,在調(diào)度任務(wù)的時候肯定是要把任務(wù)存起來的,因此在使用celery的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis緩存、數(shù)據(jù)庫等。官方推薦的是消息隊列RabbitMQ,有些時候使用Redis也是不錯的選擇。
8、使用windows自帶的定時任務(wù)
略。這兒不做細述!
總結(jié)
到此這篇關(guān)于python實現(xiàn)定時任務(wù)的八種方式的文章就介紹到這了,更多相關(guān)python定時任務(wù)方式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jupyter notebook 更改文件打開的默認路徑操作
這篇文章主要介紹了Jupyter notebook 更改文件打開的默認路徑操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05