当前位置: 代码网 > it编程>前端脚本>Python > Python实现监控系统资源的脚本工具

Python实现监控系统资源的脚本工具

2025年12月04日 Python 我要评论
简介系统资源监控是系统管理员和开发人员日常工作中的重要任务。通过实时监控cpu、内存、磁盘和网络等关键资源的使用情况,我们可以及时发现系统性能瓶颈、预防系统故障并优化资源配置。本文将介绍一个实用的py

简介

系统资源监控是系统管理员和开发人员日常工作中的重要任务。通过实时监控cpu、内存、磁盘和网络等关键资源的使用情况,我们可以及时发现系统性能瓶颈、预防系统故障并优化资源配置。本文将介绍一个实用的python脚本——系统资源监控工具,它可以实时显示系统的各项关键指标,并支持历史数据记录和告警功能。

功能介绍

这个系统资源监控工具具有以下核心功能:

  • 实时监控:实时显示cpu、内存、磁盘和网络使用情况
  • 多维度监控:监控cpu使用率、内存占用、磁盘io、网络流量等多个维度
  • 历史数据记录:将监控数据保存到csv文件,便于后续分析
  • 可视化图表:生成资源使用趋势图,直观展示系统性能变化
  • 阈值告警:当资源使用超过设定阈值时发出告警
  • 定时监控:支持按指定间隔持续监控系统资源
  • 跨平台支持:支持windows、linux和macos操作系统
  • 轻量级设计:占用系统资源极少,不影响被监控系统的正常运行

应用场景

这个工具适用于以下场景:

  • 服务器监控:监控生产服务器的资源使用情况
  • 性能调优:分析应用程序运行时的资源消耗
  • 故障排查:定位系统性能问题的根本原因
  • 容量规划:根据历史数据预测系统资源需求
  • 开发测试:在开发和测试阶段监控应用程序性能
  • 系统维护:定期检查系统健康状况

报错处理

脚本包含了完善的错误处理机制:

  • 权限检测:检测是否具有足够的权限访问系统信息
  • 依赖检查:检查必要的第三方库是否已安装
  • 网络异常处理:处理网络监控中的连接异常
  • 文件操作保护:安全地读写监控数据文件
  • 资源限制处理:在资源受限环境下优雅降级
  • 异常捕获:捕获并处理运行过程中可能出现的各种异常

代码实现

import psutil
import time
import csv
import argparse
import os
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
import threading
import sys

