使用Python實現(xiàn)tail的示例代碼
前記
tail是一個常用的Linux命令, 它可以打印文件的后面n行數(shù)據(jù), 也能實時輸出文件的追加數(shù)據(jù). tail的實現(xiàn)很簡單,但是要實現(xiàn)一個完善的tail卻需要考慮很多細(xì)節(jié),如果要注重性能,則需要引入一些其他的機(jī)制. 一開始只是為了單純的實現(xiàn)Linux的tail
的基本功能,后面隨著需要對日志文件的高性能讀取則需要的Linux的inotify
機(jī)制去完善. 相關(guān)源碼見:https://github.com/so1n/example/tree/master/example_python/example_python/tail
1.第一版--從文件尾部讀取實時數(shù)據(jù)
主要思路是: 打開文件, 把指針移動到文件最后, 然后有數(shù)據(jù)則輸出數(shù)據(jù), 無數(shù)據(jù)則休眠一段時間.
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: f.seek(0, 2) # 從文件結(jié)尾處開始seek while True: line: str = f.readline() if line: self.output(line) # 使用print都會每次都打印新的一行 else: time.sleep(self.interval) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
之后只要做如下調(diào)用即可:
python xxx.py filename
2.第二版--實現(xiàn)tail -f
tail -f
默認(rèn)先讀取最后10行數(shù)據(jù),再從文件尾部讀取實時數(shù)據(jù).如果對于小文件,可以先讀取所有文件內(nèi)容,并輸出最后10行, 但是讀取全文再獲取最后10行的性能不高, 而從后滾10行的邊界條件也很復(fù)雜, 先看先讀取全文再獲取最后10行的實現(xiàn):
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: self.read_last_line(f) while True: line: str = f.readline() if line: self.output(line) # 使用print都會每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, f): last_lines = f.readlines()[-10:] for line in last_lines: self.output(line) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
可以看到實現(xiàn)很簡單, 相比第一版只多了個read_last_line的函數(shù)
, 接下來就要解決性能的問題了, 當(dāng)文件很大的時候, 這個邏輯是不行的, 特別是有些日志文件經(jīng)常有幾個G大, 如果全讀出來內(nèi)存就爆了. 而在Linux系統(tǒng)中, 沒有一個接口可以指定指針跳到倒數(shù)10行, 只能使用如下方法來模擬輸出倒數(shù)10行:
- 首先游標(biāo)跳轉(zhuǎn)到最新的字符, 保存當(dāng)前游標(biāo), 然后預(yù)估一行數(shù)據(jù)的字符長度, 最好偏多, 這里我按1024字符長度為一行來處理
- 然后利用seek的方法,跳轉(zhuǎn)到seek(-1024 * 10, 2)的字符, 這就是我們預(yù)估的倒數(shù)10行內(nèi)的內(nèi)容
- 接著對內(nèi)容進(jìn)行判斷, 如果跳轉(zhuǎn)的字符長度小于 10 * 1024, 則證明整個文件沒有10行, 則采用原來的
read_last_line
方法. - 如果跳轉(zhuǎn)到字符長度等于1024 * 10, 則利用換行符計算已取字符長度共有多少行,如果行數(shù)大于10,那只輸出最后10行,如果只讀了4行,則繼續(xù)讀6*1024,直到讀滿10行為止
通過以上步奏, 就把倒數(shù)10行的數(shù)據(jù)計算好了可以打印出來, 可以進(jìn)入追加數(shù)據(jù)了, 但是這時候文件內(nèi)容可能發(fā)生改變了, 我們的游標(biāo)也發(fā)生改變了, 這時候要把游標(biāo)跳回到剛才保存的游標(biāo),防止漏打或者重復(fù)打印數(shù)據(jù).
分析完畢后, 就可以開始重構(gòu)read_last_line
函數(shù)了.
import time import sys from typing import Callable, List, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line def __call__(self, n: int = 10): with open(self.file_name) as f: self.read_last_line(f, n) while True: line: str = f.readline() if line: self.output(line) # 使用print都會每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, file, n): read_len: int = self.len_line * n # 跳轉(zhuǎn)游標(biāo)到最后 file.seek(0, 2) # 獲取當(dāng)前結(jié)尾的游標(biāo)位置 now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳轉(zhuǎn)的字符長度大于原來文件長度,那就把所有文件內(nèi)容打印出來 file.seek(0) # 由于read方法是按照游標(biāo)進(jìn)行打印, 所以要重置游標(biāo) last_line_list: List[str] = file.read().split('\n')[-n:] # 重新獲取游標(biāo)位置 now_tell: int = file.tell() break # 跳轉(zhuǎn)到我們預(yù)估的字符位置 file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果獲取的行數(shù)大于要求的行數(shù),則獲取前n行的行數(shù) last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果獲取的行數(shù)小于要求的行數(shù),則預(yù)估需要獲取的行數(shù),繼續(xù)獲取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游標(biāo),確保接下來打印的數(shù)據(jù)不重復(fù) file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
3.第三版--優(yōu)雅的讀取輸出日志文件
可以發(fā)現(xiàn)實時讀取那塊的邏輯性能還是很差, 如果每秒讀一次文件,實時性就太慢了,把間隔改小了,則處理器占用太多. 性能最好的情況是如果能得知文件更新再進(jìn)行打印文件, 那性能就能得到保障了.慶幸的是,在Linux中inotify
提供了這樣的功能. 此外,日志文件有一個特點就是會進(jìn)行l(wèi)ogrotate,如果日志被logrotate了,那我們就需要重新打開文件,并進(jìn)一步讀取數(shù)據(jù), 這種情況也可以利用到inotify
, 當(dāng)inotify
獲取到文件重新打開的事件時,我們就重新打開文件,再讀取.
import os import sys from typing import Callable, List, NoReturn import pyinotify multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 監(jiān)控多個事件 class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件處理類,注意繼承 """ 執(zhí)行inotify event的封裝 """ f: 'open()' filename: str path: str wm: 'pyinotify.WatchManager' output: Callable def my_init(self, **kargs): """pyinotify.ProcessEvent要求不能直接繼承__init__, 而是要重寫my_init, 我們重寫這一段并進(jìn)行初始化""" # 獲取文件 filename: str = kargs.pop('filename') if not os.path.exists(filename): raise RuntimeError('Not Found filename') if '/' not in filename: filename = os.getcwd() + '/' + filename index = filename.rfind('/') if index == len(filename) - 1 or index == -1: raise RuntimeError('Not a legal path') self.f = None self.filename = filename self.output: Callable = kargs.pop('output') self.wm = kargs.pop('wm') # 只監(jiān)控路徑,這樣就能知道文件是否移動 self.path = filename[:index] self.wm.add_watch(self.path, multi_event) def read_line(self): """統(tǒng)一的輸出方法""" for line in self.f.readlines(): self.output(line) def process_IN_MODIFY(self, event): """必須為process_事件名稱,event表示事件對象, 這里表示監(jiān)控到文件發(fā)生變化, 進(jìn)行文件讀取""" if event.pathname == self.filename: self.read_line() def process_IN_MOVE_SELF(self, event): """必須為process_事件名稱,event表示事件對象, 這里表示監(jiān)控到文件發(fā)生重新打開, 進(jìn)行文件讀取""" if event.pathname == self.filename: # 檢測到文件被移動重新打開文件 self.f.close() self.f = open(self.filename) self.read_line() def __enter__(self) -> 'InotifyEventHandler': self.f = open(self.filename) return self def __exit__(self, exc_type, exc_val, exc_tb): self.f.close() class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line wm = pyinotify.WatchManager() # 創(chuàng)建WatchManager對象 inotify_event_handler = InotifyEventHandler( **dict(filename=file_name, wm=wm, output=output) ) # 實例化我們定制化后的事件處理類, 采用**dict傳參數(shù) wm.add_watch('/tmp', multi_event) # 添加監(jiān)控的目錄,及事件 self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier實例化時傳入,notifier會自動執(zhí)行 self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler def __call__(self, n: int = 10): """通過inotify的with管理打開文件""" with self.inotify_event_handle as i: # 先讀取指定的行數(shù) self.read_last_line(i.f, n) # 啟用inotify的監(jiān)聽 self.notifier.loop() def read_last_line(self, file, n): read_len: int = self.len_line * n # 獲取當(dāng)前結(jié)尾的游標(biāo)位置 file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳轉(zhuǎn)的字符長度大于原來文件長度,那就把所有文件內(nèi)容打印出來 file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] # 重新獲取游標(biāo)位置 now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果獲取的行數(shù)大于要求的行數(shù),則獲取前n行的行數(shù) last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果獲取的行數(shù)小于要求的行數(shù),則預(yù)估需要獲取的行數(shù),繼續(xù)獲取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游標(biāo),確保接下來打印的數(shù)據(jù)不重復(fù) file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
可以看到, 從原本的open打開文件改為用inotify打開文件(這時候會調(diào)用my_init方法進(jìn)行初始化), 打開后還是運行我們打開原來n行的代碼, 然后就交給inotify運行. 在inotify運行之前, 我們把重新打開文件方法和打印文件方法都掛載在inotifiy對應(yīng)的事件里, 之后inotify運行時, 會根據(jù)對應(yīng)的事件執(zhí)行對應(yīng)的方法.
到此這篇關(guān)于使用Python實現(xiàn)tail的示例代碼的文章就介紹到這了,更多相關(guān)Python tail內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解numpy.ndarray.reshape()函數(shù)的參數(shù)問題
這篇文章主要介紹了詳解numpy.ndarray.reshape()函數(shù)的參數(shù)問題,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10Python讀取sqlite數(shù)據(jù)庫文件的方法分析
這篇文章主要介紹了Python讀取sqlite數(shù)據(jù)庫文件的方法,結(jié)合實例形式分析了Python引入sqlite3模塊操作sqlite數(shù)據(jù)庫的讀取、SQL命令執(zhí)行等相關(guān)操作技巧,需要的朋友可以參考下2017-08-08Python?clip與range函數(shù)保姆級使用教程
本文主要和大家介紹了詳解Python中clip與range函數(shù)的用法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參,希望能幫助到大家2022-06-06