当前位置: 代码网 > it编程>前端脚本>Python > Python开发Telegram Bot实现服务器监控告警

Python开发Telegram Bot实现服务器监控告警

2026年02月11日 Python 我要评论
前言服务器出问题了,怎么第一时间知道?邮件通知?可能漏看短信通知?要钱telegram bot?免费、实时、还能远程控制今天用python开发一个服务器监控机器人。一、telegram bot简介1.

前言

服务器出问题了,怎么第一时间知道?

  • 邮件通知?可能漏看
  • 短信通知?要钱
  • telegram bot?免费、实时、还能远程控制

今天用python开发一个服务器监控机器人。

一、telegram bot简介

1.1 为什么选择telegram

优点:

- 完全免费
- api简单易用
- 消息实时推送
- 支持群组/频道
- 可发送文件、图片
- 支持命令和按钮交互

适用场景:

- 服务器监控告警
- ci/cd通知
- 定时任务提醒
- 远程执行命令
- 日志推送

1.2 创建bot

1. 打开telegram,搜索 @botfather
2. 发送 /newbot
3. 输入bot名称,如:myserverbot
4. 输入bot用户名,如:my_server_monitor_bot
5. 获得token:123456789:abcdefghijklmnopqrstuvwxyz
6. 保存好token!

1.3 获取chat id

方法1:使用@userinfobot

- 搜索@userinfobot
- 发送任意消息
- 返回你的chat id

方法2:api获取

- 给你的bot发送消息
- 访问:https://api.telegram.org/bot<token>/getupdates
- 在返回的json中找到chat.id

二、环境准备

2.1 安装依赖

pip install python-telegram-bot
pip install psutil  # 系统监控
pip install aiohttp  # 异步http
pip install apscheduler  # 定时任务

2.2 项目结构

telegram_bot/
├── bot.py           # 主程序
├── config.py        # 配置
├── monitors/        # 监控模块
│   ├── __init__.py
│   ├── cpu.py
│   ├── memory.py
│   ├── disk.py
│   └── network.py
├── handlers/        # 命令处理
│   ├── __init__.py
│   └── commands.py
└── requirements.txt

三、基础bot开发

3.1 hello world

# bot.py
from telegram import update
from telegram.ext import application, commandhandler, contexttypes

# 配置
token = "你的bot token"

# 命令处理器
async def start(update: update, context: contexttypes.default_type):
    await update.message.reply_text("你好!我是服务器监控机器人。")

async def help_command(update: update, context: contexttypes.default_type):
    help_text = """
可用命令:
/start - 开始
/help - 帮助
/status - 服务器状态
/cpu - cpu使用率
/memory - 内存使用
/disk - 磁盘使用
    """
    await update.message.reply_text(help_text)

def main():
    # 创建应用
    app = application.builder().token(token).build()
    
    # 注册命令
    app.add_handler(commandhandler("start", start))
    app.add_handler(commandhandler("help", help_command))
    
    # 启动
    print("bot启动中...")
    app.run_polling()

if __name__ == "__main__":
    main()

3.2 运行测试

python bot.py

# 在telegram中:
# 1. 搜索你的bot
# 2. 发送 /start
# 3. 发送 /help

四、系统监控功能

4.1 cpu监控

# monitors/cpu.py
import psutil

def get_cpu_info():
    """获取cpu信息"""
    cpu_percent = psutil.cpu_percent(interval=1)
    cpu_count = psutil.cpu_count()
    cpu_freq = psutil.cpu_freq()
    
    # 每核使用率
    per_cpu = psutil.cpu_percent(interval=1, percpu=true)
    
    return {
        "percent": cpu_percent,
        "count": cpu_count,
        "freq": cpu_freq.current if cpu_freq else 0,
        "per_cpu": per_cpu
    }

def format_cpu_info():
    """格式化cpu信息"""
    info = get_cpu_info()
    
    text = f"""
🖥️ cpu状态
━━━━━━━━━━━━━━━
总使用率: {info['percent']}%
核心数量: {info['count']}
当前频率: {info['freq']:.0f} mhz

各核心使用率:
"""
    for i, p in enumerate(info['per_cpu']):
        bar = "█" * int(p / 10) + "░" * (10 - int(p / 10))
        text += f"  核心{i}: [{bar}] {p}%\n"
    
    return text