class systemresourcemonitor:
    def __init__(self, interval=1, history_size=100):
        self.interval = interval
        self.history_size = history_size
        self.monitoring = false
        self.data_history = deque(maxlen=history_size)
        
        # 初始化历史数据存储
        self.timestamps = deque(maxlen=history_size)
        self.cpu_percentages = deque(maxlen=history_size)
        self.memory_percentages = deque(maxlen=history_size)
        self.disk_usages = deque(maxlen=history_size)
        self.network_bytes_sent = deque(maxlen=history_size)
        self.network_bytes_recv = deque(maxlen=history_size)
        
        # 网络流量基准值
        self.net_io_baseline = psutil.net_io_counters()
        
    def get_cpu_info(self):
        """获取cpu信息"""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            cpu_freq = psutil.cpu_freq()
            
            return {
                'percent': cpu_percent,
                'count': cpu_count,
                'frequency': cpu_freq.current if cpu_freq else 0
            }
        except exception as e:
            return {'error': str(e)}
            
    def get_memory_info(self):
        """获取内存信息"""
        try:
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()
            
            return {
                'total': memory.total,
                'available': memory.available,
                'used': memory.used,
                'percent': memory.percent,
                'swap_total': swap.total,
                'swap_used': swap.used,
                'swap_percent': swap.percent
            }
        except exception as e:
            return {'error': str(e)}
            
    def get_disk_info(self):
        """获取磁盘信息"""
        try:
            disk = psutil.disk_usage('/')
            disk_io = psutil.disk_io_counters()
            
            return {
                'total': disk.total,
                'used': disk.used,
                'free': disk.free,
                'percent': disk.percent,
                'read_bytes': disk_io.read_bytes if disk_io else 0,
                'write_bytes': disk_io.write_bytes if disk_io else 0
            }
        except exception as e:
            return {'error': str(e)}
            
    def get_network_info(self):
        """获取网络信息"""
        try:
            net_io = psutil.net_io_counters()
            
            # 计算相对于基准值的流量
            bytes_sent = net_io.bytes_sent - self.net_io_baseline.bytes_sent
            bytes_recv = net_io.bytes_recv - self.net_io_baseline.bytes_recv
            
            return {
                'bytes_sent': bytes_sent,
                'bytes_recv': bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv
            }
        except exception as e:
            return {'error': str(e)}
            
    def format_bytes(self, bytes_value):
        """格式化字节单位"""
        for unit in ['b', 'kb', 'mb', 'gb', 'tb']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} pb"
        
    def display_system_info(self):
        """显示系统信息"""
        # 清屏(兼容不同操作系统)
        os.system('cls' if os.name == 'nt' else 'clear')
        
        print("=" * 80)
        print("系统资源监控工具")
        print("=" * 80)
        print(f"监控时间: {datetime.now().strftime('%y-%m-%d %h:%m:%s')}")
        print()
        
        # cpu信息
        cpu_info = self.get_cpu_info()
        if 'error' not in cpu_info:
            print("cpu信息:")
            print(f"  使用率: {cpu_info['percent']:.1f}%")
            print(f"  核心数: {cpu_info['count']}")
            print(f"  频率: {cpu_info['frequency']:.2f} mhz")
        else:
            print(f"cpu信息获取失败: {cpu_info['error']}")
        print()
        
        # 内存信息
        memory_info = self.get_memory_info()
        if 'error' not in memory_info:
            print("内存信息:")
            print(f"  总计: {self.format_bytes(memory_info['total'])}")
            print(f"  已用: {self.format_bytes(memory_info['used'])}")
            print(f"  可用: {self.format_bytes(memory_info['available'])}")
            print(f"  使用率: {memory_info['percent']:.1f}%")
            if memory_info['swap_total'] > 0:
                print(f"  交换分区: {memory_info['swap_percent']:.1f}%")
        else:
            print(f"内存信息获取失败: {memory_info['error']}")
        print()
        
        # 磁盘信息
        disk_info = self.get_disk_info()
        if 'error' not in disk_info:
            print("磁盘信息:")
            print(f"  总计: {self.format_bytes(disk_info['total'])}")
            print(f"  已用: {self.format_bytes(disk_info['used'])}")
            print(f"  可用: {self.format_bytes(disk_info['free'])}")
            print(f"  使用率: {disk_info['percent']:.1f}%")
        else:
            print(f"磁盘信息获取失败: {disk_info['error']}")
        print()
        
        # 网络信息
        network_info = self.get_network_info()
        if 'error' not in network_info:
            print("网络信息:")
            print(f"  发送: {self.format_bytes(network_info['bytes_sent'])}")
            print(f"  接收: {self.format_bytes(network_info['bytes_recv'])}")
            print(f"  发送包: {network_info['packets_sent']:,}")
            print(f"  接收包: {network_info['packets_recv']:,}")
        else:
            print(f"网络信息获取失败: {network_info['error']}")
        print()
        
        print("=" * 80)
        print("按 ctrl+c 停止监控")
        
    def collect_data(self):
        """收集监控数据"""
        timestamp = datetime.now()
        
        data = {
            'timestamp': timestamp,
            'cpu': self.get_cpu_info(),
            'memory': self.get_memory_info(),
            'disk': self.get_disk_info(),
            'network': self.get_network_info()
        }
        
        # 添加到历史记录
        self.data_history.append(data)
        
        # 更新图表数据
        self.timestamps.append(timestamp)
        if 'error' not in data['cpu']:
            self.cpu_percentages.append(data['cpu']['percent'])
        if 'error' not in data['memory']:
            self.memory_percentages.append(data['memory']['percent'])
        if 'error' not in data['disk']:
            self.disk_usages.append(data['disk']['percent'])
            
        return data
        
    def save_to_csv(self, filename="system_monitor.csv"):
        """保存数据到csv文件"""
        try:
            file_exists = os.path.exists(filename)
            
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                fieldnames = [
                    'timestamp', 'cpu_percent', 'memory_percent', 'disk_percent',
                    'network_sent', 'network_recv'
                ]
                writer = csv.dictwriter(csvfile, fieldnames=fieldnames)
                
                # 如果是新文件,写入表头
                if not file_exists:
                    writer.writeheader()
                    
                # 写入最新数据
                if self.data_history:
                    latest_data = self.data_history[-1]
                    row = {
                        'timestamp': latest_data['timestamp'].strftime('%y-%m-%d %h:%m:%s'),
                        'cpu_percent': latest_data['cpu'].get('percent', 0) if 'error' not in latest_data['cpu'] else 0,
                        'memory_percent': latest_data['memory'].get('percent', 0) if 'error' not in latest_data['memory'] else 0,
                        'disk_percent': latest_data['disk'].get('percent', 0) if 'error' not in latest_data['disk'] else 0,
                        'network_sent': latest_data['network'].get('bytes_sent', 0) if 'error' not in latest_data['network'] else 0,
                        'network_recv': latest_data['network'].get('bytes_recv', 0) if 'error' not in latest_data['network'] else 0
                    }
                    writer.writerow(row)
                    
            return true
        except exception as e:
            print(f"保存csv文件时出错: {e}")
            return false
            
    def generate_chart(self, filename="resource_usage.png"):
        """生成资源使用趋势图"""
        try:
            if len(self.timestamps) < 2:
                print("数据不足,无法生成图表")
                return false
                
            plt.figure(figsize=(12, 8))
            
            # 创建子图
            ax1 = plt.subplot(2, 2, 1)
            if self.cpu_percentages:
                ax1.plot(self.timestamps, self.cpu_percentages, 'b-', label='cpu使用率')
                ax1.set_title('cpu使用率')
                ax1.set_ylabel('百分比 (%)')
                ax1.legend()
                ax1.grid(true)
            
            ax2 = plt.subplot(2, 2, 2)
            if self.memory_percentages:
                ax2.plot(self.timestamps, self.memory_percentages, 'g-', label='内存使用率')
                ax2.set_title('内存使用率')
                ax2.set_ylabel('百分比 (%)')
                ax2.legend()
                ax2.grid(true)
            
            ax3 = plt.subplot(2, 2, 3)
            if self.disk_usages:
                ax3.plot(self.timestamps, self.disk_usages, 'r-', label='磁盘使用率')
                ax3.set_title('磁盘使用率')
                ax3.set_ylabel('百分比 (%)')
                ax3.set_xlabel('时间')
                ax3.legend()
                ax3.grid(true)
            
            ax4 = plt.subplot(2, 2, 4)
            if self.network_bytes_sent and self.network_bytes_recv:
                ax4.plot(self.timestamps, [b/1024/1024 for b in self.network_bytes_sent], 'c-', label='发送(mb)')
                ax4.plot(self.timestamps, [b/1024/1024 for b in self.network_bytes_recv], 'm-', label='接收(mb)')
                ax4.set_title('网络流量')
                ax4.set_ylabel('流量 (mb)')
                ax4.set_xlabel('时间')
                ax4.legend()
                ax4.grid(true)
            
            # 格式化时间轴
            for ax in [ax1, ax2, ax3, ax4]:
                ax.xaxis.set_major_formatter(mdates.dateformatter('%h:%m:%s'))
                ax.xaxis.set_major_locator(mdates.minutelocator(interval=5))
                plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
            
            plt.tight_layout()
            plt.savefig(filename, dpi=300, bbox_inches='tight')
            plt.close()
            
            print(f"图表已保存到: {filename}")
            return true
        except exception as e:
            print(f"生成图表时出错: {e}")
            return false
            
    def check_thresholds(self, thresholds):
        """检查阈值并发出告警"""
        if not self.data_history:
            return
            
        latest_data = self.data_history[-1]
        
        # 检查cpu阈值
        if 'error' not in latest_data['cpu'] and thresholds.get('cpu'):
            cpu_percent = latest_data['cpu']['percent']
            if cpu_percent > thresholds['cpu']:
                print(f"⚠️  cpu使用率过高: {cpu_percent:.1f}% (阈值: {thresholds['cpu']}%)")
                
        # 检查内存阈值
        if 'error' not in latest_data['memory'] and thresholds.get('memory'):
            memory_percent = latest_data['memory']['percent']
            if memory_percent > thresholds['memory']:
                print(f"⚠️  内存使用率过高: {memory_percent:.1f}% (阈值: {thresholds['memory']}%)")
                
        # 检查磁盘阈值
        if 'error' not in latest_data['disk'] and thresholds.get('disk'):
            disk_percent = latest_data['disk']['percent']
            if disk_percent > thresholds['disk']:
                print(f"⚠️  磁盘使用率过高: {disk_percent:.1f}% (阈值: {thresholds['disk']}%)")
                
    def start_monitoring(self, duration=none, csv_file=none, chart_file=none, thresholds=none):
        """开始监控"""
        self.monitoring = true
        start_time = time.time()
        
        try:
            while self.monitoring:
                # 收集数据
                self.collect_data()
                
                # 显示信息
                self.display_system_info()
                
                # 保存到csv
                if csv_file:
                    self.save_to_csv(csv_file)
                    
                # 检查阈值
                if thresholds:
                    self.check_thresholds(thresholds)
                    
                # 检查是否达到指定时长
                if duration and (time.time() - start_time) >= duration:
                    break
                    
                # 等待下次监控
                time.sleep(self.interval)
                
        except keyboardinterrupt:
            print("\n\n停止监控...")
        finally:
            self.monitoring = false
            
            # 生成图表
            if chart_file:
                self.generate_chart(chart_file)
                
            print("监控已停止")

