python中實現(xiàn)定時任務(wù)的幾種方案
while True: + sleep()
利用while True的死循環(huán),加上 sleep()函數(shù)讓其暫停一段時間,達到每隔一段時間執(zhí)行特定任務(wù)的目的。
比較簡單,例子如下:
import datetime import time def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) def loop_monitor(): while True: time_printer() time.sleep(5) if __name__ == "__main__": loop_monitor()
主要缺點:
- 只能設(shè)定間隔,不能指定具體的時間
- sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,程序什么也不能操作。
Timeloop庫
Timeloop是一個庫,可用于運行多周期任務(wù)。
Timeloop內(nèi)部維護了一個任務(wù)列表jobs,用來管理所有任務(wù)。
可以使用裝飾器標(biāo)記任務(wù),這樣就會把任務(wù)加到任務(wù)列表jobs中,使用start方法啟動任務(wù)列表重的所有任務(wù)。
示例如下:
import time from timeloop import Timeloop from datetime import timedelta tl = Timeloop() @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s(): print("2s job current time : {}".format(time.ctime())) if __name__ == "__main__": tl.start(block=True)
運行后打印如下:
[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop.. [2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0> [2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set 2s job current time : Mon Oct 2 09:48:43 2023 2s job current time : Mon Oct 2 09:48:45 2023 2s job current time : Mon Oct 2 09:48:47 2023
同時Timeloop還有個stop方法,可以用來暫停所有任務(wù)。
threading.Timer
threading 模塊中的 Timer 是一個非阻塞函數(shù),比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務(wù),這些定時器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。
主要有如下方法:
方法 | 說明 |
---|---|
Timer(interval, function, args=None, kwargs=None) | 創(chuàng)建定時器 |
cancel() | 取消定時器 |
start() | 使用線程方式執(zhí)行 |
join(self, timeout=None) | 主線程等待線程執(zhí)行結(jié)束 |
示例:
import datetime from threading import Timer def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) # 注意 Timer 只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次 loop_monitor() def loop_monitor(): t = Timer(5, time_printer) t.start() if __name__ == "__main__": loop_monitor()
sched模塊
sched模塊實現(xiàn)了一個通用事件調(diào)度器,在調(diào)度器類使用一個延遲函數(shù)等待特定的時間,執(zhí)行任務(wù)。同時支持多線程應(yīng)用程序,在每個任務(wù)執(zhí)行后會立刻調(diào)用延時函數(shù),以確保其他線程也能執(zhí)行。
class sched.scheduler(timefunc, delayfunc)
這個類定義了調(diào)度事件的通用接口,它需要外部傳入兩個參數(shù),timefunc是一個沒有參數(shù)的返回時間類型數(shù)字的函數(shù)(常用使用的如time模塊里面的time),delayfunc應(yīng)該是一個需要一個參數(shù)來調(diào)用、與timefunc的輸出兼容、并且作用為延遲多個時間單位的函數(shù)(常用的如time模塊的sleep)。
import datetime import time import sched def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成調(diào)度器 s.enter(5, 1, time_printer, ()) s.run() if __name__ == "__main__": loop_monitor()
scheduler對象主要方法:
enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。
cancel(event):從隊列中刪除事件。如果事件不是當(dāng)前隊列中的事件,則該方法將跑出一個ValueError。
run():運行所有預(yù)定的事件。這個函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的delayfunc()函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。比threading.Timer更好,不需要循環(huán)調(diào)用。
schedule模塊
schedule是一個第三方輕量級的任務(wù)調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間。schedule允許用戶使用簡單、人性化的語法以預(yù)定的時間間隔定期運行Python函數(shù)(或其它可調(diào)用函數(shù))。
示例:
import schedule import time def job(): print("I'm working...") schedule.every(10).seconds.do(job) schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
也可以通過 @repeat() 裝飾靜態(tài)方法:
import time from schedule import every, repeat, run_pending @repeat(every().second) def job(): print('working...') while True: run_pending() time.sleep(1)
傳遞參數(shù):
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') while True: schedule.run_pending()
裝飾器同樣能傳遞參數(shù):
from schedule import every, repeat, run_pending @repeat(every().second, 'World') @repeat(every().minute, 'Mars') def hello(planet): print('Hello', planet) while True: run_pending()
取消任務(wù):
import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()
運行一次任務(wù):
import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':34').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
根據(jù)標(biāo)簽檢索任務(wù):
# 檢索所有任務(wù):schedule.get_jobs() import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends)
根據(jù)標(biāo)簽取消任務(wù):
# 取消所有任務(wù):schedule.clear() import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()
運行任務(wù)到某時間:
import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小時后停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending()
馬上運行所有任務(wù)(主要用于測試):
import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任務(wù)間延遲3秒
并行運行:使用 Python 內(nèi)置隊列實現(xiàn):
import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
APScheduler框架
APScheduler(advanceded python scheduler)基于Quartz的一個Python定時任務(wù)框架,實現(xiàn)了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及crontab類型的任務(wù),并且可以持久化任務(wù)?;谶@些功能,我們可以很方便的實現(xiàn)一個Python定時任務(wù)系統(tǒng)。
Celery框架
Celery是一個簡單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。
Celery 是一個強大的分布式任務(wù)隊列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。通常使用它來實現(xiàn)異步任務(wù)(async task)和定時任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務(wù)是需要在特定時間執(zhí)行的任務(wù)。
數(shù)據(jù)流工具Apache Airflow
概述
Apache Airflow 是Airbnb開源的一款數(shù)據(jù)流程工具,目前是Apache孵化項目。以非常靈活的方式來支持?jǐn)?shù)據(jù)的ETL過程,同時還支持非常多的插件來完成諸如HDFS監(jiān)控、郵件通知等功能。Airflow支持單機和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調(diào)度,有非常好的擴展性。被大量公司采用。
Airflow使用Python開發(fā),它通過DAGs(Directed Acyclic Graph, 有向無環(huán)圖)來表達一個工作流中所要執(zhí)行的任務(wù),以及任務(wù)之間的關(guān)系和依賴。比如,如下的工作流中,任務(wù)T1執(zhí)行完成,T2和T3才能開始執(zhí)行,T2和T3都執(zhí)行完成,T4才能開始執(zhí)行。
Airflow提供了各種Operator實現(xiàn),可以完成各種任務(wù)實現(xiàn):
- BashOperator – 執(zhí)行 bash 命令或腳本。
- SSHOperator – 執(zhí)行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
- PythonOperator – 執(zhí)行 Python 函數(shù)。EmailOperator – 發(fā)送 Email。
- HTTPOperator – 發(fā)送一個 HTTP 請求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務(wù)。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務(wù)需求。
一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務(wù),這樣工作流會產(chǎn)生分支。如:
這種需求可以使用BranchPythonOperator來實現(xiàn)。
Airflow 核心概念
- DAGs:即有向無環(huán)圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關(guān)系組織起來,描述的是所有tasks執(zhí)行順序。
- Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內(nèi)置了很多operators,如BashOperator 執(zhí)行一個bash 命令,PythonOperator 調(diào)用任意的Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送HTTP請求, SqlOperator 用于執(zhí)行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。
- Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。
- Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
- Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系,如 Task1 >> Task2,表明Task2依賴于Task2了。通過將DAGs和Operators結(jié)合起來,用戶就可以創(chuàng)建各種復(fù)雜的 工作流(workflow)。
Airflow 的架構(gòu)
在一個可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:
- 元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務(wù)狀態(tài)的信息。
- 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務(wù)運行。
- 執(zhí)行器:Executor 是一個消息隊列進程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務(wù)計劃的工作進程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進程在同一臺機器上運行的并行進程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機器集群中的工作進程執(zhí)行任務(wù)。
- Workers:這些是實際執(zhí)行任務(wù)邏輯的進程,由正在使用的執(zhí)行器確定。
Worker的具體實現(xiàn)由配置文件中的executor來指定,airflow支持多種Executor:
- SequentialExecutor: 單進程順序執(zhí)行,一般只用來測試
- LocalExecutor: 本地多進程執(zhí)行
- CeleryExecutor: 使用Celery進行分布式任務(wù)調(diào)度
- DaskExecutor:使用Dask進行分布式任務(wù)調(diào)度
- KubernetesExecutor: 1.10.0新增, 創(chuàng)建臨時POD執(zhí)行每次任務(wù)
生產(chǎn)環(huán)境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架構(gòu)如圖:
使用KubernetesExecutor的架構(gòu)如圖:
以上就是python中實現(xiàn)定時任務(wù)的幾種方案的詳細內(nèi)容,更多關(guān)于python實現(xiàn)定時任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Python字符串訪問與修改局部變量的實現(xiàn)代碼
這篇文章主要介紹了使用Python字符串訪問與修改局部變量,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06Python數(shù)據(jù)分析之彩票的歷史數(shù)據(jù)
這篇文章主要介紹了Python數(shù)據(jù)分析之彩票的歷史數(shù)據(jù),文中有非常詳細的代碼示例,對正在學(xué)習(xí)python的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04python異步編程之a(chǎn)syncio低階API的使用詳解
asyncio中低階API的種類很多,涉及到開發(fā)的5個方面,這篇文章主要為大家詳細介紹了這些低階API的具體使用,感興趣的小伙伴可以學(xué)習(xí)一下2024-01-01Python企業(yè)編碼生成系統(tǒng)之主程序模塊設(shè)計詳解
這篇文章主要介紹了Python企業(yè)編碼生成系統(tǒng)之主程序模塊設(shè)計,包括初始化、界面與邏輯實現(xiàn)相關(guān)操作技巧,需要的朋友可以參考下2019-07-07Python面試之os.system()和os.popen()的區(qū)別詳析
Python調(diào)用Shell,有兩種方法:os.system(cmd)或os.popen(cmd)腳本執(zhí)行過程中的輸出內(nèi)容,下面這篇文章主要給大家介紹了關(guān)于Python面試之os.system()和os.popen()區(qū)別的相關(guān)資料,需要的朋友可以參考下2022-06-06Tensorflow實現(xiàn)將標(biāo)簽變?yōu)閛ne-hot形式
這篇文章主要介紹了Tensorflow實現(xiàn)將標(biāo)簽變?yōu)閛ne-hot形式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05PyCharm Ctrl+Shift+F 失靈的簡單有效解決操作
這篇文章主要介紹了PyCharm Ctrl+Shift+F 失靈的簡單有效解決操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01