功能介绍
这是一个实时文件监控和变更通知器,用于监控指定目录中的文件变化并发送通知。该工具具备以下核心功能:
实时文件监控:
- 监控文件创建、修改、删除操作
- 支持递归监控子目录
- 实时事件检测和处理
灵活的通知机制:
- 邮件通知
- 桌面通知
- webhook通知
- 日志记录
智能过滤和匹配:
- 文件类型过滤
- 正则表达式匹配
- 排除特定文件或目录
- 大小和时间过滤
多平台支持:
- windows、linux、macos兼容
- 跨平台文件系统事件处理
- 系统托盘集成(windows/macos)
配置管理:
- json/yaml配置文件支持
- 命令行参数配置
- 动态配置更新
- 多监控任务管理
场景应用
1. 安全监控
- 监控敏感目录的文件变更
- 检测恶意文件创建或修改
- 实时告警重要系统文件变更
2. 开发辅助
- 监控代码文件变更自动触发构建
- 监控配置文件变更自动重启服务
- 监控日志文件变更实时分析
3. 数据同步
- 监控下载目录自动处理新文件
- 监控共享目录自动同步文件
- 监控备份目录验证完整性
4. 系统管理
- 监控系统配置目录变更
- 监控应用程序数据目录
- 监控日志目录异常写入
报错处理
1. 文件系统监控异常
try:
observer.start()
except oserror as e:
logger.error(f"文件系统监控启动失败: {str(e)}")
if e.errno == errno.enospc:
logger.error("系统inotify限制 reached,请增加fs.inotify.max_user_watches")
raise filemonitorerror(f"监控启动失败: {str(e)}")
except exception as e:
logger.error(f"文件监控异常: {str(e)}")
raise filemonitorerror(f"监控异常: {str(e)}")
2. 通知发送异常
try:
send_notification(event)
except smtplib.smtpexception as e:
logger.error(f"邮件发送失败: {str(e)}")
# 重试机制
retry_send_notification(event, max_retries=3)
except requests.requestexception as e:
logger.error(f"webhook请求失败: {str(e)}")
handle_webhook_failure(event)
except exception as e:
logger.error(f"通知发送异常: {str(e)}")
3. 配置文件异常
try:
config = load_config(config_file)
except json.jsondecodeerror as e:
logger.error(f"配置文件json格式错误: {str(e)}")
raise configerror(f"配置文件格式无效: {str(e)}")
except yaml.yamlerror as e:
logger.error(f"配置文件yaml格式错误: {str(e)}")
raise configerror(f"配置文件格式无效: {str(e)}")
except filenotfounderror:
logger.warning(f"配置文件不存在,使用默认配置: {config_file}")
config = create_default_config()
4. 权限异常
try:
os.access(path, os.r_ok)
except permissionerror as e:
logger.error(f"无权限访问目录: {path}")
send_alert(f"监控目录访问被拒绝: {path}")
except exception as e:
logger.error(f"目录访问异常: {str(e)}")
代码实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件监控和变更通知器
功能:监控文件系统变更并发送通知
作者:cline
版本:1.0
"""
import argparse
import sys
import json
import yaml
import logging
import os
import time
import threading
import re
import fnmatch
from datetime import datetime
from typing import dict, list, set, optional, callable
from pathlib import path
import hashlib
import smtplib
import requests
from email.mime.text import mimetext
from email.mime.multipart import mimemultipart
# 根据平台导入文件监控库
try:
from watchdog.observers import observer
from watchdog.events import filesystemeventhandler
watchdog_available = true
except importerror:
watchdog_available = false
logger.warning("watchdog库未安装,将使用轮询模式")
# 配置日志
logging.basicconfig(
level=logging.info,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.filehandler('file_monitor.log'),
logging.streamhandler(sys.stdout)
]
)
logger = logging.getlogger(__name__)
class filemonitorerror(exception):
"""文件监控器异常类"""
pass
class configerror(exception):
"""配置异常类"""
pass
class fileevent:
"""文件事件类"""
def __init__(self, event_type: str, src_path: str, dest_path: str = none):
self.event_type = event_type # created, modified, deleted, moved
self.src_path = src_path
self.dest_path = dest_path
self.timestamp = datetime.now()
self.file_size = none
self.file_hash = none
# 获取文件信息
if os.path.exists(src_path):
try:
self.file_size = os.path.getsize(src_path)
# 计算文件哈希值(仅对小文件)
if self.file_size < 10 * 1024 * 1024: # 10mb以下
self.file_hash = self._calculate_hash(src_path)
except exception:
pass
def _calculate_hash(self, file_path: str) -> str:
"""计算文件md5哈希值"""
hash_md5 = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except exception:
return none
def to_dict(self) -> dict:
"""转换为字典"""
return {
'event_type': self.event_type,
'src_path': self.src_path,
'dest_path': self.dest_path,
'timestamp': self.timestamp.isoformat(),
'file_size': self.file_size,
'file_hash': self.file_hash
}
class notificationmanager:
"""通知管理器"""
def __init__(self, config: dict):
self.config = config.get('notifications', {})
self.email_config = self.config.get('email', {})
self.webhook_config = self.config.get('webhook', {})
self.desktop_config = self.config.get('desktop', {})
def send_email(self, subject: str, body: str, attachments: list[str] = none):
"""发送邮件通知"""
if not self.email_config.get('enabled', false):
return
try:
msg = mimemultipart()
msg['from'] = self.email_config.get('sender')
msg['to'] = ', '.join(self.email_config.get('recipients', []))
msg['subject'] = subject
msg.attach(mimetext(body, 'plain'))
server = smtplib.smtp(
self.email_config.get('smtp_server'),
self.email_config.get('smtp_port', 587)
)
server.starttls()
server.login(
self.email_config.get('sender'),
self.email_config.get('password')
)
server.send_message(msg)
server.quit()
logger.info(f"邮件通知已发送: {subject}")
except exception as e:
logger.error(f"邮件发送失败: {str(e)}")
def send_webhook(self, event: fileevent):
"""发送webhook通知"""
if not self.webhook_config.get('enabled', false):
return
try:
payload = {
'event': event.to_dict(),
'monitor_name': self.config.get('name', 'unknown')
}
response = requests.post(
self.webhook_config.get('url'),
json=payload,
headers=self.webhook_config.get('headers', {}),
timeout=self.webhook_config.get('timeout', 30)
)
response.raise_for_status()
logger.info(f"webhook通知已发送: {event.event_type}")
except exception as e:
logger.error(f"webhook发送失败: {str(e)}")
def send_desktop_notification(self, title: str, message: str):
"""发送桌面通知"""
if not self.desktop_config.get('enabled', false):
return
try:
# 根据平台选择通知库
if sys.platform == 'win32':
# windows通知
from plyer import notification
notification.notify(
title=title,
message=message,
app_name='file monitor'
)
elif sys.platform == 'darwin':
# macos通知
import pync
pync.notify(message, title=title)
else:
# linux通知
import notify2
notify2.init('file monitor')
notice = notify2.notification(title, message)
notice.show()
logger.info(f"桌面通知已发送: {title}")
except exception as e:
logger.error(f"桌面通知发送失败: {str(e)}")
class fileeventhandler(filesystemeventhandler):
"""文件事件处理器"""
def __init__(self, monitor_config: dict, notification_manager: notificationmanager):
self.config = monitor_config
self.notification_manager = notification_manager
self.ignore_patterns = self.config.get('ignore_patterns', [])
self.include_patterns = self.config.get('include_patterns', ['*'])
self.min_file_size = self.config.get('min_file_size', 0)
self.max_file_size = self.config.get('max_file_size', float('inf'))
def on_any_event(self, event):
"""处理所有文件事件"""
if event.is_directory:
return
# 检查是否应该忽略此文件
if self._should_ignore(event.src_path):
return
# 创建文件事件对象
file_event = fileevent(
event_type=event.event_type,
src_path=event.src_path,
dest_path=getattr(event, 'dest_path', none)
)
# 检查文件大小过滤
if file_event.file_size is not none:
if file_event.file_size < self.min_file_size or file_event.file_size > self.max_file_size:
return
# 发送通知
self._send_notifications(file_event)
def _should_ignore(self, file_path: str) -> bool:
"""检查是否应该忽略文件"""
filename = os.path.basename(file_path)
# 检查排除模式
for pattern in self.ignore_patterns:
if fnmatch.fnmatch(filename, pattern):
return true
# 检查包含模式
included = false
for pattern in self.include_patterns:
if fnmatch.fnmatch(filename, pattern):
included = true
break
return not included
def _send_notifications(self, event: fileevent):
"""发送通知"""
# 构造通知内容
subject = f"文件变更通知 - {event.event_type}"
message = f"""
文件监控器检测到文件变更
事件类型: {event.event_type}
文件路径: {event.src_path}
目标路径: {event.dest_path or 'n/a'}
文件大小: {event.file_size or 'n/a'} bytes
时间戳: {event.timestamp.strftime('%y-%m-%d %h:%m:%s')}
文件哈希: {event.file_hash or 'n/a'}
"""
# 发送各种通知
self.notification_manager.send_email(subject, message)
self.notification_manager.send_webhook(event)
self.notification_manager.send_desktop_notification(subject, message)
class pollingfilemonitor:
"""轮询文件监控器(当watchdog不可用时使用)"""
def __init__(self, path: str, interval: int = 1):
self.path = path
self.interval = interval
self.files = {}
self.running = false
self.callbacks = []
# 初始化文件状态
self._scan_directory()
def _scan_directory(self):
"""扫描目录获取文件状态"""
try:
for root, dirs, files in os.walk(self.path):
for file in files:
file_path = os.path.join(root, file)
try:
stat = os.stat(file_path)
self.files[file_path] = {
'mtime': stat.st_mtime,
'size': stat.st_size
}
except oserror:
continue
except exception as e:
logger.error(f"扫描目录失败: {str(e)}")
def add_callback(self, callback: callable):
"""添加回调函数"""
self.callbacks.append(callback)
def start(self):
"""启动监控"""
self.running = true
thread = threading.thread(target=self._monitor_loop)
thread.daemon = true
thread.start()
def stop(self):
"""停止监控"""
self.running = false
def _monitor_loop(self):
"""监控循环"""
while self.running:
try:
current_files = {}
for root, dirs, files in os.walk(self.path):
for file in files:
file_path = os.path.join(root, file)
try:
stat = os.stat(file_path)
current_files[file_path] = {
'mtime': stat.st_mtime,
'size': stat.st_size
}
except oserror:
continue
# 检查文件变更
self._check_changes(current_files)
self.files = current_files
time.sleep(self.interval)
except exception as e:
logger.error(f"轮询监控异常: {str(e)}")
time.sleep(self.interval)
def _check_changes(self, current_files: dict):
"""检查文件变更"""
# 检查新文件
for file_path, info in current_files.items():
if file_path not in self.files:
event = fileevent('created', file_path)
for callback in self.callbacks:
try:
callback(event)
except exception as e:
logger.error(f"回调执行失败: {str(e)}")
# 检查删除的文件
for file_path in self.files:
if file_path not in current_files:
event = fileevent('deleted', file_path)
for callback in self.callbacks:
try:
callback(event)
except exception as e:
logger.error(f"回调执行失败: {str(e)}")
# 检查修改的文件
for file_path, info in current_files.items():
if file_path in self.files:
old_info = self.files[file_path]
if (info['mtime'] != old_info['mtime'] or
info['size'] != old_info['size']):
event = fileevent('modified', file_path)
for callback in self.callbacks:
try:
callback(event)
except exception as e:
logger.error(f"回调执行失败: {str(e)}")
class filemonitor:
"""文件监控器主类"""
def __init__(self, config_file: str = none):
self.config_file = config_file
self.config = {}
self.monitors = {}
self.notification_managers = {}
self.running = false
# 加载配置
self.load_config()
def load_config(self):
"""加载配置文件"""
if not self.config_file or not os.path.exists(self.config_file):
logger.info("未指定配置文件或文件不存在,使用默认配置")
self.config = self._create_default_config()
return
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'):
self.config = yaml.safe_load(f)
else:
self.config = json.load(f)
logger.info(f"成功加载配置文件: {self.config_file}")
except exception as e:
logger.error(f"加载配置文件失败: {str(e)}")
raise configerror(f"配置加载失败: {str(e)}")
def _create_default_config(self) -> dict:
"""创建默认配置"""
return {
"monitors": [
{
"name": "downloads_monitor",
"path": "~/downloads",
"recursive": true,
"ignore_patterns": ["*.tmp", "*.part"],
"include_patterns": ["*"],
"min_file_size": 0,
"max_file_size": 1073741824, # 1gb
"enabled": true
}
],
"notifications": {
"email": {
"enabled": false,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender": "your_email@gmail.com",
"password": "your_app_password",
"recipients": ["admin@example.com"]
},
"webhook": {
"enabled": false,
"url": "https://hooks.slack.com/services/your/slack/webhook",
"headers": {},
"timeout": 30
},
"desktop": {
"enabled": true
}
}
}
def start(self):
"""启动所有监控器"""
if self.running:
logger.warning("文件监控器已在运行")
return
logger.info("启动文件监控器...")
self.running = true
# 为每个监控任务创建监控器
for monitor_config in self.config.get('monitors', []):
if not monitor_config.get('enabled', true):
continue
name = monitor_config.get('name', 'unnamed')
path = os.path.expanduser(monitor_config.get('path', '.'))
if not os.path.exists(path):
logger.warning(f"监控路径不存在: {path}")
continue
try:
# 创建通知管理器
notification_manager = notificationmanager(self.config)
self.notification_managers[name] = notification_manager
if watchdog_available:
# 使用watchdog监控
event_handler = fileeventhandler(monitor_config, notification_manager)
observer = observer()
observer.schedule(event_handler, path, recursive=monitor_config.get('recursive', true))
observer.start()
self.monitors[name] = observer
logger.info(f"已启动监控器: {name} -> {path}")
else:
# 使用轮询监控
interval = monitor_config.get('poll_interval', 1)
poll_monitor = pollingfilemonitor(path, interval)
# 创建事件处理器
def handle_event(event):
event_handler = fileeventhandler(monitor_config, notification_manager)
event_handler._send_notifications(event)
poll_monitor.add_callback(handle_event)
poll_monitor.start()
self.monitors[name] = poll_monitor
logger.info(f"已启动轮询监控器: {name} -> {path}")
except exception as e:
logger.error(f"启动监控器 {name} 失败: {str(e)}")
logger.info("文件监控器启动完成")
def stop(self):
"""停止所有监控器"""
logger.info("停止文件监控器...")
self.running = false
for name, monitor in self.monitors.items():
try:
if watchdog_available and hasattr(monitor, 'stop'):
monitor.stop()
monitor.join()
elif hasattr(monitor, 'stop'):
monitor.stop()
logger.info(f"已停止监控器: {name}")
except exception as e:
logger.error(f"停止监控器 {name} 失败: {str(e)}")
self.monitors.clear()
logger.info("文件监控器已停止")
def add_monitor(self, monitor_config: dict):
"""动态添加监控任务"""
name = monitor_config.get('name', 'unnamed')
path = os.path.expanduser(monitor_config.get('path', '.'))
if name in self.monitors:
logger.warning(f"监控器已存在: {name}")
return
if not os.path.exists(path):
logger.warning(f"监控路径不存在: {path}")
return
try:
notification_manager = notificationmanager(self.config)
self.notification_managers[name] = notification_manager
if watchdog_available:
event_handler = fileeventhandler(monitor_config, notification_manager)
observer = observer()
observer.schedule(event_handler, path, recursive=monitor_config.get('recursive', true))
observer.start()
self.monitors[name] = observer
else:
interval = monitor_config.get('poll_interval', 1)
poll_monitor = pollingfilemonitor(path, interval)
def handle_event(event):
event_handler = fileeventhandler(monitor_config, notification_manager)
event_handler._send_notifications(event)
poll_monitor.add_callback(handle_event)
poll_monitor.start()
self.monitors[name] = poll_monitor
logger.info(f"已添加监控器: {name} -> {path}")
except exception as e:
logger.error(f"添加监控器 {name} 失败: {str(e)}")
def remove_monitor(self, name: str):
"""移除监控任务"""
if name not in self.monitors:
logger.warning(f"监控器不存在: {name}")
return
try:
monitor = self.monitors[name]
if watchdog_available and hasattr(monitor, 'stop'):
monitor.stop()
monitor.join()
elif hasattr(monitor, 'stop'):
monitor.stop()
del self.monitors[name]
if name in self.notification_managers:
del self.notification_managers[name]
logger.info(f"已移除监控器: {name}")
except exception as e:
logger.error(f"移除监控器 {name} 失败: {str(e)}")
def create_sample_config():
"""创建示例配置文件"""
sample_config = {
"monitors": [
{
"name": "downloads_monitor",
"path": "~/downloads",
"recursive": true,
"ignore_patterns": ["*.tmp", "*.part", "~*"],
"include_patterns": ["*.pdf", "*.doc", "*.docx", "*.jpg", "*.png"],
"min_file_size": 1024, # 1kb
"max_file_size": 1073741824, # 1gb
"enabled": true
},
{
"name": "config_monitor",
"path": "/etc",
"recursive": false,
"ignore_patterns": [],
"include_patterns": ["*.conf", "*.cfg"],
"min_file_size": 0,
"max_file_size": 10485760, # 10mb
"enabled": true
}
],
"notifications": {
"email": {
"enabled": true,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender": "your_email@gmail.com",
"password": "your_app_password",
"recipients": ["admin@example.com", "security@example.com"]
},
"webhook": {
"enabled": true,
"url": "https://hooks.slack.com/services/your/slack/webhook",
"headers": {
"content-type": "application/json"
},
"timeout": 30
},
"desktop": {
"enabled": true
}
}
}
with open('file_monitor_sample_config.json', 'w', encoding='utf-8') as f:
json.dump(sample_config, f, indent=2, ensure_ascii=false)
logger.info("示例配置文件已创建: file_monitor_sample_config.json")
def main():
parser = argparse.argumentparser(description='文件监控和变更通知器')
parser.add_argument('-c', '--config', help='配置文件路径')
parser.add_argument('--start', action='store_true', help='启动监控器')
parser.add_argument('--sample-config', action='store_true', help='创建示例配置文件')
args = parser.parse_args()
if args.sample_config:
create_sample_config()
return
monitor = filemonitor(args.config)
if args.start:
try:
monitor.start()
# 保持程序运行
while true:
time.sleep(1)
except keyboardinterrupt:
logger.info("收到中断信号,正在停止监控器...")
finally:
monitor.stop()
else:
parser.print_help()
if __name__ == '__main__':
main()
使用说明
1. 安装依赖
# 基础依赖 pip install watchdog requests pyyaml plyer # macos通知支持 pip install pync # linux通知支持 pip install notify2
2. 创建配置文件
python file_monitor.py --sample-config
3. 启动监控器
python file_monitor.py --config file_monitor_config.json --start
配置文件示例
json配置文件
{
"monitors": [
{
"name": "downloads_monitor",
"path": "~/downloads",
"recursive": true,
"ignore_patterns": ["*.tmp", "*.part", "~*"],
"include_patterns": ["*.pdf", "*.doc", "*.docx", "*.jpg", "*.png"],
"min_file_size": 1024,
"max_file_size": 1073741824,
"enabled": true
},
{
"name": "config_monitor",
"path": "/etc",
"recursive": false,
"ignore_patterns": [],
"include_patterns": ["*.conf", "*.cfg"],
"min_file_size": 0,
"max_file_size": 10485760,
"enabled": true
}
],
"notifications": {
"email": {
"enabled": true,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender": "your_email@gmail.com",
"password": "your_app_password",
"recipients": ["admin@example.com"]
},
"webhook": {
"enabled": true,
"url": "https://hooks.slack.com/services/your/slack/webhook",
"headers": {
"content-type": "application/json"
},
"timeout": 30
},
"desktop": {
"enabled": true
}
}
}
yaml配置文件
monitors:
- name: downloads_monitor
path: ~/downloads
recursive: true
ignore_patterns:
- "*.tmp"
- "*.part"
- "~*"
include_patterns:
- "*.pdf"
- "*.doc"
- "*.docx"
- "*.jpg"
- "*.png"
min_file_size: 1024
max_file_size: 1073741824
enabled: true
- name: config_monitor
path: /etc
recursive: false
ignore_patterns: []
include_patterns:
- "*.conf"
- "*.cfg"
min_file_size: 0
max_file_size: 10485760
enabled: true
notifications:
email:
enabled: true
smtp_server: smtp.gmail.com
smtp_port: 587
sender: your_email@gmail.com
password: your_app_password
recipients:
- admin@example.com
webhook:
enabled: true
url: https://hooks.slack.com/services/your/slack/webhook
headers:
content-type: application/json
timeout: 30
desktop:
enabled: true
高级特性
1. 动态配置更新
支持在运行时动态添加或移除监控任务,无需重启监控器。
2. 文件指纹识别
对小文件自动计算md5哈希值,用于精确识别文件变更。
3. 多种通知方式
同时支持邮件、webhook和桌面通知,确保不会错过重要变更。
4. 智能过滤机制
支持复杂的文件过滤规则,包括文件类型、大小、名称模式等。
最佳实践
1. 性能优化
- 合理设置监控目录范围,避免监控大量文件的目录
- 使用适当的文件过滤规则减少不必要的事件处理
- 对于大文件,避免计算哈希值
2. 安全性考虑
- 不要在配置文件中明文存储敏感信息
- 限制监控目录的访问权限
- 定期审查监控日志
3. 监控和维护
- 定期检查监控器运行状态
- 监控系统资源使用情况
- 及时处理通知失败的情况
总结
这个文件监控和变更通知器提供了一个功能完整、易于使用的文件监控解决方案。通过灵活的配置和多种通知方式,可以满足各种文件监控需求。无论是安全监控、开发辅助还是系统管理,都能通过这个工具实现实时的文件变更检测和通知。
以上就是python实现实时文件监控和变更通知的详细内容,更多关于python文件监控的资料请关注代码网其它相关文章!
发表评论