4.2 内存监控

# monitors/memory.py
import psutil

def get_memory_info():
    """获取内存信息"""
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    
    return {
        "total": mem.total / (1024**3),  # gb
        "used": mem.used / (1024**3),
        "available": mem.available / (1024**3),
        "percent": mem.percent,
        "swap_total": swap.total / (1024**3),
        "swap_used": swap.used / (1024**3),
        "swap_percent": swap.percent
    }

def format_memory_info():
    """格式化内存信息"""
    info = get_memory_info()
    
    bar = "█" * int(info['percent'] / 10) + "░" * (10 - int(info['percent'] / 10))
    
    text = f"""
💾 内存状态
━━━━━━━━━━━━━━━
使用率: [{bar}] {info['percent']}%

物理内存:
  总量: {info['total']:.1f} gb
  已用: {info['used']:.1f} gb
  可用: {info['available']:.1f} gb

交换分区:
  总量: {info['swap_total']:.1f} gb
  已用: {info['swap_used']:.1f} gb ({info['swap_percent']}%)
"""
    return text

4.3 磁盘监控

# monitors/disk.py
import psutil

def get_disk_info():
    """获取磁盘信息"""
    partitions = []
    for part in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(part.mountpoint)
            partitions.append({
                "device": part.device,
                "mountpoint": part.mountpoint,
                "total": usage.total / (1024**3),
                "used": usage.used / (1024**3),
                "free": usage.free / (1024**3),
                "percent": usage.percent
            })
        except:
            continue
    return partitions

def format_disk_info():
    """格式化磁盘信息"""
    partitions = get_disk_info()
    
    text = "💿 磁盘状态\n━━━━━━━━━━━━━━━\n"
    
    for p in partitions:
        bar = "█" * int(p['percent'] / 10) + "░" * (10 - int(p['percent'] / 10))
        text += f"""
{p['mountpoint']}
  [{bar}] {p['percent']}%
  已用: {p['used']:.1f} gb / {p['total']:.1f} gb
  剩余: {p['free']:.1f} gb
"""
    return text

4.4 网络监控

# monitors/network.py
import psutil
import socket

def get_network_info():
    """获取网络信息"""
    # 网络io
    net_io = psutil.net_io_counters()
    
    # ip地址
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except:
        ip = "未知"
    
    # 网络连接数
    connections = len(psutil.net_connections())
    
    return {
        "hostname": hostname,
        "ip": ip,
        "bytes_sent": net_io.bytes_sent / (1024**2),  # mb
        "bytes_recv": net_io.bytes_recv / (1024**2),
        "packets_sent": net_io.packets_sent,
        "packets_recv": net_io.packets_recv,
        "connections": connections
    }

def format_network_info():
    """格式化网络信息"""
    info = get_network_info()
    
    text = f"""
🌐 网络状态
━━━━━━━━━━━━━━━
主机名: {info['hostname']}
ip地址: {info['ip']}
连接数: {info['connections']}

流量统计:
  发送: {info['bytes_sent']:.1f} mb
  接收: {info['bytes_recv']:.1f} mb
  发送包: {info['packets_sent']}
  接收包: {info['packets_recv']}
"""
    return text

五、完整bot实现

5.1 主程序

# bot.py
from telegram import update, inlinekeyboardbutton, inlinekeyboardmarkup
from telegram.ext import (
    application, commandhandler, callbackqueryhandler, contexttypes
)
from apscheduler.schedulers.asyncio import asyncioscheduler
import psutil
from datetime import datetime

# 配置
token = "你的bot token"
admin_chat_id = 123456789  # 你的chat id

# 告警阈值
thresholds = {
    "cpu": 80,
    "memory": 85,
    "disk": 90
}

# 监控函数(简化版,集成上面的模块)
def get_system_status():
    cpu = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    
    return {
        "cpu": cpu,
        "memory": mem.percent,
        "disk": disk.percent,
        "uptime": datetime.now() - datetime.fromtimestamp(psutil.boot_time())
    }

