当前位置: 代码网 > it编程>前端脚本>Python > Python系统监控之跨平台检查系统服务运行状态的完整指南

Python系统监控之跨平台检查系统服务运行状态的完整指南

2025年12月04日 Python 我要评论
简介系统服务是操作系统和应用程序正常运行的基础。无论是web服务器、数据库服务还是其他后台进程,服务的状态直接影响系统的可用性和稳定性。定期检查关键服务的运行状态,可以及时发现服务异常并采取相应措施。

简介

系统服务是操作系统和应用程序正常运行的基础。无论是web服务器、数据库服务还是其他后台进程,服务的状态直接影响系统的可用性和稳定性。定期检查关键服务的运行状态,可以及时发现服务异常并采取相应措施。本文将介绍一个实用的python脚本——系统服务状态检查工具,它可以跨平台检查系统服务的运行状态,并提供详细的报告和告警功能。

功能介绍

这个系统服务状态检查工具具有以下核心功能:

  • 跨平台支持:支持windows服务、linux systemd服务和macos launchd服务
  • 服务状态检查:检查指定服务的运行状态(运行、停止、未知等)
  • 批量检查:支持同时检查多个服务的状态
  • 详细信息服务:获取服务的详细信息,如描述、启动类型等
  • 告警通知:当服务状态异常时发送告警通知
  • 定时监控:支持定时检查服务状态
  • 日志记录:记录服务状态检查的历史记录
  • 配置文件支持:通过配置文件管理要检查的服务列表

应用场景

这个工具适用于以下场景:

  • 服务器监控:监控关键服务(如apache、mysql、ssh等)的运行状态
  • 故障排查:快速定位服务异常的根本原因
  • 运维自动化:集成到运维流程中自动检查服务状态
  • 系统维护:定期检查系统服务的健康状况
  • 开发测试:在开发和测试环境中验证服务状态
  • 灾难恢复:在系统恢复后验证服务是否正常启动

报错处理

脚本包含了完善的错误处理机制:

  • 平台检测:自动检测操作系统类型并使用相应的服务管理工具
  • 权限验证:检查是否有足够的权限访问服务信息
  • 服务存在性检查:验证指定服务是否存在
  • 命令执行保护:安全地执行系统命令并处理执行结果
  • 网络异常处理:处理远程服务检查中的网络异常
  • 异常捕获:捕获并处理运行过程中可能出现的各种异常

代码实现

import os
import sys
import subprocess
import argparse
import json
import time
from datetime import datetime
import platform
import smtplib
from email.mime.text import mimetext
from email.mime.multipart import mimemultipart

