使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器
需求背景
定時(shí)器的核心功能是能夠周期性地觸發(fā)回調(diào)函數(shù),同時(shí)需要支持啟動(dòng)、停止以及狀態(tài)檢查等操作。在多線程或異步編程場景中,希望定時(shí)器能夠:
- 支持異步操作,避免阻塞主線程;
- 單例化事件循環(huán),節(jié)省資源;
- 優(yōu)雅地管理定時(shí)器的生命周期;
- 提供簡單的接口,易于使用。
為此,設(shè)計(jì)了一個(gè) Timer 類,結(jié)合 asyncio 和 threading,實(shí)現(xiàn)了一個(gè)高效的定時(shí)器。
代碼
完整代碼
import asyncio
import threading
import time
import sys
class Timer:
_loop = None
_thread = None
_lock = threading.Lock()
_running_timers = 0
@classmethod
def _ensure_loop(cls):
with cls._lock:
if cls._loop is None or not cls._thread or not cls._thread.is_alive():
cls._loop = asyncio.new_event_loop()
cls._thread = threading.Thread(
target=cls._run_loop,
args=(cls._loop,),
daemon=True
)
cls._thread.start()
@classmethod
def _run_loop(cls, loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"事件循環(huán)異常: {e}")
finally:
loop.close()
@classmethod
def _shutdown(cls):
with cls._lock:
if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
cls._loop.call_soon_threadsafe(cls._loop.stop)
# 不使用 join,因?yàn)槭刈o(hù)線程會(huì)在主線程退出時(shí)自動(dòng)結(jié)束
def __init__(self):
self.is_running = False
self._stop_event = asyncio.Event()
self._task = None
async def _timer_loop(self, interval, callback):
try:
while not self._stop_event.is_set():
await asyncio.sleep(interval)
if not self._stop_event.is_set():
await asyncio.get_event_loop().run_in_executor(None, callback)
except asyncio.CancelledError:
pass # 正常取消時(shí)忽略
except Exception as e:
print(f"定時(shí)器循環(huán)異常: {e}")
finally:
self.is_running = False
Timer._running_timers -= 1
Timer._shutdown()
def start(self, interval, callback):
if not self.is_running:
Timer._ensure_loop()
self.is_running = True
self._stop_event.clear()
self._task = asyncio.run_coroutine_threadsafe(
self._timer_loop(interval, callback),
Timer._loop
)
Timer._running_timers += 1
# print(f"定時(shí)器已啟動(dòng),每{interval}秒執(zhí)行一次")
def stop(self):
if self.is_running:
self._stop_event.set()
if self._task:
Timer._loop.call_soon_threadsafe(self._task.cancel)
self.is_running = False
# print("定時(shí)器已停止")
def is_active(self):
return self.is_running
# 使用示例
def callback1():
print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}")
def callback2():
print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}")
if __name__ == "__main__":
timer1 = Timer()
timer2 = Timer()
timer1.start(2, callback1)
timer2.start(3, callback2)
try:
time.sleep(100)
timer1.stop()
time.sleep(2)
timer2.stop()
except KeyboardInterrupt:
timer1.stop()
timer2.stop()
finally:
# 確保在程序退出時(shí)清理
Timer._shutdown()
1. 單例事件循環(huán)的實(shí)現(xiàn)
為了避免每個(gè)定時(shí)器都創(chuàng)建一個(gè)獨(dú)立的事件循環(huán),在 Timer 類中使用了類變量和類方法來管理全局唯一的事件循環(huán):
class Timer:
_loop = None
_thread = None
_lock = threading.Lock()
_running_timers = 0
@classmethod
def _ensure_loop(cls):
with cls._lock:
if cls._loop is None or not cls._thread or not cls._thread.is_alive():
cls._loop = asyncio.new_event_loop()
cls._thread = threading.Thread(
target=cls._run_loop,
args=(cls._loop,),
daemon=True
)
cls._thread.start()
_loop:存儲(chǔ)全局的asyncio事件循環(huán)。_thread:將事件循環(huán)運(yùn)行在一個(gè)獨(dú)立的守護(hù)線程中,避免阻塞主線程。_lock:線程鎖,確保在多線程環(huán)境中創(chuàng)建事件循環(huán)時(shí)的線程安全。_ensure_loop:在需要時(shí)創(chuàng)建或重用事件循環(huán),確保只有一個(gè)全局循環(huán)。
守護(hù)線程(daemon=True)的設(shè)計(jì)使得程序退出時(shí)無需顯式關(guān)閉線程,簡化了資源清理。
2. 事件循環(huán)的運(yùn)行與關(guān)閉
事件循環(huán)的運(yùn)行邏輯封裝在 _run_loop 中:
@classmethod
def _run_loop(cls, loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"事件循環(huán)異常: {e}")
finally:
loop.close()
run_forever:讓事件循環(huán)持續(xù)運(yùn)行,直到被外部停止。- 異常處理:捕獲可能的錯(cuò)誤并打印,便于調(diào)試。
finally:確保循環(huán)關(guān)閉時(shí)資源被正確釋放。
關(guān)閉邏輯則由 _shutdown 方法控制:
@classmethod
def _shutdown(cls):
with cls._lock:
if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
cls._loop.call_soon_threadsafe(cls._loop.stop)
當(dāng)所有定時(shí)器都停止時(shí)(_running_timers == 0),事件循環(huán)會(huì)被安全停止。
3. 定時(shí)器核心邏輯
每個(gè) Timer 實(shí)例負(fù)責(zé)管理一個(gè)獨(dú)立的定時(shí)任務(wù):
def __init__(self):
self.is_running = False
self._stop_event = asyncio.Event()
self._task = None
async def _timer_loop(self, interval, callback):
try:
while not self._stop_event.is_set():
await asyncio.sleep(interval)
if not self._stop_event.is_set():
await asyncio.get_event_loop().run_in_executor(None, callback)
except asyncio.CancelledError:
pass # 正常取消時(shí)忽略
finally:
self.is_running = False
Timer._running_timers -= 1
Timer._shutdown()
_stop_event:一個(gè)asyncio.Event對象,用于控制定時(shí)器的停止。_timer_loop:異步協(xié)程,每隔interval秒執(zhí)行一次回調(diào)函數(shù)callback。run_in_executor:將回調(diào)函數(shù)運(yùn)行在默認(rèn)的線程池中,避免阻塞事件循環(huán)。
4. 啟動(dòng)與停止
啟動(dòng)和停止方法是用戶的主要接口:
def start(self, interval, callback):
if not self.is_running:
Timer._ensure_loop()
self.is_running = True
self._stop_event.clear()
self._task = asyncio.run_coroutine_threadsafe(
self._timer_loop(interval, callback),
Timer._loop
)
Timer._running_timers += 1
def stop(self):
if self.is_running:
self._stop_event.set()
if self._task:
Timer._loop.call_soon_threadsafe(self._task.cancel)
self.is_running = False
start:啟動(dòng)定時(shí)器,確保事件循環(huán)可用,并記錄運(yùn)行中的定時(shí)器數(shù)量。stop:通過設(shè)置_stop_event并取消任務(wù)來停止定時(shí)器。
5. 使用示例
以下是一個(gè)簡單的使用示例:
def callback1():
print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}")
def callback2():
print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}")
timer1 = Timer()
timer2 = Timer()
timer1.start(2, callback1) # 每2秒觸發(fā)一次
timer2.start(3, callback2) # 每3秒觸發(fā)一次
time.sleep(10) # 運(yùn)行10秒
timer1.stop()
timer2.stop()
輸出可能如下:
回調(diào)1觸發(fā): 14:30:02 回調(diào)2觸發(fā): 14:30:03 回調(diào)1觸發(fā): 14:30:04 回調(diào)1觸發(fā): 14:30:06 回調(diào)2觸發(fā): 14:30:06 ...
設(shè)計(jì)亮點(diǎn)
- 異步與多線程結(jié)合:通過
asyncio和threading,實(shí)現(xiàn)了非阻塞的定時(shí)器,適合高并發(fā)場景。 - 資源高效利用:全局唯一的事件循環(huán)避免了重復(fù)創(chuàng)建的開銷。
- 優(yōu)雅的生命周期管理:守護(hù)線程和自動(dòng)關(guān)閉機(jī)制簡化了資源清理。
- 線程安全:使用鎖機(jī)制確保多線程環(huán)境下的穩(wěn)定性。
適用場景
- 周期性任務(wù)調(diào)度,如數(shù)據(jù)刷新、狀態(tài)檢查。
- 后臺(tái)服務(wù)中的定時(shí)監(jiān)控。
- 游戲或?qū)崟r(shí)應(yīng)用中的計(jì)時(shí)器需求。
總結(jié)
這個(gè)異步定時(shí)器實(shí)現(xiàn)結(jié)合了 Python 的異步編程和多線程特性,提供了一個(gè)輕量、靈活的解決方案。無論是簡單的腳本還是復(fù)雜的后臺(tái)服務(wù),它都能勝任。如果你需要一個(gè)可靠的定時(shí)器,不妨試試這個(gè)實(shí)現(xiàn),或者根據(jù)需求進(jìn)一步優(yōu)化它!
以上就是使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器的詳細(xì)內(nèi)容,更多關(guān)于Python異步定時(shí)器的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在linux下實(shí)現(xiàn) python 監(jiān)控usb設(shè)備信號(hào)
今天小編就為大家分享一篇在linux下實(shí)現(xiàn) python 監(jiān)控usb設(shè)備信號(hào),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07
關(guān)于torch.flatten()函數(shù)及x=x.view()函數(shù)的理解
這篇文章主要介紹了關(guān)于torch.flatten()函數(shù)及x=x.view()函數(shù)的理解,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04
Python pyecharts Line折線圖的具體實(shí)現(xiàn)
折線圖在很多圖標(biāo)中都有使用,本文主要介紹了Python pyecharts Line折線圖的具體實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05
Python報(bào)錯(cuò)TypeError: ‘dict‘ object is not&
在Python開發(fā)的旅程中,報(bào)錯(cuò)信息就像是一個(gè)個(gè)路障,阻礙著我們前進(jìn)的步伐,而“TypeError: ‘dict’ object is not iterable”這個(gè)報(bào)錯(cuò),常常讓開發(fā)者們陷入困惑,那么,這個(gè)報(bào)錯(cuò)究竟是怎么產(chǎn)生的呢?又該如何有效地解決它呢?讓我們一起深入探討,找到解決問題的方法2024-10-10
TensorFlow安裝并在Pycharm搭建環(huán)境的詳細(xì)圖文教程
今天動(dòng)手開始搭建TensorFlow開發(fā)環(huán)境,所以下面這篇文章主要給大家介紹了關(guān)于TensorFlow安裝并在Pycharm搭建環(huán)境的詳細(xì)圖文教程,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11

