python實現(xiàn)簡易內(nèi)存監(jiān)控
本例主要功能:每隔3秒獲取系統(tǒng)內(nèi)存,當內(nèi)存超過設定的警報值時,獲取所有進程占用內(nèi)存并發(fā)出警報聲。內(nèi)存值和所有進程占用內(nèi)存記入log,log文件按天命名。
1 獲取cpu、內(nèi)存、進程信息
利用WMI
簡單說明下,WMI的全稱是Windows Management Instrumentation,即Windows管理規(guī)范。它是Windows操作系統(tǒng)上管理數(shù)據(jù)和操作的基礎設施。我們可以使用WMI腳本或者應用自動化管理任務等。
安裝模塊
WMI下載地址
win32com下載地址:
學會使用WMI
不錯的教程
獲取cpu、內(nèi)存、磁盤
def getSysInfo(wmiService = None):
result = {}
if wmiService == None:
wmiService = wmi.WMI()
# cpu
for cpu in wmiService.Win32_Processor():
timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
result['cpuPercent'] = cpu.loadPercentage
# memory
cs = wmiService.Win32_ComputerSystem()
os = wmiService.Win32_OperatingSystem()
result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024)
result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024)
result['memPercent']=result['memFree'] * 100 /result['memTotal']
#disk
result['diskTotal'] = 0
result['diskFree'] = 0
for disk in wmiService.Win32_LogicalDisk(DriveType=3):
result['diskTotal'] += int(disk.Size)
result['diskFree'] += int(disk.FreeSpace)
result['diskTotal'] = int(result['diskTotal']/1024/1024)
result['diskFree'] = int(result['diskFree']/1024/1024)
return result
獲取所有進程占用內(nèi)存
def getAllProcessInfo(mywmi = None):
"""取出全部進程的進程名,進程ID,進程實際內(nèi)存, 虛擬內(nèi)存,CPU使用率
"""
allProcessList = []
allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process")
#print (allProcess.count)
for j in allProcess:
#print j.Properties_("PercentPrivilegedTime").__int__()
##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__()
#for pro in j.Properties_:
# print (pro.name)
#break
name = j.Properties_("name").__str__()
if name != "_Total" and name !="Idle":
pid = j.Properties_("IDProcess").__str__()
PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__()
WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024
WorkingSet = j.Properties_("WorkingSet").__int__()/1024
allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime])
return allProcessList
也可以用psutil
import psutil,time from operator import itemgetter, attrgetter def getProcessInfo(p): """取出指定進程占用的進程名,進程ID,進程實際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ try: cpu = int(p.cpu_percent(interval=0)) memory = p.memory_info() rss = memory.rss/1024 vms = memory.vms/1024 name = p.name() pid = p.pid except psutil.Error: name = "Closed_Process" pid = 0 rss = 0 vms = 0 cpu = 0 #return [name.upper(), pid, rss, vms] return [name, pid, vms, rss, cpu] def getAllProcessInfo(): """取出全部進程的進程名,進程ID,進程實際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ instances = [] all_processes = list(psutil.process_iter()) for proc in all_processes: proc.cpu_percent(interval=0) #此處sleep1秒是取正確取出CPU使用率的重點 time.sleep(1) for proc in all_processes: instances.append(getProcessInfo(proc)) return instances if __name__ == '__main__': processInfoList = getAllProcessInfo() processInfoList.sort(key=itemgetter(2), reverse=True) for p in processInfoList: print(p)
2. 保存log
配置config文件
[loggers]
keys=example01
[logger_example01]
handlers=hand04
[handlers]
keys=hand04
[handler_hand04]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=form01
args=('./logs/monitor.log', 'd', 1, 7)
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%Y-%m-%d %H:%M:%S
記錄log
import logging
import logging.config
logger.info("message")
logger.warning("message")
logger.error("message")
3. 完整代碼
文件夾結(jié)構(gòu):
maintain
–monitor
—-logs
—-logger.conf
—-monitor.py
–packages
—-init.py
—-processinfo.py
—-sysinfo.py
monitor
import wmi
import time
import winsound
import logging
import logging.config
from operator import itemgetter, attrgetter
from os import path
import packages.sysinfo #使用wmi
#import packages.ProcessInfo #使用
#def ShowProcessInfo():
# processInfoList = packages.ProcessInfo.getAllProcessInfo()
# processInfoList.sort(key=itemgetter(2), reverse=True)
# for p in processInfoList:
# logger.info(p)
def ShowProcessInfo(wmiService = None):
processInfoList = packages.sysinfo.getAllProcessInfo(wmiService)
processInfoList.sort(key=itemgetter(2), reverse=True)
for p in processInfoList:
logger.info(p)
if __name__ == '__main__':
MemPerWorningLine = 50
MemPerErrorLine = 20
ErrorAlertCount = 10
ProcessInfoCount = 10
counterProcessInfo = ProcessInfoCount
print("Memory monitor start!")
log_file_path = path.join(path.dirname(path.abspath(__file__)), 'logger.conf')
#print(log_file_path)
logging.config.fileConfig(log_file_path)
logger = logging.getLogger("example01")
wmiService = wmi.WMI()
while True:
memPercent = int(packages.sysinfo.getSysInfo(wmiService)['memPercent'])
strMemPercent = 'FreeMemory: ' + str(memPercent) + '%'
if(memPercent < MemPerErrorLine):
logger.error(strMemPercent)
#ProcessInfoList
counterProcessInfo+=1
if(counterProcessInfo >= ProcessInfoCount):
ShowProcessInfo(wmiService)
counterProcessInfo = 0
#ALert
counter = 1
while counter <= ErrorAlertCount:
winsound.Beep(2080, 100)
time.sleep(0.1)
counter += 1
elif(memPercent < MemPerWorningLine):
logger.warning(strMemPercent)
#ProcessInfoList
counterProcessInfo+=1
if(counterProcessInfo >= ProcessInfoCount):
ShowProcessInfo(wmiService)
counterProcessInfo = 0
#ALert
winsound.Beep(2015, 2000)
else:
logger.info(strMemPercent)
time.sleep(3)
sysinfo
# -*- coding: utf-8 -*-
import wmi
import os
import sys
import platform
import time
import win32api
import win32com
from win32com.client import GetObject
from operator import itemgetter, attrgetter
def getSysInfo(wmiService = None):
result = {}
if wmiService == None:
wmiService = wmi.WMI()
# cpu
for cpu in wmiService.Win32_Processor():
timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
result['cpuPercent'] = cpu.loadPercentage
# memory
cs = wmiService.Win32_ComputerSystem()
os = wmiService.Win32_OperatingSystem()
result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024)
result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024)
result['memPercent']=result['memFree'] * 100 /result['memTotal']
#disk
result['diskTotal'] = 0
result['diskFree'] = 0
for disk in wmiService.Win32_LogicalDisk(DriveType=3):
result['diskTotal'] += int(disk.Size)
result['diskFree'] += int(disk.FreeSpace)
result['diskTotal'] = int(result['diskTotal']/1024/1024)
result['diskFree'] = int(result['diskFree']/1024/1024)
return result
def sys_version():
c = wmi.WMI ()
#獲取操作系統(tǒng)版本
for sys in c.Win32_OperatingSystem():
print ("Version:%s" % sys.Caption.encode("UTF8"),"Vernum:%s" % sys.BuildNumber)
print (sys.OSArchitecture.encode("UTF8"))#系統(tǒng)是32位還是64位的
print (sys.NumberOfProcesses) #當前系統(tǒng)運行的進程總數(shù)
def cpu_mem():
c = wmi.WMI ()
#CPU類型和內(nèi)存
for processor in c.Win32_Processor():
#print "Processor ID: %s" % processor.DeviceID
print ("Process Name: %s" % processor.Name.strip() )
for Memory in c.Win32_PhysicalMemory():
print ("Memory Capacity: %.fMB" %(int(Memory.Capacity)/1048576))
def cpu_use():
#5s取一次CPU的使用率
c = wmi.WMI()
while True:
for cpu in c.Win32_Processor():
timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
print ('%s | Utilization: %s: %d %%' % (timestamp, cpu.DeviceID, cpu.LoadPercentage))
time.sleep(5)
def disk():
c = wmi.WMI ()
#獲取硬盤分區(qū)
for physical_disk in c.Win32_DiskDrive ():
for partition in physical_disk.associators ("Win32_DiskDriveToDiskPartition"):
for logical_disk in partition.associators ("Win32_LogicalDiskToPartition"):
print (physical_disk.Caption.encode("UTF8"), partition.Caption.encode("UTF8"), logical_disk.Caption)
#獲取硬盤使用百分情況
for disk in c.Win32_LogicalDisk (DriveType=3):
print (disk.Caption, "%0.2f%% free" % (100.0 * long (disk.FreeSpace) / long (disk.Size)))
def network():
c = wmi.WMI ()
#獲取MAC和IP地址
for interface in c.Win32_NetworkAdapterConfiguration (IPEnabled=1):
print ("MAC: %s" % interface.MACAddress )
for ip_address in interface.IPAddress:
print ("ip_add: %s" % ip_address )
print
#獲取自啟動程序的位置
for s in c.Win32_StartupCommand ():
print ("[%s] %s <%s>" % (s.Location.encode("UTF8"), s.Caption.encode("UTF8"), s.Command.encode("UTF8")))
#獲取當前運行的進程
for process in c.Win32_Process ():
print (process.ProcessId, process.Name)
def getAllProcessInfo(mywmi = None):
"""取出全部進程的進程名,進程ID,內(nèi)存(專有工作集), 工作集
"""
allProcessList = []
allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process")
#print (allProcess.count)
for j in allProcess:
#print j.Properties_("PercentPrivilegedTime").__int__()
##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__()
#for pro in j.Properties_:
# print (pro.name)
#break
name = j.Properties_("name").__str__()
if name != "_Total" and name !="Idle":
pid = j.Properties_("IDProcess").__str__()
PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__()
WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024
WorkingSet = j.Properties_("WorkingSet").__int__()/1024
allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime])
# allProcess = mywmi.ExecQuery("select * from Win32_Process")
# for i in allProcess:
# Name = str(i.Properties_("Name"))
# ProcessID = int(i.Properties_("ProcessID"))
# WorkingSetSize = int(i.Properties_("WorkingSetSize"))/1024
# #VirtualSize = int(i.Properties_("VirtualSize"))/1024
# PeakWorkingSetSize = int(i.Properties_("PeakWorkingSetSize"))/1024
# CreationDate = str(i.Properties_("CreationDate"))
# allProcessList.append([Name, ProcessID, WorkingSetSize, PeakWorkingSetSize, CreationDate])
return allProcessList
#def main():
#sys_version()
#cpu_mem()
#disk()
#network()
#cpu_use()
if __name__ == '__main__':
#mywmi = GetObject("winmgmts:")
mywmi = wmi.WMI()
processInfoList = getAllProcessInfo(mywmi)
processInfoList.sort(key=itemgetter(2), reverse=True)
for processinfo in processInfoList:
print(processinfo)
processinfo
import psutil,time from operator import itemgetter, attrgetter def getProcessInfo(p): """取出指定進程占用的進程名,進程ID,進程實際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ try: cpu = int(p.cpu_percent(interval=0)) memory = p.memory_info() rss = memory.rss/1024 vms = memory.vms/1024 name = p.name() pid = p.pid except psutil.Error: name = "Closed_Process" pid = 0 rss = 0 vms = 0 cpu = 0 #return [name.upper(), pid, rss, vms] return [name, pid, vms, rss, cpu] def getAllProcessInfo(): """取出全部進程的進程名,進程ID,進程實際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ instances = [] all_processes = list(psutil.process_iter()) for proc in all_processes: proc.cpu_percent(interval=0) #此處sleep1秒是取正確取出CPU使用率的重點 time.sleep(1) for proc in all_processes: instances.append(getProcessInfo(proc)) return instances if __name__ == '__main__': processInfoList = getAllProcessInfo() processInfoList.sort(key=itemgetter(2), reverse=True) for p in processInfoList: print(p)
logger
#logger.conf
###############################################
[loggers]
keys=root,example01,example02
[logger_root]
level=DEBUG
handlers=hand01,hand02
[logger_example01]
handlers=hand01,hand04
qualname=example01
propagate=0
[logger_example02]
handlers=hand01,hand03
qualname=example02
propagate=0
###############################################
[handlers]
keys=hand01,hand02,hand03,hand04
[handler_hand01]
class=StreamHandler
level=INFO
formatter=form02
args=(sys.stderr,)
[handler_hand02]
class=FileHandler
level=DEBUG
formatter=form01
args=('myapp.log', 'a')
[handler_hand03]
class=handlers.RotatingFileHandler
level=INFO
formatter=form02
args=('myapp.log', 'a', 10*1024*1024, 5)
[handler_hand04]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=form01
args=('./logs/monitor.log', 'd', 1, 7)
###############################################
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%Y-%m-%d %H:%M:%S
[formatter_form02]
format=%(asctime)-12s: %(levelname)-8s %(message)s
datefmt=%Y-%m-%d %H:%M:%S
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
tensorflow 動態(tài)獲取 BatchSzie 的大小實例
這篇文章主要介紹了tensorflow 動態(tài)獲取 BatchSzie 的大小實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06
python游戲?qū)崙?zhàn)項目之童年經(jīng)典超級瑪麗
史上十大最經(jīng)典小霸王游戲中魂斗羅只能排在第二,那么第一是誰?最經(jīng)典最風靡的當屬超級瑪麗,那個戴帽子的大胡子穿著背帶褲的馬里奧哪個不認得,小編帶你用python實現(xiàn)超級瑪麗緬懷童年2021-09-09
一文詳解如何在Python中實現(xiàn)switch語句
這篇文章主要給大家介紹了關(guān)于如何在Python中實現(xiàn)switch語句的相關(guān)資料,今天在學習python的過程中,發(fā)現(xiàn)python沒有switch這個語法,所以這里給大家總結(jié)下,需要的朋友可以參考下2023-09-09
在 Windows 下搭建高效的 django 開發(fā)環(huán)境的詳細教程
這篇文章主要介紹了如何在 Windows 下搭建高效的 django 開發(fā)環(huán)境,本文通過一篇詳細教程實例代碼相結(jié)合給大家講解的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07