def main():
    parser = argparse.argumentparser(description="系统资源监控工具")
    parser.add_argument("-i", "--interval", type=int, default=5, 
                       help="监控间隔(秒,默认:5)")
    parser.add_argument("-d", "--duration", type=int, 
                       help="监控时长(秒,不指定则持续监控)")
    parser.add_argument("-c", "--csv", help="保存数据到csv文件")
    parser.add_argument("-g", "--graph", help="生成图表文件")
    parser.add_argument("--cpu-threshold", type=float, 
                       help="cpu使用率告警阈值(%)")
    parser.add_argument("--memory-threshold", type=float, 
                       help="内存使用率告警阈值(%)")
    parser.add_argument("--disk-threshold", type=float, 
                       help="磁盘使用率告警阈值(%)")
    
    args = parser.parse_args()
    
    # 检查依赖
    try:
        import psutil
    except importerror:
        print("错误: 缺少psutil库,请安装: pip install psutil")
        sys.exit(1)
        
    if args.graph and not plt:
        print("警告: 缺少matplotlib库,无法生成图表")
        args.graph = none
        
    # 设置阈值
    thresholds = {}
    if args.cpu_threshold:
        thresholds['cpu'] = args.cpu_threshold
    if args.memory_threshold:
        thresholds['memory'] = args.memory_threshold
    if args.disk_threshold:
        thresholds['disk'] = args.disk_threshold
        
    try:
        monitor = systemresourcemonitor(interval=args.interval)
        monitor.start_monitoring(
            duration=args.duration,
            csv_file=args.csv,
            chart_file=args.graph,
            thresholds=thresholds
        )
        
    except exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用方法