def format_status():
    s = get_system_status()
    
    # 状态图标
    cpu_icon = "🔴" if s['cpu'] > thresholds['cpu'] else "🟢"
    mem_icon = "🔴" if s['memory'] > thresholds['memory'] else "🟢"
    disk_icon = "🔴" if s['disk'] > thresholds['disk'] else "🟢"
    
    text = f"""
📊 服务器状态概览
━━━━━━━━━━━━━━━━━━
{cpu_icon} cpu: {s['cpu']}%
{mem_icon} 内存: {s['memory']}%
{disk_icon} 磁盘: {s['disk']}%

⏱️ 运行时间: {str(s['uptime']).split('.')[0]}
🕐 更新时间: {datetime.now().strftime('%y-%m-%d %h:%m:%s')}
"""
    return text

# 命令处理器
async def start(update: update, context: contexttypes.default_type):
    keyboard = [
        [
            inlinekeyboardbutton("📊 状态", callback_data="status"),
            inlinekeyboardbutton("🖥️ cpu", callback_data="cpu"),
        ],
        [
            inlinekeyboardbutton("💾 内存", callback_data="memory"),
            inlinekeyboardbutton("💿 磁盘", callback_data="disk"),
        ],
        [
            inlinekeyboardbutton("🌐 网络", callback_data="network"),
            inlinekeyboardbutton("🔄 刷新", callback_data="refresh"),
        ]
    ]
    reply_markup = inlinekeyboardmarkup(keyboard)
    
    await update.message.reply_text(
        "🤖 服务器监控机器人\n选择要查看的信息:",
        reply_markup=reply_markup
    )

async def status(update: update, context: contexttypes.default_type):
    await update.message.reply_text(format_status())

async def button_callback(update: update, context: contexttypes.default_type):
    query = update.callback_query
    await query.answer()
    
    if query.data == "status" or query.data == "refresh":
        text = format_status()
    elif query.data == "cpu":
        text = format_cpu_info()
    elif query.data == "memory":
        text = format_memory_info()
    elif query.data == "disk":
        text = format_disk_info()
    elif query.data == "network":
        text = format_network_info()
    else:
        text = "未知命令"
    
    await query.edit_message_text(text=text)

# 告警检查
async def check_alerts(context: contexttypes.default_type):
    s = get_system_status()
    alerts = []
    
    if s['cpu'] > thresholds['cpu']:
        alerts.append(f"🔴 cpu使用率过高: {s['cpu']}%")
    if s['memory'] > thresholds['memory']:
        alerts.append(f"🔴 内存使用率过高: {s['memory']}%")
    if s['disk'] > thresholds['disk']:
        alerts.append(f"🔴 磁盘使用率过高: {s['disk']}%")
    
    if alerts:
        alert_text = "⚠️ 服务器告警\n━━━━━━━━━━━━━━━\n" + "\n".join(alerts)
        alert_text += f"\n\n⏰ {datetime.now().strftime('%y-%m-%d %h:%m:%s')}"
        await context.bot.send_message(chat_id=admin_chat_id, text=alert_text)

# 定时报告
async def daily_report(context: contexttypes.default_type):
    text = "📈 每日服务器报告\n" + format_status()
    await context.bot.send_message(chat_id=admin_chat_id, text=text)

def main():
    # 创建应用
    app = application.builder().token(token).build()
    
    # 注册命令
    app.add_handler(commandhandler("start", start))
    app.add_handler(commandhandler("status", status))
    app.add_handler(callbackqueryhandler(button_callback))
    
    # 定时任务
    scheduler = asyncioscheduler()
    # 每分钟检查告警
    scheduler.add_job(
        check_alerts, 'interval', minutes=1,
        args=[app]
    )
    # 每天9点发送报告
    scheduler.add_job(
        daily_report, 'cron', hour=9,
        args=[app]
    )
    scheduler.start()
    
    # 启动
    print("bot启动中...")
    app.run_polling()

if __name__ == "__main__":
    main()

六、高级功能

6.1 远程执行命令

import subprocess