class servicestatuschecker:
    def __init__(self):
        self.os_type = platform.system().lower()
        self.services = []
        self.results = []
        self.errors = []
        
    def add_service(self, name, display_name=none, description=none):
        """添加要检查的服务"""
        service = {
            'name': name,
            'display_name': display_name or name,
            'description': description or ''
        }
        self.services.append(service)
        
    def load_services_from_config(self, config_file):
        """从配置文件加载服务列表"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
                services = config.get('services', [])
                for service in services:
                    self.add_service(
                        service['name'],
                        service.get('display_name'),
                        service.get('description')
                    )
            return true
        except exception as e:
            self.errors.append(f"加载配置文件失败: {e}")
            return false
            
    def check_windows_service(self, service_name):
        """检查windows服务状态"""
        try:
            # 使用sc query命令检查服务状态
            cmd = ['sc', 'query', service_name]
            result = subprocess.run(cmd, capture_output=true, text=true, timeout=30)
            
            if result.returncode != 0:
                return {
                    'name': service_name,
                    'status': 'unknown',
                    'error': result.stderr.strip()
                }
                
            output = result.stdout.lower()
            
            if 'running' in output:
                status = 'running'
            elif 'stopped' in output:
                status = 'stopped'
            else:
                status = 'unknown'
                
            return {
                'name': service_name,
                'status': status,
                'details': output
            }
        except subprocess.timeoutexpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_linux_service(self, service_name):
        """检查linux systemd服务状态"""
        try:
            # 使用systemctl is-active命令检查服务状态
            cmd = ['systemctl', 'is-active', service_name]
            result = subprocess.run(cmd, capture_output=true, text=true, timeout=30)
            
            if result.returncode == 0:
                status = result.stdout.strip()
            else:
                # 如果命令失败,可能是服务不存在
                status = 'inactive' if 'inactive' in result.stdout.lower() else 'unknown'
                
            # 获取服务详细信息
            details = {}
            try:
                cmd = ['systemctl', 'show', service_name, '--no-pager']
                detail_result = subprocess.run(cmd, capture_output=true, text=true, timeout=30)
                if detail_result.returncode == 0:
                    for line in detail_result.stdout.strip().split('\n'):
                        if '=' in line:
                            key, value = line.split('=', 1)
                            details[key] = value
            except:
                pass
                
            return {
                'name': service_name,
                'status': status,
                'details': details
            }
        except subprocess.timeoutexpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_macos_service(self, service_name):
        """检查macos launchd服务状态"""
        try:
            # 使用launchctl list命令检查服务状态
            cmd = ['launchctl', 'list']
            result = subprocess.run(cmd, capture_output=true, text=true, timeout=30)
            
            if result.returncode != 0:
                return {
                    'name': service_name,
                    'status': 'unknown',
                    'error': result.stderr.strip()
                }
                
            # 解析输出查找服务
            lines = result.stdout.strip().split('\n')[1:]  # 跳过标题行
            service_found = false
            pid = none
            
            for line in lines:
                parts = line.split()
                if len(parts) >= 3 and service_name in parts[2]:
                    service_found = true
                    pid = parts[0] if parts[0] != '-' else none
                    break
                    
            if service_found:
                status = 'running' if pid else 'loaded'
            else:
                status = 'not_found'
                
            return {
                'name': service_name,
                'status': status,
                'pid': pid
            }
        except subprocess.timeoutexpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_service(self, service_name):
        """检查服务状态(跨平台)"""
        if self.os_type == 'windows':
            return self.check_windows_service(service_name)
        elif self.os_type == 'linux':
            return self.check_linux_service(service_name)
        elif self.os_type == 'darwin':  # macos
            return self.check_macos_service(service_name)
        else:
            return {
                'name': service_name,
                'status': 'unsupported',
                'error': f'不支持的操作系统: {self.os_type}'
            }
            
    def check_all_services(self):
        """检查所有服务的状态"""
        print(f"开始检查服务状态 (操作系统: {self.os_type})")
        print("=" * 60)
        
        self.results = []
        
        for service in self.services:
            print(f"检查服务: {service['display_name']} ({service['name']})")
            result = self.check_service(service['name'])
            result['display_name'] = service['display_name']
            result['description'] = service['description']
            result['check_time'] = datetime.now().isoformat()
            self.results.append(result)
            
            # 显示结果
            status_display = {
                'running': '✅ 运行中',
                'stopped': '❌ 已停止',
                'inactive': '⏸️  未激活',
                'loaded': '🔄 已加载',
                'not_found': '❓ 未找到',
                'timeout': '⏰ 超时',
                'error': '⚠️  错误',
                'unknown': '❓ 未知'
            }
            
            status_text = status_display.get(result['status'], result['status'])
            print(f"  状态: {status_text}")
            
            if 'error' in result:
                print(f"  错误: {result['error']}")
                
            print()
            
        return self.results
        
    def generate_report(self):
        """生成服务状态报告"""
        report = []
        report.append("=" * 80)
        report.append("系统服务状态检查报告")
        report.append("=" * 80)
        report.append(f"检查时间: {datetime.now().strftime('%y-%m-%d %h:%m:%s')}")
        report.append(f"操作系统: {self.os_type}")
        report.append(f"检查服务数: {len(self.services)}")
        report.append("")
        
        # 统计信息
        status_counts = {}
        for result in self.results:
            status = result['status']
            status_counts[status] = status_counts.get(status, 0) + 1
            
        report.append("状态统计:")
        for status, count in status_counts.items():
            report.append(f"  {status}: {count}")
        report.append("")
        
        # 详细结果
        report.append("服务详细信息:")
        report.append("-" * 50)
        for result in self.results:
            status_display = {
                'running': '✅ 运行中',
                'stopped': '❌ 已停止',
                'inactive': '⏸️  未激活',
                'loaded': '🔄 已加载',
                'not_found': '❓ 未找到',
                'timeout': '⏰ 超时',
                'error': '⚠️  错误',
                'unknown': '❓ 未知'
            }
            
            status_text = status_display.get(result['status'], result['status'])
            report.append(f"{result['display_name']} ({result['name']}): {status_text}")
            if result['description']:
                report.append(f"  描述: {result['description']}")
            if 'error' in result:
                report.append(f"  错误: {result['error']}")
            report.append("")
            
        # 错误信息
        if self.errors:
            report.append("错误信息:")
            report.append("-" * 50)
            for error in self.errors:
                report.append(f"  {error}")
            report.append("")
            
        report.append("=" * 80)
        return "\n".join(report)
        
    def save_report(self, filename):
        """保存报告到文件"""
        try:
            report = self.generate_report()
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"报告已保存到: {filename}")
            return true
        except exception as e:
            print(f"保存报告时出错: {e}")
            return false
            
    def save_json_report(self, filename):
        """保存json格式报告"""
        try:
            report_data = {
                'check_time': datetime.now().isoformat(),
                'os_type': self.os_type,
                'services': self.results,
                'errors': self.errors
            }
            
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=false)
            print(f"json报告已保存到: {filename}")
            return true
        except exception as e:
            print(f"保存json报告时出错: {e}")
            return false
            
    def send_alert(self, smtp_config, alert_services=none):
        """发送告警邮件"""
        try:
            # 确定需要告警的服务
            if alert_services is none:
                # 默认告警停止或异常的服务
                alert_services = [
                    r for r in self.results 
                    if r['status'] not in ['running', 'loaded']
                ]
                
            if not alert_services:
                print("没有需要告警的服务")
                return true
                
            # 构建邮件内容
            subject = f"服务状态告警 - {datetime.now().strftime('%y-%m-%d %h:%m:%s')}"
            body = "以下服务状态异常:\n\n"
            
            for service in alert_services:
                body += f"服务: {service['display_name']} ({service['name']})\n"
                body += f"状态: {service['status']}\n"
                if 'error' in service:
                    body += f"错误: {service['error']}\n"
                body += "\n"
                
            # 发送邮件
            msg = mimemultipart()
            msg['from'] = smtp_config['from']
            msg['to'] = smtp_config['to']
            msg['subject'] = subject
            
            msg.attach(mimetext(body, 'plain', 'utf-8'))
            
            server = smtplib.smtp(smtp_config['server'], smtp_config['port'])
            server.starttls()
            server.login(smtp_config['username'], smtp_config['password'])
            server.send_message(msg)
            server.quit()
            
            print("告警邮件已发送")
            return true
            
        except exception as e:
            print(f"发送告警邮件时出错: {e}")
            return false
            
    def continuous_monitor(self, interval=60, max_checks=none):
        """持续监控服务状态"""
        check_count = 0
        
        try:
            while true:
                print(f"\n[{datetime.now().strftime('%y-%m-%d %h:%m:%s')}] 开始第 {check_count + 1} 次检查")
                self.check_all_services()
                
                # 显示简要状态
                running_count = len([r for r in self.results if r['status'] == 'running'])
                print(f"运行中服务: {running_count}/{len(self.results)}")
                
                check_count += 1
                
                # 检查是否达到最大检查次数
                if max_checks and check_count >= max_checks:
                    break
                    
                # 等待下次检查
                time.sleep(interval)
                
        except keyboardinterrupt:
            print("\n\n停止监控...")

def create_sample_config():
    """创建示例配置文件"""
    config = {
        "services": [
            {
                "name": "ssh",
                "display_name": "ssh服务",
                "description": "安全shell服务"
            },
            {
                "name": "nginx",
                "display_name": "nginx web服务器",
                "description": "高性能http和反向代理服务器"
            },
            {
                "name": "mysql",
                "display_name": "mysql数据库",
                "description": "关系型数据库管理系统"
            }
        ]
    }
    
    try:
        with open('service_config.json', 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=false)
        print("示例配置文件已创建: service_config.json")
        return true
    except exception as e:
        print(f"创建示例配置文件时出错: {e}")
        return false

def main():
    parser = argparse.argumentparser(description="系统服务状态检查工具")
    parser.add_argument("-c", "--config", help="配置文件路径")
    parser.add_argument("-s", "--service", action='append', help="要检查的服务名称(可多次使用)")
    parser.add_argument("-o", "--output", help="保存报告到文件")
    parser.add_argument("-j", "--json", help="保存json报告到文件")
    parser.add_argument("-m", "--monitor", type=int, help="持续监控间隔(秒)")
    parser.add_argument("--max-checks", type=int, help="最大检查次数")
    parser.add_argument("--create-config", action="store_true", help="创建示例配置文件")
    parser.add_argument("--smtp-server", help="smtp服务器地址")
    parser.add_argument("--smtp-port", type=int, default=587, help="smtp服务器端口")
    parser.add_argument("--smtp-username", help="smtp用户名")
    parser.add_argument("--smtp-password", help="smtp密码")
    parser.add_argument("--smtp-from", help="发件人邮箱")
    parser.add_argument("--smtp-to", help="收件人邮箱")
    
    args = parser.parse_args()
    
    # 创建示例配置文件
    if args.create_config:
        create_sample_config()
        return
        
    try:
        checker = servicestatuschecker()
        
        # 加载服务列表
        if args.config:
            if not checker.load_services_from_config(args.config):
                print("加载配置文件失败")
                sys.exit(1)
        elif args.service:
            for service_name in args.service:
                checker.add_service(service_name)
        else:
            print("请指定配置文件或服务名称")
            sys.exit(1)
            
        # 检查服务状态
        if args.monitor:
            # 持续监控模式
            checker.continuous_monitor(args.monitor, args.max_checks)
        else:
            # 单次检查模式
            checker.check_all_services()
            
            # 生成报告
            if args.output:
                checker.save_report(args.output)
            elif args.json:
                checker.save_json_report(args.json)
            else:
                print(checker.generate_report())
                
            # 发送告警邮件
            if args.smtp_server and args.smtp_username and args.smtp_password:
                smtp_config = {
                    'server': args.smtp_server,
                    'port': args.smtp_port,
                    'username': args.smtp_username,
                    'password': args.smtp_password,
                    'from': args.smtp_from or args.smtp_username,
                    'to': args.smtp_to
                }
                checker.send_alert(smtp_config)
                
    except keyboardinterrupt:
        print("\n\n用户中断操作")
    except exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用方法

基本使用

# 创建示例配置文件
python service_checker.py --create-config

# 使用配置文件检查服务
python service_checker.py -c service_config.json

# 检查单个服务
python service_checker.py -s nginx

# 检查多个服务
python service_checker.py -s nginx -s mysql -s ssh

# 保存报告到文件
python service_checker.py -c service_config.json -o report.txt

# 保存json报告
python service_checker.py -c service_config.json -j report.json

# 持续监控服务状态(每30秒检查一次)
python service_checker.py -c service_config.json -m 30

# 持续监控最多10次
python service_checker.py -c service_config.json -m 30 --max-checks 10

告警功能

# 发送告警邮件
python service_checker.py -c service_config.json \
  --smtp-server smtp.gmail.com \
  --smtp-username your_email@gmail.com \
  --smtp-password your_password \
  --smtp-to admin@example.com

配置文件示例

创建的示例配置文件 service_config.json 内容如下:

{
  "services": [
    {
      "name": "ssh",
      "display_name": "ssh服务",
      "description": "安全shell服务"
    },
    {
      "name": "nginx",
      "display_name": "nginx web服务器",
      "description": "高性能http和反向代理服务器"
    },
    {
      "name": "mysql",
      "display_name": "mysql数据库",
      "description": "关系型数据库管理系统"
    }
  ]
}

命令行参数说明

  • -c, --config: 配置文件路径
  • -s, --service: 要检查的服务名称(可多次使用)
  • -o, --output: 保存报告到指定文件
  • -j, --json: 保存json报告到指定文件
  • -m, --monitor: 持续监控间隔(秒)
  • --max-checks: 最大检查次数
  • --create-config: 创建示例配置文件
  • --smtp-server: smtp服务器地址
  • --smtp-port: smtp服务器端口(默认587)
  • --smtp-username: smtp用户名
  • --smtp-password: smtp密码
  • --smtp-from: 发件人邮箱
  • --smtp-to: 收件人邮箱

使用示例

基本检查

python service_checker.py -c service_config.json

持续监控

python service_checker.py -c service_config.json -m 60 --max-checks 10 -o status.log

带告警的检查

python service_checker.py -c service_config.json \
  --smtp-server smtp.gmail.com \
  --smtp-username admin@example.com \
  --smtp-password app_password \
  --smtp-to ops-team@example.com

总结

这个系统服务状态检查工具提供了一个跨平台的服务监控解决方案,能够检查windows、linux和macos系统上的服务状态。它支持配置文件管理、批量检查、持续监控和告警通知等功能,适用于服务器监控、故障排查和运维自动化等多种场景。通过这个工具,用户可以及时了解关键服务的运行状态,在服务异常时快速响应,确保系统的稳定运行。

以上就是python系统监控之跨平台检查系统服务运行状态的完整指南的详细内容,更多关于python检查系统服务运行状态的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com