基于python實現(xiàn)usb熱插拔檢測
1 概述
定義一個名為USBMonitor的類,用于在Linux系統(tǒng)中監(jiān)測USB設(shè)備的熱插拔事件。
該類利用了pyudev庫來監(jiān)聽內(nèi)核級設(shè)備事件,并且可以為USB設(shè)備的插入和移除事件注冊回調(diào)函數(shù)。
主要功能特性
- 基于pyudev庫監(jiān)聽內(nèi)核級設(shè)備事件:pyudev是一個用于與Linux系統(tǒng)中的udev(設(shè)備管理器)交互的庫,可以用來監(jiān)聽 設(shè)備事件。
- 支持USB設(shè)備插入/移除事件回調(diào):用戶可以通過傳入回調(diào)函數(shù)來處理USB設(shè)備的插入和移除事件。
- 自動過濾非USB設(shè)備事件:類內(nèi)部使用filter_by(subsystem='usb')方法,確保只監(jiān)聽USB子系統(tǒng)的事件。
- 支持設(shè)備詳細(xì)信息獲取:提供設(shè)備的詳細(xì)信息,包括廠商ID、產(chǎn)品ID、序列號和名稱等。
pyudev概述
pyudev 是 Linux 系統(tǒng)下與 udev 設(shè)備管理子系統(tǒng)交互的 Python 綁定庫。它提供對底層硬件設(shè)備的細(xì)粒度控制能力,支持:
- 設(shè)備枚舉:遍歷系統(tǒng)中的所有設(shè)備
- 屬性查詢:獲取設(shè)備詳細(xì)信息(廠商ID、產(chǎn)品ID等)
- 事件監(jiān)控:實時監(jiān)聽 設(shè)備插拔事件
- 規(guī)則管理:與 udev 規(guī)則系統(tǒng)集成(需配合系統(tǒng)配置)
類的主要方法:
- __init__:初始化監(jiān)測器,設(shè)置回調(diào)函數(shù)。
- _parse_device_info:解析設(shè)備信息,將設(shè)備對象轉(zhuǎn)換為包含廠商ID、產(chǎn)品ID等信息的字典。
- _event_handler:統(tǒng)一處理設(shè)備事件,根據(jù)事件類型調(diào)用相應(yīng)的回調(diào)函數(shù)。
- start_monitoring:啟動事件監(jiān)聽循環(huán),持續(xù)監(jiān)聽 設(shè)備事件并調(diào)用事件處理器。
- start:在單獨的線程中啟動監(jiān)測循環(huán),以確保主線程不受阻塞。
- stop:安全停止監(jiān)測,設(shè)置停止標(biāo)志并等待監(jiān)測線程結(jié)束。
環(huán)境說明
環(huán)境 | 版本 |
---|---|
python | V3.10 |
系統(tǒng) | ubuntu22.04 |
依賴 | pyudev |
依賴安裝
pip install pyudev
2 實現(xiàn)代碼
import pyudev import threading import time class USBMonitor: """ Linux系統(tǒng)USB設(shè)備熱插拔監(jiān)測 功能特性: - 基于pyudev庫監(jiān)聽內(nèi)核級設(shè)備事件 - 支持USB設(shè)備插入/移除事件回調(diào) - 自動過濾非USB設(shè)備事件 - 支持設(shè)備詳細(xì)信息獲?。◤S商ID、產(chǎn)品ID等) """ def __init__(self, on_device_added=None, on_device_removed=None): """ 初始化監(jiān)測器 :param on_device_added: 插入回調(diào)函數(shù) func(device_info_dict) :param on_device_removed: 移除回調(diào)函數(shù) func(device_info_dict) """ self.context = pyudev.Context() # 創(chuàng)建pyudev上下文,用于與udev交互 self.monitor = pyudev.Monitor.from_netlink(self.context) # 創(chuàng)建監(jiān)測器,監(jiān)聽內(nèi)核設(shè)備事件 self.monitor.filter_by(subsystem='usb') # 只監(jiān)聽USB子系統(tǒng)的事件 self.on_device_added = on_device_added # USB設(shè)備插入事件的回調(diào)函數(shù) self.on_device_removed = on_device_removed # USB設(shè)備移除事件的回調(diào)函數(shù) self.monitor_thread = None # 監(jiān)測線程 self.running = False # 標(biāo)志監(jiān)測是否正在運行 def _parse_device_info(self, device): """解析設(shè)備信息為字典 :param device: 設(shè)備對象 :return: 包含設(shè)備信息的字典 """ return { 'action': device.action, # 設(shè)備事件類型(add/remove) 'devnode': device.device_node, # 設(shè)備節(jié)點路徑 'vendor_id': device.get('ID_VENDOR_ID'), # 廠商ID 'product_id': device.get('ID_MODEL_ID'), # 產(chǎn)品ID 'serial': device.get('ID_SERIAL_SHORT'), # 設(shè)備序列號 'name': device.get('ID_MODEL') # 設(shè)備名稱 } def _event_handler(self, device): """統(tǒng)一事件處理器 根據(jù)設(shè)備事件類型調(diào)用相應(yīng)的回調(diào)函數(shù) :param device: 設(shè)備對象 """ info = self._parse_device_info(device) # 解析設(shè)備信息 if device.action == 'add': # 設(shè)備插入事件 if callable(self.on_device_added): # 檢查回調(diào)函數(shù)是否可調(diào)用 self.on_device_added(info) elif device.action == 'remove': # 設(shè)備移除事件 if callable(self.on_device_removed): # 檢查回調(diào)函數(shù)是否可調(diào)用 self.on_device_removed(info) def start_monitoring(self): """啟動事件監(jiān)聽循環(huán) 在循環(huán)中監(jiān)聽 設(shè)備事件并調(diào)用事件處理器 """ self.running = True # 設(shè)置運行標(biāo)志 print("[USB Monitor] 開始監(jiān)測USB設(shè)備...") for device in iter(self.monitor.poll, None): # 持續(xù)監(jiān)聽 設(shè)備事件 if not self.running: # 如果停止標(biāo)志被設(shè)置,退出循環(huán) break self._event_handler(device) # 調(diào)用事件處理器 def start(self): """啟動監(jiān)測線程 在單獨的線程中運行監(jiān)測循環(huán) """ if not self.monitor_thread or not self.monitor_thread.is_alive(): # 檢查線程是否已啟動 self.monitor_thread = threading.Thread(target=self.start_monitoring) # 創(chuàng)建新線程 self.monitor_thread.daemon = True # 設(shè)置為守護(hù)線程 self.monitor_thread.start() # 啟動線程 def stop(self): """安全停止監(jiān)測 停止監(jiān)測線程并等待其結(jié)束 """ self.running = False # 設(shè)置停止標(biāo)志 if self.monitor_thread and self.monitor_thread.is_alive(): # 檢查線程是否仍在運行 self.monitor_thread.join(timeout=2) # 等待線程結(jié)束,超時時間為2秒 # 使用示例 if __name__ == "__main__": def device_added_callback(device_info): """設(shè)備插入回調(diào)函數(shù) :param device_info: 包含設(shè)備信息的字典 """ print(f"[插入] 設(shè)備接入: {device_info['name']}") print(f" 廠商ID: {device_info['vendor_id']}") print(f" 產(chǎn)品ID: {device_info['product_id']}") print(f" 設(shè)備路徑: {device_info['devnode']}") def device_removed_callback(device_info): """設(shè)備移除回調(diào)函數(shù) :param device_info: 包含設(shè)備信息的字典[README](USB熱插拔監(jiān)測.assets/README.md) """ print(f"[移除] 設(shè)備斷開: {device_info['name']}") print(f" 設(shè)備路徑: {device_info['devnode']}") monitor = LinuxUSBMonitor( on_device_added=device_added_callback, # 設(shè)置插入回調(diào)函數(shù) on_device_removed=device_removed_callback # 設(shè)置移除回調(diào)函數(shù) ) try: monitor.start() # 啟動USB監(jiān)測 print("監(jiān)測運行中,按Ctrl+C停止...") while True: time.sleep(1) # 主線程保持運行 except KeyboardInterrupt: monitor.stop() # 停止USB監(jiān)測 print("\n監(jiān)測已停止")
3 演示效果
4.方法補充
下面我們來看看如何在Windows平臺實現(xiàn)USB設(shè)備實時監(jiān)控類
實現(xiàn)代碼
import wmi import threading import time import pythoncom # 用于處理COM庫的線程安全初始化 class USBMonitor: """ USB設(shè)備實時監(jiān)控類(Windows平臺專用) 功能特性: - 基于WMI實現(xiàn)設(shè)備熱插拔事件監(jiān)聽 - 支持設(shè)備插入/移除的雙向回調(diào)機制 - 使用獨立監(jiān)控線程避免阻塞主程序 - 自動處理COM庫線程初始化問題 """ def __init__(self, on_device_added=None, on_device_removed=None): """ 初始化監(jiān)控實例 :param on_device_added: 設(shè)備插入回調(diào)函數(shù),格式 func(device_obj) :param on_device_removed: 設(shè)備移除回調(diào)函數(shù),格式 func(device_obj) """ self.on_device_added = on_device_added self.on_device_removed = on_device_removed self.monitor_thread = threading.Thread(target=self.start_monitoring) self.monitor_thread.daemon = True # 設(shè)為守護(hù)線程確保主程序退出時自動終止 self.thread_running = False # 線程運行狀態(tài)標(biāo)志 def get_all_devices(self): """獲取當(dāng)前已連接的USB設(shè)備列表(快照)""" c = wmi.WMI(moniker="root/cimv2") return c.Win32_USBControllerDevice() # 返回Win32_USBControllerDevice對象集合 def device_added_callback(self, new_device): """內(nèi)部設(shè)備插入事件處理器""" if callable(self.on_device_added): # 安全檢查回調(diào)函數(shù) self.on_device_added(new_device) def device_removed_callback(self, removed_device): """內(nèi)部設(shè)備移除事件處理器""" if callable(self.on_device_removed): self.on_device_removed(removed_device) def start_monitoring(self): """監(jiān)控線程主循環(huán)(核心邏輯)""" pythoncom.CoInitialize() # 每個線程必須獨立初始化COM庫 try: c = wmi.WMI(moniker="root/cimv2") # 配置插入事件監(jiān)聽器(1秒輪詢間隔) arr_filter = c.Win32_PnPEntity.watch_for( notification_type="creation", delay_secs=1 ) # 配置移除事件監(jiān)聽器 rem_filter = c.Win32_PnPEntity.watch_for( notification_type="deletion", delay_secs=1 ) self.thread_running = True print("[監(jiān)控系統(tǒng)] USB設(shè)備監(jiān)控服務(wù)啟動") while self.thread_running: # 非阻塞式檢測插入事件(0.5秒超時) try: new_device = arr_filter(0.5) self.device_added_callback(new_device) except wmi.x_wmi_timed_out: pass # 正常超時,繼續(xù)循環(huán) # 非阻塞式檢測移除事件 try: removed_device = rem_filter(0.5) self.device_removed_callback(removed_device) except wmi.x_wmi_timed_out: pass except KeyboardInterrupt: print("\n[監(jiān)控系統(tǒng)] 用戶主動終止監(jiān)控") finally: pythoncom.CoUninitialize() # 必須釋放COM資源 print("[監(jiān)控系統(tǒng)] 監(jiān)控服務(wù)已安全關(guān)閉") def start(self): """啟動監(jiān)控服務(wù)""" if not self.monitor_thread.is_alive(): self.monitor_thread.start() def stop(self): """安全停止監(jiān)控服務(wù)""" self.thread_running = False if self.monitor_thread.is_alive(): self.monitor_thread.join(timeout=2) # 等待線程結(jié)束 # 示例回調(diào)實現(xiàn) def on_device_added(device): """設(shè)備插入事件處理示例""" print(f"[事件] 新設(shè)備接入: {device.Description}") def on_device_removed(device): """設(shè)備移除事件處理示例""" print(f"[事件] 設(shè)備移除: {device.Description}") # 使用示例 if __name__ == "__main__": # 初始化帶回調(diào)的監(jiān)控實例 monitor = USBMonitor(on_device_added, on_device_removed) try: monitor.start() print("主程序運行中,按Ctrl+C停止監(jiān)控...") while True: # 主線程保持活動 time.sleep(1) except KeyboardInterrupt: monitor.stop() print("程序正常退出")
演示效果
到此這篇關(guān)于基于python實現(xiàn)usb熱插拔檢測的文章就介紹到這了,更多相關(guān)python監(jiān)測usb熱插拔內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python標(biāo)準(zhǔn)庫內(nèi)置函數(shù)complex介紹
這篇文章主要介紹了Python標(biāo)準(zhǔn)庫內(nèi)置函數(shù)complex介紹,本文先是講解了complex的作用和使用注意,然后給出了使用示例,需要的朋友可以參考下2014-11-11利用Python監(jiān)控設(shè)備電池電量并發(fā)送通知
在日常使用電子設(shè)備時,及時了解電池電量狀態(tài)并進(jìn)行合理充電是非常重要的,本文將使用Python進(jìn)行設(shè)備電池電量的監(jiān)控并發(fā)送通知,有需要的可以了解下2025-03-03Python3.7 讀取音頻根據(jù)文件名生成腳本的代碼
這篇文章主要介紹了Python3.7 讀取音頻根據(jù)文件名生成字幕腳本的方法,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04在Python中處理字符串之isdecimal()方法的使用
這篇文章主要介紹了在Python中處理字符串之isdecimal()方法的使用,是Python入門學(xué)習(xí)的基礎(chǔ)知識,需要的朋友可以參考下2015-05-05