async def exec_command(update: update, context: contexttypes.default_type):
    """执行系统命令(危险!仅限管理员)"""
    # 权限检查
    if update.effective_user.id != admin_chat_id:
        await update.message.reply_text("❌ 无权限")
        return
    
    if not context.args:
        await update.message.reply_text("用法: /exec <命令>")
        return
    
    cmd = " ".join(context.args)
    
    try:
        result = subprocess.run(
            cmd, shell=true, capture_output=true,
            text=true, timeout=30
        )
        output = result.stdout or result.stderr or "(无输出)"
        
        # 限制输出长度
        if len(output) > 4000:
            output = output[:4000] + "\n...(输出过长已截断)"
        
        await update.message.reply_text(f"```\n{output}\n```", parse_mode='markdown')
    except subprocess.timeoutexpired:
        await update.message.reply_text("❌ 命令执行超时")
    except exception as e:
        await update.message.reply_text(f"❌ 执行失败: {e}")

6.2 进程管理

async def processes(update: update, context: contexttypes.default_type):
    """查看进程列表"""
    procs = []
    for p in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        try:
            procs.append(p.info)
        except:
            pass
    
    # 按cpu排序
    procs.sort(key=lambda x: x['cpu_percent'] or 0, reverse=true)
    
    text = "📋 进程列表 (top 10 cpu)\n━━━━━━━━━━━━━━━━━━\n"
    for p in procs[:10]:
        text += f"{p['pid']:>6} | {p['cpu_percent']:>5.1f}% | {p['name'][:20]}\n"
    
    await update.message.reply_text(f"```\n{text}\n```", parse_mode='markdown')

6.3 docker容器监控

import docker

async def docker_status(update: update, context: contexttypes.default_type):
    """docker容器状态"""
    try:
        client = docker.from_env()
        containers = client.containers.list(all=true)
        
        text = "🐳 docker容器状态\n━━━━━━━━━━━━━━━━━━\n"
        
        for c in containers:
            status_icon = "🟢" if c.status == "running" else "🔴"
            text += f"{status_icon} {c.name[:20]}: {c.status}\n"
        
        if not containers:
            text += "没有容器"
        
        await update.message.reply_text(text)
    except exception as e:
        await update.message.reply_text(f"❌ docker连接失败: {e}")

七、跨网络访问

7.1 问题

场景:
- bot运行在内网服务器
- 想在外面通过telegram控制服务器
- 服务器没有公网ip

问题:
- telegram api可以正常调用(服务器能访问外网)
- 但无法直接ssh到服务器进行维护

7.2 解决方案

bot本身可以正常工作,但如果需要直接访问内网服务器:

使用组网软件(如星空组网):
1. 内网服务器安装组网客户端
2. 你的手机/电脑安装组网客户端
3. 组建虚拟局域网
4. 通过虚拟ip直接ssh到服务器

优势:
- bot负责告警和简单查询
- 组网负责需要时的直接访问
- 互相补充,完美配合

八、部署运维

8.1 后台运行

# 使用nohup
nohup python bot.py > bot.log 2>&1 &

# 使用screen
screen -s bot
python bot.py
# ctrl+a, d 退出

# 使用systemd(推荐)

8.2 systemd服务

# /etc/systemd/system/telegram-bot.service
[unit]
description=telegram server monitor bot
after=network.target

[service]
type=simple
user=root
workingdirectory=/opt/telegram_bot
execstart=/usr/bin/python3 /opt/telegram_bot/bot.py
restart=always
restartsec=10

[install]
wantedby=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable telegram-bot
sudo systemctl start telegram-bot
sudo systemctl status telegram-bot

九、总结

telegram bot监控要点:

功能实现
状态查询命令 + 按钮
定时检查apscheduler
告警推送阈值触发
远程控制命令执行

bot能做的:

  • 实时查看服务器状态
  • 自动告警通知
  • 远程执行命令
  • 定时报告

参考资料

  1. python-telegram-bot文档:https://docs.python-telegram-bot.org/
  2. psutil文档:https://psutil.readthedocs.io/
  3. telegram bot api:https://core.telegram.org/bots/api

💡 telegram bot是服务器监控的好帮手,配合组网软件可以实现更完整的远程运维方案。

到此这篇关于python开发telegram bot实现服务器监控告警的文章就介绍到这了,更多相关python 实现服务器监控告警内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com