安装依赖

在使用此脚本之前,需要安装必要的库:

pip install psutil matplotlib

基本使用

# 基本用法,每5秒刷新一次
python system_monitor.py

# 设置监控间隔为10秒
python system_monitor.py -i 10

# 监控60秒后自动停止
python system_monitor.py -d 60

# 保存监控数据到csv文件
python system_monitor.py -c monitor_data.csv

# 生成资源使用趋势图
python system_monitor.py -g resource_trend.png

# 设置告警阈值
python system_monitor.py --cpu-threshold 80 --memory-threshold 85 --disk-threshold 90

命令行参数说明

  • -i, --interval: 监控间隔(秒),默认5秒
  • -d, --duration: 监控时长(秒),不指定则持续监控
  • -c, --csv: 保存数据到指定的csv文件
  • -g, --graph: 生成资源使用趋势图到指定文件
  • --cpu-threshold: cpu使用率告警阈值(%)
  • --memory-threshold: 内存使用率告警阈值(%)
  • --disk-threshold: 磁盘使用率告警阈值(%)

使用示例

持续监控系统资源

python system_monitor.py

监控并保存数据

python system_monitor.py -i 10 -d 300 -c system_data.csv -g trend.png

带告警的监控

python system_monitor.py --cpu-threshold 80 --memory-threshold 85

总结

这个系统资源监控工具提供了一个全面的系统监控解决方案,能够实时显示cpu、内存、磁盘和网络等关键资源的使用情况。它支持数据持久化、可视化图表生成和阈值告警等功能,适用于服务器监控、性能调优和故障排查等多种场景。通过这个工具,用户可以更好地了解系统运行状态,及时发现和解决性能问题,确保系统的稳定运行。

以上就是python实现监控系统资源的脚本工具的详细内容,更多关于python监控系统资源的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com