当前位置: 代码网 > it编程>前端脚本>Python > 基于Python实现自动化文件管理(分类、重命名和备份)

基于Python实现自动化文件管理(分类、重命名和备份)

2025年10月27日 Python 我要评论
1. 引言在数字化时代,我们每天都会创建和处理大量的文件。根据统计,知识工作者平均每年处理10,000+个文件,而其中30%的时间都花费在文件管理上——寻找文件、整理文件夹、备

1. 引言

在数字化时代,我们每天都会创建和处理大量的文件。根据统计,知识工作者平均每年处理10,000+个文件,而其中30%的时间都花费在文件管理上——寻找文件、整理文件夹、备份重要数据等。这些重复性任务不仅耗时耗力,还容易出错。

自动化文件管理通过编程方式处理这些繁琐任务,可以为我们带来显著的效率提升。想象一下:每天自动整理下载文件夹、按规则批量重命名照片、定时备份重要文档——这些都可以通过python轻松实现。

自动化文件管理的价值

2. 环境准备和基础概念

2.1 必要的python库

在开始之前,我们需要安装以下python库:

# 文件操作和系统交互
pip install pathlib2
pip install shutil
pip install os-sys

# 文件类型检测
pip install python-magic
pip install filetype

# 压缩和备份
pip install pyzipper
pip install rarfile

# 图像处理(用于图片文件)
pip install pillow

# 日期处理
pip install python-dateutil

# 进度显示
pip install tqdm

# 配置文件处理
pip install pyyaml

2.2 核心库功能介绍

# 导入所需库
import os
import shutil
import re
import hashlib
from pathlib import path
from datetime import datetime, timedelta
import time
from typing import list, dict, optional, tuple, callable
import logging
from dataclasses import dataclass
from enum import enum
import json
import yaml
from tqdm import tqdm
import zipfile
import filetype  # 用于准确检测文件类型

# 尝试导入可选依赖
try:
    from pil import image, exiftags
    pil_available = true
except importerror:
    pil_available = false

try:
    import pyzipper
    zip_encryption_available = true
except importerror:
    zip_encryption_available = false

3. 智能文件分类系统

3.1 基础文件分类器

让我们从构建一个智能文件分类器开始:

class fileclassifier:
    """
    智能文件分类器
    根据文件类型、内容、大小等特征自动分类文件
    """
    
    # 文件类型分类映射
    file_categories = {
        'images': {
            'extensions': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp', '.tiff', '.heic'],
            'mime_types': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/svg+xml'],
            'description': '图片文件'
        },
        'documents': {
            'extensions': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.xlsx', '.pptx', '.md', '.tex'],
            'mime_types': ['application/pdf', 'application/msword', 'text/plain'],
            'description': '文档文件'
        },
        'audio': {
            'extensions': ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'],
            'mime_types': ['audio/mpeg', 'audio/wav', 'audio/flac'],
            'description': '音频文件'
        },
        'video': {
            'extensions': ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv', '.webm'],
            'mime_types': ['video/mp4', 'video/avi', 'video/quicktime'],
            'description': '视频文件'
        },
        'archives': {
            'extensions': ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2'],
            'mime_types': ['application/zip', 'application/x-rar-compressed'],
            'description': '压缩文件'
        },
        'code': {
            'extensions': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.php', '.rb', '.go'],
            'mime_types': ['text/x-python', 'application/javascript', 'text/html'],
            'description': '代码文件'
        },
        'executables': {
            'extensions': ['.exe', '.msi', '.dmg', '.pkg', '.deb', '.rpm', '.appimage'],
            'mime_types': ['application/x-msdownload', 'application/x-executable'],
            'description': '可执行文件'
        },
        'data': {
            'extensions': ['.csv', '.json', '.xml', '.sql', '.db', '.sqlite'],
            'mime_types': ['text/csv', 'application/json'],
            'description': '数据文件'
        }
    }
    
    def __init__(self, config_file: optional[str] = none):
        """
        初始化文件分类器
        
        args:
            config_file: 配置文件路径
        """
        self.config = self.load_config(config_file)
        self.setup_logging()
        
    def setup_logging(self):
        """设置日志记录"""
        logging.basicconfig(
            level=logging.info,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.filehandler('file_classifier.log', encoding='utf-8'),
                logging.streamhandler()
            ]
        )
        self.logger = logging.getlogger(__name__)
    
    def load_config(self, config_file: optional[str]) -> dict:
        """
        加载配置文件
        
        args:
            config_file: 配置文件路径
            
        returns:
            dict: 配置信息
        """
        default_config = {
            'organize_rules': {
                'create_date_folders': true,
                'use_nested_categories': false,
                'handle_duplicates': 'rename',  # 'rename', 'overwrite', 'skip'
                'default_category': 'others',
                'max_filename_length': 255
            },
            'categories': self.file_categories,
            'backup': {
                'enabled': false,
                'backup_before_organize': true,
                'backup_location': './backups'
            }
        }
        
        if config_file and path(config_file).exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    if config_file.endswith('.json'):
                        user_config = json.load(f)
                    elif config_file.endswith(('.yaml', '.yml')):
                        user_config = yaml.safe_load(f)
                    else:
                        self.logger.warning(f"不支持的配置文件格式: {config_file}")
                        return default_config
                
                # 深度合并配置
                return self.deep_merge(default_config, user_config)
            except exception as e:
                self.logger.error(f"加载配置文件失败: {str(e)}")
        
        return default_config
    
    def deep_merge(self, base: dict, update: dict) -> dict:
        """
        深度合并两个字典
        
        args:
            base: 基础字典
            update: 更新字典
            
        returns:
            dict: 合并后的字典
        """
        result = base.copy()
        
        for key, value in update.items():
            if isinstance(value, dict) and key in result and isinstance(result[key], dict):
                result[key] = self.deep_merge(result[key], value)
            else:
                result[key] = value
        
        return result
    
    def detect_file_type(self, file_path: path) -> dict[str, str]:
        """
        检测文件类型
        
        args:
            file_path: 文件路径
            
        returns:
            dict: 文件类型信息
        """
        file_info = {
            'extension': file_path.suffix.lower(),
            'mime_type': none,
            'size': file_path.stat().st_size,
            'created': datetime.fromtimestamp(file_path.stat().st_ctime),
            'modified': datetime.fromtimestamp(file_path.stat().st_mtime)
        }
        
        # 使用filetype库检测mime类型
        try:
            kind = filetype.guess(str(file_path))
            if kind:
                file_info['mime_type'] = kind.mime
        except exception as e:
            self.logger.debug(f"文件类型检测失败 {file_path}: {str(e)}")
        
        return file_info
    
    def classify_file(self, file_path: path) -> str:
        """
        分类单个文件
        
        args:
            file_path: 文件路径
            
        returns:
            str: 文件类别
        """
        if not file_path.is_file():
            return 'invalid'
        
        file_info = self.detect_file_type(file_path)
        
        # 基于扩展名分类
        for category, config in self.config['categories'].items():
            if file_info['extension'] in config['extensions']:
                return category
        
        # 基于mime类型分类
        if file_info['mime_type']:
            for category, config in self.config['categories'].items():
                if file_info['mime_type'] in config['mime_types']:
                    return category
        
        # 基于文件大小和内容的启发式分类
        special_category = self.special_classification(file_path, file_info)
        if special_category:
            return special_category
        
        return self.config['organize_rules']['default_category']
    
    def special_classification(self, file_path: path, file_info: dict) -> optional[str]:
        """
        特殊分类逻辑
        
        args:
            file_path: 文件路径
            file_info: 文件信息
            
        returns:
            optional[str]: 特殊类别
        """
        # 大文件分类
        if file_info['size'] > 100 * 1024 * 1024:  # 100mb
            return 'large_files'
        
        # 临时文件
        if file_path.name.startswith('~') or file_path.name.startswith('.'):
            return 'temporary_files'
        
        # 最近修改的文件
        if datetime.now() - file_info['modified'] < timedelta(days=7):
            return 'recent_files'
        
        return none
    
    def organize_directory(self, source_dir: path, target_dir: optional[path] = none, 
                          dry_run: bool = false) -> dict[str, list[path]]:
        """
        整理目录中的文件
        
        args:
            source_dir: 源目录
            target_dir: 目标目录,如果为none则在原目录整理
            dry_run: 试运行模式,不实际移动文件
            
        returns:
            dict: 分类结果
        """
        if not source_dir.exists():
            self.logger.error(f"源目录不存在: {source_dir}")
            return {}
        
        if target_dir is none:
            target_dir = source_dir
        
        # 创建分类文件夹
        categories = list(self.config['categories'].keys()) + [
            self.config['organize_rules']['default_category'],
            'large_files', 'temporary_files', 'recent_files'
        ]
        
        for category in categories:
            category_dir = target_dir / category
            if not dry_run:
                category_dir.mkdir(exist_ok=true)
        
        classification_results = {category: [] for category in categories}
        classification_results['skipped'] = []
        classification_results['errors'] = []
        
        # 遍历文件
        file_count = 0
        for file_path in source_dir.rglob('*'):
            if not file_path.is_file():
                continue
            
            # 跳过分类文件夹中的文件
            if any(category in file_path.parts for category in categories):
                continue
            
            file_count += 1
            
            try:
                category = self.classify_file(file_path)
                
                if dry_run:
                    classification_results[category].append(file_path)
                    self.logger.info(f"[试运行] {file_path.name} -> {category}/")
                else:
                    # 处理文件移动
                    success = self.move_to_category(file_path, target_dir / category, category)
                    if success:
                        classification_results[category].append(file_path)
                    else:
                        classification_results['errors'].append(file_path)
            
            except exception as e:
                self.logger.error(f"处理文件失败 {file_path}: {str(e)}")
                classification_results['errors'].append(file_path)
        
        # 生成报告
        self.generate_organization_report(classification_results, file_count, dry_run)
        
        return classification_results
    
    def move_to_category(self, source_file: path, target_dir: path, category: str) -> bool:
        """
        移动文件到分类目录
        
        args:
            source_file: 源文件
            target_dir: 目标目录
            category: 文件类别
            
        returns:
            bool: 移动是否成功
        """
        try:
            if not target_dir.exists():
                target_dir.mkdir(parents=true, exist_ok=true)
            
            target_file = target_dir / source_file.name
            
            # 处理文件名冲突
            if target_file.exists():
                handle_method = self.config['organize_rules']['handle_duplicates']
                if handle_method == 'rename':
                    target_file = self.generate_unique_filename(target_file)
                elif handle_method == 'skip':
                    self.logger.info(f"跳过已存在文件: {target_file}")
                    return false
                # 'overwrite' 则直接使用原文件名
            
            # 移动文件
            shutil.move(str(source_file), str(target_file))
            self.logger.info(f"已移动: {source_file.name} -> {category}/")
            return true
            
        except exception as e:
            self.logger.error(f"移动文件失败 {source_file} -> {target_dir}: {str(e)}")
            return false
    
    def generate_unique_filename(self, file_path: path) -> path:
        """
        生成唯一文件名
        
        args:
            file_path: 原始文件路径
            
        returns:
            path: 唯一文件路径
        """
        counter = 1
        original_stem = file_path.stem
        extension = file_path.suffix
        
        while file_path.exists():
            new_name = f"{original_stem}_{counter}{extension}"
            file_path = file_path.parent / new_name
            counter += 1
        
        return file_path
    
    def generate_organization_report(self, results: dict, total_files: int, dry_run: bool = false):
        """
        生成整理报告
        
        args:
            results: 分类结果
            total_files: 总文件数
            dry_run: 是否为试运行
        """
        mode = "试运行" if dry_run else "实际执行"
        
        self.logger.info(f"\n=== 文件整理报告 ({mode}) ===")
        self.logger.info(f"总文件数: {total_files}")
        
        for category, files in results.items():
            if category not in ['skipped', 'errors'] and files:
                self.logger.info(f"{category}: {len(files)} 个文件")
        
        if results.get('skipped'):
            self.logger.info(f"跳过文件: {len(results['skipped'])} 个")
        
        if results.get('errors'):
            self.logger.warning(f"处理失败: {len(results['errors'])} 个文件")

# 使用示例
def demo_file_classifier():
    """演示文件分类功能"""
    classifier = fileclassifier()
    
    # 创建测试目录和文件
    test_dir = path("test_files")
    test_dir.mkdir(exist_ok=true)
    
    # 创建一些测试文件
    test_files = {
        "document.pdf": "documents",
        "image.jpg": "images", 
        "music.mp3": "audio",
        "video.mp4": "video",
        "script.py": "code",
        "data.csv": "data",
        "archive.zip": "archives"
    }
    
    for filename, expected_category in test_files.items():
        file_path = test_dir / filename
        file_path.touch()  # 创建空文件
        
        # 测试分类
        detected_category = classifier.classify_file(file_path)
        print(f"{filename}: 预期={expected_category}, 检测={detected_category}")
    
    # 测试目录整理(试运行)
    print("\n=== 目录整理试运行 ===")
    results = classifier.organize_directory(test_dir, dry_run=true)
    
    # 清理测试文件
    for file_path in test_dir.iterdir():
        if file_path.is_file():
            file_path.unlink()
    test_dir.rmdir()
    
    return classifier

if __name__ == "__main__":
    demo_file_classifier()

3.2 高级文件分类功能

class advancedfileclassifier(fileclassifier):
    """
    高级文件分类器
    扩展基础功能,支持更复杂的分类策略
    """
    
    def __init__(self, config_file: optional[str] = none):
        super().__init__(config_file)
        self.learning_data = self.load_learning_data()
    
    def load_learning_data(self) -> dict:
        """
        加载学习数据(用于智能分类)
        
        returns:
            dict: 学习数据
        """
        data_file = path("file_classification_learning.json")
        if data_file.exists():
            try:
                with open(data_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except exception as e:
                self.logger.error(f"加载学习数据失败: {str(e)}")
        
        return {'file_patterns': {}, 'user_corrections': {}}
    
    def save_learning_data(self):
        """保存学习数据"""
        data_file = path("file_classification_learning.json")
        try:
            with open(data_file, 'w', encoding='utf-8') as f:
                json.dump(self.learning_data, f, indent=2, ensure_ascii=false)
        except exception as e:
            self.logger.error(f"保存学习数据失败: {str(e)}")
    
    def learn_from_filename(self, filename: str, category: str):
        """
        从文件名学习分类模式
        
        args:
            filename: 文件名
            category: 文件类别
        """
        # 提取文件名中的关键词
        words = re.findall(r'[a-za-z]+', filename.lower())
        
        for word in words:
            if len(word) > 2:  # 忽略太短的词
                if word not in self.learning_data['file_patterns']:
                    self.learning_data['file_patterns'][word] = {}
                
                if category not in self.learning_data['file_patterns'][word]:
                    self.learning_data['file_patterns'][word][category] = 0
                
                self.learning_data['file_patterns'][word][category] += 1
        
        self.save_learning_data()
    
    def classify_by_content(self, file_path: path) -> optional[str]:
        """
        基于内容分类文件
        
        args:
            file_path: 文件路径
            
        returns:
            optional[str]: 分类结果
        """
        try:
            # 文本文件内容分析
            if file_path.suffix.lower() in ['.txt', '.md', '.log']:
                return self.analyze_text_content(file_path)
            
            # 图片文件分析
            if file_path.suffix.lower() in ['.jpg', '.jpeg', '.png'] and pil_available:
                return self.analyze_image_content(file_path)
            
            # 代码文件分析
            if file_path.suffix.lower() in ['.py', '.js', '.java', '.cpp']:
                return self.analyze_code_content(file_path)
            
        except exception as e:
            self.logger.debug(f"内容分析失败 {file_path}: {str(e)}")
        
        return none
    
    def analyze_text_content(self, file_path: path) -> optional[str]:
        """
        分析文本文件内容
        
        args:
            file_path: 文件路径
            
        returns:
            optional[str]: 分类结果
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
            
            # 基于关键词分类
            category_keywords = {
                'log': ['error', 'warning', 'debug', 'info', 'exception'],
                'config': ['config', 'setting', 'property', 'environment'],
                'documentation': ['readme', 'license', 'install', 'usage']
            }
            
            for category, keywords in category_keywords.items():
                if any(keyword in content for keyword in keywords):
                    return f"text_{category.capitalize()}"
            
        except exception as e:
            self.logger.debug(f"文本分析失败 {file_path}: {str(e)}")
        
        return none
    
    def analyze_image_content(self, file_path: path) -> optional[str]:
        """
        分析图片文件内容
        
        args:
            file_path: 文件路径
            
        returns:
            optional[str]: 分类结果
        """
        if not pil_available:
            return none
        
        try:
            with image.open(file_path) as img:
                width, height = img.size
                
                # 根据图片尺寸分类
                if width > 2000 or height > 2000:
                    return "images_high_resolution"
                elif width < 500 and height < 500:
                    return "images_thumbnails"
                else:
                    return "images_standard"
                    
        except exception as e:
            self.logger.debug(f"图片分析失败 {file_path}: {str(e)}")
        
        return none
    
    def analyze_code_content(self, file_path: path) -> optional[str]:
        """
        分析代码文件内容
        
        args:
            file_path: 文件路径
            
        returns:
            optional[str]: 分类结果
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            
            # 检测代码类型
            if 'import tensorflow' in content or 'import torch' in content:
                return "code_ml"
            elif 'from flask' in content or 'from django' in content:
                return "code_web"
            elif 'class ' in content and 'def ' in content:
                return "code_oop"
            else:
                return "code_scripts"
                
        except exception as e:
            self.logger.debug(f"代码分析失败 {file_path}: {str(e)}")
        
        return none
    
    def smart_classify_file(self, file_path: path) -> str:
        """
        智能文件分类(结合多种策略)
        
        args:
            file_path: 文件路径
            
        returns:
            str: 分类结果
        """
        # 1. 基础分类
        base_category = super().classify_file(file_path)
        
        # 2. 内容分类
        content_category = self.classify_by_content(file_path)
        if content_category:
            return content_category
        
        # 3. 基于学习数据的分类
        learned_category = self.classify_by_learning(file_path.name)
        if learned_category and learned_category != base_category:
            self.logger.info(f"基于学习数据重新分类: {file_path.name} -> {learned_category}")
            return learned_category
        
        return base_category
    
    def classify_by_learning(self, filename: str) -> optional[str]:
        """
        基于学习数据分类
        
        args:
            filename: 文件名
            
        returns:
            optional[str]: 分类结果
        """
        words = re.findall(r'[a-za-z]+', filename.lower())
        category_scores = {}
        
        for word in words:
            if word in self.learning_data['file_patterns']:
                for category, count in self.learning_data['file_patterns'][word].items():
                    if category not in category_scores:
                        category_scores[category] = 0
                    category_scores[category] += count
        
        if category_scores:
            best_category = max(category_scores.items(), key=lambda x: x[1])[0]
            return best_category
        
        return none

# 使用示例
def demo_advanced_classifier():
    """演示高级分类功能"""
    classifier = advancedfileclassifier()
    
    # 测试学习功能
    test_files = [
        ("error_log_2024.txt", "text_log"),
        ("config_settings.json", "text_config"), 
        ("model_training.py", "code_ml"),
        ("web_app.py", "code_web")
    ]
    
    for filename, category in test_files:
        classifier.learn_from_filename(filename, category)
        print(f"学习: {filename} -> {category}")
    
    # 测试智能分类
    test_file = path("test_model.py")
    test_file.touch()
    
    category = classifier.smart_classify_file(test_file)
    print(f"智能分类: {test_file.name} -> {category}")
    
    test_file.unlink()
    
    return classifier

if __name__ == "__main__":
    demo_advanced_classifier()

4. 智能文件重命名系统

批量文件重命名器

class filerenamer:
    """
    智能文件重命名器
    提供多种重命名策略和批量操作
    """
    
    def __init__(self):
        self.setup_logging()
        self.rename_history = []
    
    def setup_logging(self):
        """设置日志记录"""
        logging.basicconfig(
            level=logging.info,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getlogger(__name__)
    
    def bulk_rename(self, directory: path, pattern: str, 
                   naming_strategy: str = "sequential", 
                   start_number: int = 1, dry_run: bool = false) -> list[tuple[path, path]]:
        """
        批量重命名文件
        
        args:
            directory: 目录路径
            pattern: 文件名模式(支持变量)
            naming_strategy: 命名策略
            start_number: 起始编号
            dry_run: 试运行模式
            
        returns:
            list: 重命名结果列表 (原路径, 新路径)
        """
        if not directory.exists():
            self.logger.error(f"目录不存在: {directory}")
            return []
        
        # 获取文件列表
        files = self.get_files_for_renaming(directory)
        if not files:
            self.logger.warning(f"目录中没有可重命名的文件: {directory}")
            return []
        
        # 根据策略排序文件
        sorted_files = self.sort_files(files, naming_strategy)
        
        # 生成新文件名
        rename_plan = self.generate_rename_plan(sorted_files, pattern, start_number)
        
        # 执行重命名
        results = []
        for old_path, new_path in rename_plan:
            if dry_run:
                self.logger.info(f"[试运行] 重命名: {old_path.name} -> {new_path.name}")
                results.append((old_path, new_path))
            else:
                success = self.safe_rename(old_path, new_path)
                if success:
                    results.append((old_path, new_path))
                    self.rename_history.append((old_path, new_path))
        
        self.logger.info(f"重命名完成: {len(results)}/{len(files)} 个文件")
        return results
    
    def get_files_for_renaming(self, directory: path) -> list[path]:
        """
        获取需要重命名的文件列表
        
        args:
            directory: 目录路径
            
        returns:
            list: 文件路径列表
        """
        files = []
        for item in directory.iterdir():
            if item.is_file() and not item.name.startswith('.'):
                files.append(item)
        return files
    
    def sort_files(self, files: list[path], strategy: str) -> list[path]:
        """
        根据策略排序文件
        
        args:
            files: 文件列表
            strategy: 排序策略
            
        returns:
            list: 排序后的文件列表
        """
        if strategy == "creation_time":
            return sorted(files, key=lambda x: x.stat().st_ctime)
        elif strategy == "modified_time":
            return sorted(files, key=lambda x: x.stat().st_mtime)
        elif strategy == "size":
            return sorted(files, key=lambda x: x.stat().st_size)
        elif strategy == "name":
            return sorted(files, key=lambda x: x.name.lower())
        else:  # sequential 或其他
            return files
    
    def generate_rename_plan(self, files: list[path], pattern: str, 
                           start_number: int) -> list[tuple[path, path]]:
        """
        生成重命名计划
        
        args:
            files: 文件列表
            pattern: 文件名模式
            start_number: 起始编号
            
        returns:
            list: 重命名计划
        """
        rename_plan = []
        
        for i, file_path in enumerate(files, start_number):
            new_name = self.generate_new_filename(file_path, pattern, i)
            new_path = file_path.parent / new_name
            rename_plan.append((file_path, new_path))
        
        return rename_plan
    
    def generate_new_filename(self, file_path: path, pattern: str, number: int) -> str:
        """
        生成新文件名
        
        args:
            file_path: 原文件路径
            pattern: 文件名模式
            number: 序号
            
        returns:
            str: 新文件名
        """
        # 支持的模式变量
        variables = {
            '{number}': f"{number:04d}",  # 4位数字,前面补零
            '{index}': str(number),
            '{name}': file_path.stem,
            '{ext}': file_path.suffix[1:],  # 去掉点号
            '{full_ext}': file_path.suffix,
            '{date}': datetime.now().strftime('%y%m%d'),
            '{time}': datetime.now().strftime('%h%m%s'),
            '{datetime}': datetime.now().strftime('%y%m%d_%h%m%s')
        }
        
        new_name = pattern
        for var, value in variables.items():
            new_name = new_name.replace(var, value)
        
        # 确保文件名合法
        new_name = self.sanitize_filename(new_name)
        
        return new_name
    
    def sanitize_filename(self, filename: str) -> str:
        """
        清理文件名,移除非法字符
        
        args:
            filename: 原文件名
            
        returns:
            str: 清理后的文件名
        """
        # 移除非法字符
        illegal_chars = r'[<>:"/\\|?*\x00-\x1f]'
        sanitized = re.sub(illegal_chars, '_', filename)
        
        # 限制文件名长度
        max_length = 255
        if len(sanitized) > max_length:
            name, ext = os.path.splitext(sanitized)
            sanitized = name[:max_length - len(ext)] + ext
        
        return sanitized
    
    def safe_rename(self, old_path: path, new_path: path) -> bool:
        """
        安全重命名文件(处理冲突)
        
        args:
            old_path: 原路径
            new_path: 新路径
            
        returns:
            bool: 重命名是否成功
        """
        try:
            # 如果目标文件已存在,生成新名称
            counter = 1
            original_new_path = new_path
            while new_path.exists():
                stem = original_new_path.stem
                extension = original_new_path.suffix
                new_name = f"{stem}_{counter}{extension}"
                new_path = original_new_path.parent / new_name
                counter += 1
            
            old_path.rename(new_path)
            self.logger.info(f"重命名: {old_path.name} -> {new_path.name}")
            return true
            
        except exception as e:
            self.logger.error(f"重命名失败 {old_path} -> {new_path}: {str(e)}")
            return false
    
    def rename_with_regex(self, directory: path, search_pattern: str, 
                         replace_pattern: str, dry_run: bool = false) -> list[tuple[path, path]]:
        """
        使用正则表达式重命名文件
        
        args:
            directory: 目录路径
            search_pattern: 搜索模式(正则表达式)
            replace_pattern: 替换模式
            dry_run: 试运行模式
            
        returns:
            list: 重命名结果
        """
        if not directory.exists():
            self.logger.error(f"目录不存在: {directory}")
            return []
        
        results = []
        
        for file_path in directory.iterdir():
            if not file_path.is_file():
                continue
            
            try:
                new_name = re.sub(search_pattern, replace_pattern, file_path.name)
                if new_name != file_path.name:
                    new_path = file_path.parent / new_name
                    
                    if dry_run:
                        self.logger.info(f"[试运行] 正则重命名: {file_path.name} -> {new_name}")
                        results.append((file_path, new_path))
                    else:
                        success = self.safe_rename(file_path, new_path)
                        if success:
                            results.append((file_path, new_path))
                            self.rename_history.append((file_path, new_path))
            
            except exception as e:
                self.logger.error(f"正则重命名失败 {file_path}: {str(e)}")
        
        return results
    
    def undo_last_rename(self) -> bool:
        """
        撤销最后一次重命名操作
        
        returns:
            bool: 撤销是否成功
        """
        if not self.rename_history:
            self.logger.warning("没有可撤销的重命名操作")
            return false
        
        try:
            # 从最近的操作开始撤销
            for old_path, new_path in reversed(self.rename_history):
                if new_path.exists():
                    new_path.rename(old_path)
                    self.logger.info(f"撤销重命名: {new_path.name} -> {old_path.name}")
            
            self.rename_history.clear()
            self.logger.info("所有重命名操作已撤销")
            return true
            
        except exception as e:
            self.logger.error(f"撤销重命名失败: {str(e)}")
            return false

# 使用示例
def demo_file_renamer():
    """演示文件重命名功能"""
    renamer = filerenamer()
    
    # 创建测试目录和文件
    test_dir = path("test_rename")
    test_dir.mkdir(exist_ok=true)
    
    # 创建测试文件
    test_files = ["photo1.jpg", "document1.pdf", "music1.mp3", "data1.csv"]
    for filename in test_files:
        file_path = test_dir / filename
        file_path.touch()
    
    # 测试批量重命名(试运行)
    print("=== 批量重命名试运行 ===")
    results = renamer.bulk_rename(
        directory=test_dir,
        pattern="file_{number}{full_ext}",
        naming_strategy="name",
        start_number=1,
        dry_run=true
    )
    
    # 测试正则重命名(试运行)
    print("\n=== 正则重命名试运行 ===")
    regex_results = renamer.rename_with_regex(
        directory=test_dir,
        search_pattern=r"(\d+)",
        replace_pattern=r"_\1",
        dry_run=true
    )
    
    # 清理测试文件
    for file_path in test_dir.iterdir():
        if file_path.is_file():
            file_path.unlink()
    test_dir.rmdir()
    
    return renamer

if __name__ == "__main__":
    demo_file_renamer()

5. 智能备份系统

自动化备份管理器

class backupmanager:
    """
    智能备份管理器
    提供完整的文件备份解决方案
    """
    
    def __init__(self, config_file: optional[str] = none):
        """
        初始化备份管理器
        
        args:
            config_file: 配置文件路径
        """
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.backup_history = []
    
    def setup_logging(self):
        """设置日志记录"""
        logging.basicconfig(
            level=logging.info,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.filehandler('backup_manager.log', encoding='utf-8'),
                logging.streamhandler()
            ]
        )
        self.logger = logging.getlogger(__name__)
    
    def load_config(self, config_file: optional[str]) -> dict:
        """
        加载备份配置
        
        args:
            config_file: 配置文件路径
            
        returns:
            dict: 备份配置
        """
        default_config = {
            'backup_locations': {
                'local': './backups',
                'external': none  # 例如: 'd:/backups' 或 '/mnt/backup'
            },
            'compression': {
                'enabled': true,
                'format': 'zip',  # 'zip', 'tar', 'tar.gz'
                'compression_level': 6
            },
            'encryption': {
                'enabled': false,
                'password': none
            },
            'retention': {
                'keep_daily': 7,
                'keep_weekly': 4,
                'keep_monthly': 12,
                'max_total_size': '10gb'  # 例如: '10gb', '1tb'
            },
            'scheduling': {
                'auto_backup': false,
                'backup_times': ['02:00']  # 每天备份时间
            }
        }
        
        if config_file and path(config_file).exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    if config_file.endswith('.json'):
                        user_config = json.load(f)
                    elif config_file.endswith(('.yaml', '.yml')):
                        user_config = yaml.safe_load(f)
                    else:
                        self.logger.warning(f"不支持的配置文件格式: {config_file}")
                        return default_config
                
                return self.deep_merge(default_config, user_config)
            except exception as e:
                self.logger.error(f"加载配置文件失败: {str(e)}")
        
        return default_config
    
    def deep_merge(self, base: dict, update: dict) -> dict:
        """
        深度合并配置
        
        args:
            base: 基础配置
            update: 更新配置
            
        returns:
            dict: 合并后的配置
        """
        result = base.copy()
        
        for key, value in update.items():
            if isinstance(value, dict) and key in result and isinstance(result[key], dict):
                result[key] = self.deep_merge(result[key], value)
            else:
                result[key] = value
        
        return result
    
    def create_backup(self, source_path: path, backup_name: str = none, 
                     incremental: bool = false) -> optional[path]:
        """
        创建备份
        
        args:
            source_path: 源路径(文件或目录)
            backup_name: 备份名称
            incremental: 是否使用增量备份
            
        returns:
            optional[path]: 备份文件路径
        """
        if not source_path.exists():
            self.logger.error(f"源路径不存在: {source_path}")
            return none
        
        # 生成备份名称
        if backup_name is none:
            timestamp = datetime.now().strftime('%y%m%d_%h%m%s')
            if source_path.is_file():
                backup_name = f"{source_path.stem}_{timestamp}"
            else:
                backup_name = f"{source_path.name}_{timestamp}"
        
        # 确定备份位置
        backup_dir = path(self.config['backup_locations']['local'])
        backup_dir.mkdir(parents=true, exist_ok=true)
        
        try:
            if incremental and source_path.is_dir():
                # 增量备份
                backup_path = self.create_incremental_backup(source_path, backup_dir, backup_name)
            else:
                # 完整备份
                if source_path.is_file():
                    backup_path = self.backup_file(source_path, backup_dir, backup_name)
                else:
                    backup_path = self.backup_directory(source_path, backup_dir, backup_name)
            
            if backup_path:
                self.backup_history.append({
                    'timestamp': datetime.now(),
                    'source': source_path,
                    'backup_path': backup_path,
                    'type': 'incremental' if incremental else 'full',
                    'size': backup_path.stat().st_size if backup_path.exists() else 0
                })
                
                self.logger.info(f"备份创建成功: {backup_path}")
                return backup_path
        
        except exception as e:
            self.logger.error(f"创建备份失败: {str(e)}")
        
        return none
    
    def backup_file(self, source_file: path, backup_dir: path, backup_name: str) -> optional[path]:
        """
        备份单个文件
        
        args:
            source_file: 源文件
            backup_dir: 备份目录
            backup_name: 备份名称
            
        returns:
            optional[path]: 备份文件路径
        """
        try:
            if self.config['compression']['enabled']:
                # 压缩备份
                backup_path = backup_dir / f"{backup_name}.{self.config['compression']['format']}"
                self.compress_files([source_file], backup_path)
            else:
                # 直接复制
                backup_path = backup_dir / f"{backup_name}{source_file.suffix}"
                shutil.copy2(source_file, backup_path)
            
            return backup_path
            
        except exception as e:
            self.logger.error(f"备份文件失败 {source_file}: {str(e)}")
            return none
    
    def backup_directory(self, source_dir: path, backup_dir: path, backup_name: str) -> optional[path]:
        """
        备份整个目录
        
        args:
            source_dir: 源目录
            backup_dir: 备份目录
            backup_name: 备份名称
            
        returns:
            optional[path]: 备份文件路径
        """
        try:
            if self.config['compression']['enabled']:
                # 压缩备份整个目录
                backup_path = backup_dir / f"{backup_name}.{self.config['compression']['format']}"
                self.compress_directory(source_dir, backup_path)
            else:
                # 直接复制目录
                backup_path = backup_dir / backup_name
                shutil.copytree(source_dir, backup_path)
            
            return backup_path
            
        except exception as e:
            self.logger.error(f"备份目录失败 {source_dir}: {str(e)}")
            return none
    
    def compress_files(self, files: list[path], output_path: path):
        """
        压缩文件列表
        
        args:
            files: 文件列表
            output_path: 输出路径
        """
        compression_format = self.config['compression']['format']
        
        if compression_format == 'zip':
            self.create_zip_archive(files, output_path)
        elif compression_format in ['tar', 'tar.gz']:
            self.create_tar_archive(files, output_path)
        else:
            raise valueerror(f"不支持的压缩格式: {compression_format}")
    
    def compress_directory(self, directory: path, output_path: path):
        """
        压缩整个目录
        
        args:
            directory: 目录路径
            output_path: 输出路径
        """
        compression_format = self.config['compression']['format']
        
        if compression_format == 'zip':
            self.create_zip_archive([directory], output_path)
        elif compression_format in ['tar', 'tar.gz']:
            self.create_tar_archive([directory], output_path)
        else:
            raise valueerror(f"不支持的压缩格式: {compression_format}")
    
    def create_zip_archive(self, items: list[path], output_path: path):
        """
        创建zip压缩包
        
        args:
            items: 要压缩的项目列表
            output_path: 输出路径
        """
        compression_level = self.config['compression']['compression_level']
        
        if self.config['encryption']['enabled'] and zip_encryption_available:
            # 加密zip
            password = self.config['encryption']['password']
            if not password:
                raise valueerror("加密已启用但未设置密码")
            
            with pyzipper.aeszipfile(
                output_path, 'w',
                compression=pyzipper.zip_deflated,
                compresslevel=compression_level,
                encryption=pyzipper.wz_aes
            ) as zipf:
                zipf.setpassword(password.encode('utf-8'))
                self.add_items_to_zip(zipf, items)
        else:
            # 普通zip
            with zipfile.zipfile(
                output_path, 'w',
                compression=zipfile.zip_deflated,
                compresslevel=compression_level
            ) as zipf:
                self.add_items_to_zip(zipf, items)
    
    def add_items_to_zip(self, zipf, items: list[path], base_path: path = none):
        """
        添加项目到zip文件
        
        args:
            zipf: zip文件对象
            items: 项目列表
            base_path: 基础路径(用于相对路径)
        """
        for item in items:
            if base_path is none:
                base_path = item.parent
            
            if item.is_file():
                # 添加文件
                arcname = item.relative_to(base_path)
                zipf.write(item, arcname)
            elif item.is_dir():
                # 递归添加目录
                for file_path in item.rglob('*'):
                    if file_path.is_file():
                        arcname = file_path.relative_to(base_path)
                        zipf.write(file_path, arcname)
    
    def create_tar_archive(self, items: list[path], output_path: path):
        """
        创建tar压缩包
        
        args:
            items: 要压缩的项目列表
            output_path: 输出路径
        """
        import tarfile
        
        mode = 'w'
        if self.config['compression']['format'] == 'tar.gz':
            mode = 'w:gz'
        
        with tarfile.open(output_path, mode) as tar:
            for item in items:
                tar.add(item, arcname=item.name)
    
    def create_incremental_backup(self, source_dir: path, backup_dir: path, 
                                backup_name: str) -> optional[path]:
        """
        创建增量备份
        
        args:
            source_dir: 源目录
            backup_dir: 备份目录
            backup_name: 备份名称
            
        returns:
            optional[path]: 备份文件路径
        """
        try:
            # 获取上次备份时间
            last_backup_time = self.get_last_backup_time(source_dir)
            
            # 查找自上次备份以来修改的文件
            changed_files = self.get_changed_files(source_dir, last_backup_time)
            
            if not changed_files:
                self.logger.info("没有检测到文件变化,跳过增量备份")
                return none
            
            # 创建增量备份
            timestamp = datetime.now().strftime('%y%m%d_%h%m%s')
            backup_path = backup_dir / f"{backup_name}_incremental_{timestamp}.zip"
            
            self.compress_files(changed_files, backup_path)
            
            self.logger.info(f"增量备份创建成功: {len(changed_files)} 个文件")
            return backup_path
            
        except exception as e:
            self.logger.error(f"创建增量备份失败: {str(e)}")
            return none
    
    def get_last_backup_time(self, source_dir: path) -> optional[datetime]:
        """
        获取上次备份时间
        
        args:
            source_dir: 源目录
            
        returns:
            optional[datetime]: 上次备份时间
        """
        # 查找该目录的最近备份
        relevant_backups = [
            entry for entry in self.backup_history 
            if entry['source'] == source_dir
        ]
        
        if relevant_backups:
            return max(entry['timestamp'] for entry in relevant_backups)
        
        return none
    
    def get_changed_files(self, source_dir: path, since_time: optional[datetime]) -> list[path]:
        """
        获取自指定时间以来修改的文件
        
        args:
            source_dir: 源目录
            since_time: 起始时间
            
        returns:
            list[path]: 修改的文件列表
        """
        changed_files = []
        
        for file_path in source_dir.rglob('*'):
            if file_path.is_file():
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                
                # 如果没有指定时间,或者文件在指定时间后修改过
                if since_time is none or file_mtime > since_time:
                    changed_files.append(file_path)
        
        return changed_files
    
    def restore_backup(self, backup_path: path, target_path: path, 
                      overwrite: bool = false) -> bool:
        """
        恢复备份
        
        args:
            backup_path: 备份文件路径
            target_path: 恢复目标路径
            overwrite: 是否覆盖现有文件
            
        returns:
            bool: 恢复是否成功
        """
        if not backup_path.exists():
            self.logger.error(f"备份文件不存在: {backup_path}")
            return false
        
        try:
            if backup_path.suffix.lower() in ['.zip', '.tar', '.gz']:
                # 解压恢复
                self.extract_backup(backup_path, target_path, overwrite)
            else:
                # 直接复制恢复
                if target_path.exists() and not overwrite:
                    self.logger.error(f"目标路径已存在: {target_path}")
                    return false
                shutil.copy2(backup_path, target_path)
            
            self.logger.info(f"备份恢复成功: {backup_path} -> {target_path}")
            return true
            
        except exception as e:
            self.logger.error(f"恢复备份失败: {str(e)}")
            return false
    
    def extract_backup(self, backup_path: path, target_path: path, overwrite: bool = false):
        """
        解压备份文件
        
        args:
            backup_path: 备份文件路径
            target_path: 目标路径
            overwrite: 是否覆盖
        """
        if backup_path.suffix.lower() == '.zip':
            # 处理加密zip
            if self.config['encryption']['enabled'] and zip_encryption_available:
                password = self.config['encryption']['password']
                with pyzipper.aeszipfile(backup_path) as zipf:
                    zipf.setpassword(password.encode('utf-8'))
                    zipf.extractall(target_path)
            else:
                with zipfile.zipfile(backup_path) as zipf:
                    zipf.extractall(target_path)
        
        elif backup_path.suffix.lower() in ['.tar', '.gz']:
            import tarfile
            with tarfile.open(backup_path) as tar:
                tar.extractall(target_path)
    
    def cleanup_old_backups(self) -> int:
        """
        清理旧备份
        
        returns:
            int: 删除的备份数量
        """
        backup_dir = path(self.config['backup_locations']['local'])
        if not backup_dir.exists():
            return 0
        
        deleted_count = 0
        retention_config = self.config['retention']
        
        try:
            # 获取所有备份文件
            backup_files = list(backup_dir.glob('*'))
            backup_files.sort(key=lambda x: x.stat().st_mtime)
            
            # 这里可以实现复杂的保留策略
            # 简化实现:按数量保留
            max_backups = 10  # 默认保留10个备份
            if len(backup_files) > max_backups:
                files_to_delete = backup_files[:-max_backups]
                
                for file_path in files_to_delete:
                    file_path.unlink()
                    deleted_count += 1
                    self.logger.info(f"删除旧备份: {file_path.name}")
        
        except exception as e:
            self.logger.error(f"清理旧备份失败: {str(e)}")
        
        return deleted_count

# 使用示例
def demo_backup_manager():
    """演示备份管理功能"""
    backup_mgr = backupmanager()
    
    # 创建测试目录和文件
    test_dir = path("test_backup_source")
    test_dir.mkdir(exist_ok=true)
    
    # 创建测试文件
    test_files = ["important_document.txt", "precious_photo.jpg", "critical_data.csv"]
    for filename in test_files:
        file_path = test_dir / filename
        file_path.write_text(f"这是 {filename} 的内容")
    
    # 测试完整备份
    print("=== 创建完整备份 ===")
    backup_path = backup_mgr.create_backup(test_dir, "test_backup")
    if backup_path:
        print(f"备份创建成功: {backup_path}")
    
    # 测试增量备份
    print("\n=== 创建增量备份 ===")
    # 修改一个文件以触发增量备份
    (test_dir / "important_document.txt").write_text("修改后的内容")
    
    incremental_backup = backup_mgr.create_backup(test_dir, "test_backup", incremental=true)
    if incremental_backup:
        print(f"增量备份创建成功: {incremental_backup}")
    
    # 测试恢复
    print("\n=== 测试备份恢复 ===")
    restore_dir = path("test_restore")
    if backup_path and backup_mgr.restore_backup(backup_path, restore_dir):
        print(f"备份恢复成功: {restore_dir}")
    
    # 清理测试文件
    for file_path in test_dir.iterdir():
        file_path.unlink()
    test_dir.rmdir()
    
    if restore_dir.exists():
        for file_path in restore_dir.iterdir():
            file_path.unlink()
        restore_dir.rmdir()
    
    return backup_mgr

if __name__ == "__main__":
    demo_backup_manager()

6. 完整代码实现

下面是本文中使用的完整代码集合:

"""
自动化文件管理系统 - 完整代码实现
包含文件分类、重命名和备份
日期: 2024年
"""

import os
import shutil
import re
import hashlib
import json
import yaml
import zipfile
import logging
from pathlib import path
from datetime import datetime, timedelta
from typing import list, dict, optional, tuple, callable
from dataclasses import dataclass
from enum import enum
from tqdm import tqdm

# 可选依赖
try:
    from pil import image
    pil_available = true
except importerror:
    pil_available = false

try:
    import pyzipper
    zip_encryption_available = true
except importerror:
    zip_encryption_available = false

try:
    import filetype
    filetype_available = true
except importerror:
    filetype_available = false

@dataclass
class fileoperationresult:
    """文件操作结果"""
    success: bool
    message: str
    details: dict = none

class filecategory(enum):
    """文件类别枚举"""
    images = "images"
    documents = "documents"
    audio = "audio"
    video = "video"
    archives = "archives"
    code = "code"
    executables = "executables"
    data = "data"
    others = "others"

class automatedfilemanager:
    """
    自动化文件管理器
    集成文件分类、重命名和备份功能
    """
    
    def __init__(self, config_file: optional[str] = none):
        """
        初始化文件管理器
        
        args:
            config_file: 配置文件路径
        """
        self.config_file = config_file
        self.config = self.load_config()
        self.setup_logging()
        
        # 初始化组件
        self.classifier = fileclassifier(config_file)
        self.renamer = filerenamer()
        self.backup_mgr = backupmanager(config_file)
        
        self.operation_history = []
    
    def setup_logging(self):
        """设置日志记录"""
        logging.basicconfig(
            level=logging.info,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.filehandler('file_manager.log', encoding='utf-8'),
                logging.streamhandler()
            ]
        )
        self.logger = logging.getlogger(__name__)
    
    def load_config(self) -> dict:
        """加载配置文件"""
        default_config = {
            'general': {
                'max_file_size': '10gb',
                'default_encoding': 'utf-8',
                'backup_before_operations': true
            },
            'classification': {
                'enabled': true,
                'auto_organize': false,
                'categories': fileclassifier.file_categories
            },
            'renaming': {
                'enabled': true,
                'default_pattern': '{name}{full_ext}',
                'backup_original': true
            },
            'backup': {
                'enabled': true,
                'auto_cleanup': true,
                'compression': true
            }
        }
        
        if self.config_file and path(self.config_file).exists():
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    if self.config_file.endswith('.json'):
                        user_config = json.load(f)
                    elif self.config_file.endswith(('.yaml', '.yml')):
                        user_config = yaml.safe_load(f)
                    else:
                        self.logger.warning(f"不支持的配置文件格式: {self.config_file}")
                        return default_config
                
                return self.deep_merge(default_config, user_config)
            except exception as e:
                self.logger.error(f"加载配置文件失败: {str(e)}")
        
        return default_config
    
    def deep_merge(self, base: dict, update: dict) -> dict:
        """深度合并配置"""
        result = base.copy()
        
        for key, value in update.items():
            if isinstance(value, dict) and key in result and isinstance(result[key], dict):
                result[key] = self.deep_merge(result[key], value)
            else:
                result[key] = value
        
        return result
    
    def organize_directory(self, directory: path, target_dir: optional[path] = none,
                          dry_run: bool = false) -> fileoperationresult:
        """
        整理目录
        
        args:
            directory: 要整理的目录
            target_dir: 目标目录
            dry_run: 试运行模式
            
        returns:
            fileoperationresult: 操作结果
        """
        try:
            if not directory.exists():
                return fileoperationresult(false, f"目录不存在: {directory}")
            
            # 备份检查
            if self.config['general']['backup_before_operations'] and not dry_run:
                self.logger.info("创建操作前备份...")
                backup_path = self.backup_mgr.create_backup(directory, "pre_organization")
                if backup_path:
                    self.logger.info(f"备份已创建: {backup_path}")
            
            # 执行分类整理
            results = self.classifier.organize_directory(directory, target_dir, dry_run)
            
            # 生成报告
            total_files = sum(len(files) for files in results.values())
            moved_files = total_files - len(results.get('skipped', [])) - len(results.get('errors', []))
            
            message = f"整理完成: 移动 {moved_files}/{total_files} 个文件"
            details = {
                'total_files': total_files,
                'moved_files': moved_files,
                'skipped_files': len(results.get('skipped', [])),
                'errors': len(results.get('errors', [])),
                'categories': {k: len(v) for k, v in results.items() 
                             if k not in ['skipped', 'errors'] and v}
            }
            
            self.operation_history.append({
                'timestamp': datetime.now(),
                'operation': 'organize_directory',
                'directory': directory,
                'result': details
            })
            
            return fileoperationresult(true, message, details)
            
        except exception as e:
            error_msg = f"整理目录失败: {str(e)}"
            self.logger.error(error_msg)
            return fileoperationresult(false, error_msg)
    
    def bulk_rename(self, directory: path, pattern: str, 
                   naming_strategy: str = "sequential", 
                   start_number: int = 1, dry_run: bool = false) -> fileoperationresult:
        """
        批量重命名
        
        args:
            directory: 目录路径
            pattern: 文件名模式
            naming_strategy: 命名策略
            start_number: 起始编号
            dry_run: 试运行模式
            
        returns:
            fileoperationresult: 操作结果
        """
        try:
            if not directory.exists():
                return fileoperationresult(false, f"目录不存在: {directory}")
            
            # 备份检查
            if self.config['general']['backup_before_operations'] and not dry_run:
                self.logger.info("创建操作前备份...")
                backup_path = self.backup_mgr.create_backup(directory, "pre_renaming")
                if backup_path:
                    self.logger.info(f"备份已创建: {backup_path}")
            
            # 执行重命名
            results = self.renamer.bulk_rename(
                directory, pattern, naming_strategy, start_number, dry_run
            )
            
            message = f"重命名完成: {len(results)} 个文件"
            details = {
                'renamed_files': len(results),
                'pattern': pattern,
                'strategy': naming_strategy
            }
            
            self.operation_history.append({
                'timestamp': datetime.now(),
                'operation': 'bulk_rename',
                'directory': directory,
                'result': details
            })
            
            return fileoperationresult(true, message, details)
            
        except exception as e:
            error_msg = f"批量重命名失败: {str(e)}"
            self.logger.error(error_msg)
            return fileoperationresult(false, error_msg)
    
    def create_backup(self, source_path: path, backup_name: str = none,
                     incremental: bool = false) -> fileoperationresult:
        """
        创建备份
        
        args:
            source_path: 源路径
            backup_name: 备份名称
            incremental: 是否增量备份
            
        returns:
            fileoperationresult: 操作结果
        """
        try:
            if not source_path.exists():
                return fileoperationresult(false, f"源路径不存在: {source_path}")
            
            backup_path = self.backup_mgr.create_backup(source_path, backup_name, incremental)
            
            if backup_path:
                message = f"备份创建成功: {backup_path}"
                details = {
                    'backup_path': backup_path,
                    'backup_size': backup_path.stat().st_size,
                    'incremental': incremental
                }
                
                self.operation_history.append({
                    'timestamp': datetime.now(),
                    'operation': 'create_backup',
                    'source': source_path,
                    'result': details
                })
                
                return fileoperationresult(true, message, details)
            else:
                return fileoperationresult(false, "备份创建失败")
            
        except exception as e:
            error_msg = f"创建备份失败: {str(e)}"
            self.logger.error(error_msg)
            return fileoperationresult(false, error_msg)
    
    def restore_backup(self, backup_path: path, target_path: path,
                      overwrite: bool = false) -> fileoperationresult:
        """
        恢复备份
        
        args:
            backup_path: 备份路径
            target_path: 目标路径
            overwrite: 是否覆盖
            
        returns:
            fileoperationresult: 操作结果
        """
        try:
            success = self.backup_mgr.restore_backup(backup_path, target_path, overwrite)
            
            if success:
                message = f"备份恢复成功: {target_path}"
                details = {
                    'backup_path': backup_path,
                    'target_path': target_path
                }
                
                self.operation_history.append({
                    'timestamp': datetime.now(),
                    'operation': 'restore_backup',
                    'result': details
                })
                
                return fileoperationresult(true, message, details)
            else:
                return fileoperationresult(false, "备份恢复失败")
            
        except exception as e:
            error_msg = f"恢复备份失败: {str(e)}"
            self.logger.error(error_msg)
            return fileoperationresult(false, error_msg)
    
    def cleanup_system(self) -> fileoperationresult:
        """
        清理系统(旧备份等)
        
        returns:
            fileoperationresult: 操作结果
        """
        try:
            # 清理旧备份
            deleted_count = self.backup_mgr.cleanup_old_backups()
            
            # 清理临时文件等(这里可以扩展)
            message = f"系统清理完成: 删除 {deleted_count} 个旧备份"
            details = {
                'deleted_backups': deleted_count
            }
            
            self.operation_history.append({
                'timestamp': datetime.now(),
                'operation': 'cleanup_system',
                'result': details
            })
            
            return fileoperationresult(true, message, details)
            
        except exception as e:
            error_msg = f"系统清理失败: {str(e)}"
            self.logger.error(error_msg)
            return fileoperationresult(false, error_msg)
    
    def get_operation_history(self, limit: int = 10) -> list[dict]:
        """
        获取操作历史
        
        args:
            limit: 返回记录数量限制
            
        returns:
            list[dict]: 操作历史
        """
        return self.operation_history[-limit:]
    
    def generate_report(self) -> dict:
        """
        生成系统报告
        
        returns:
            dict: 系统报告
        """
        report = {
            'timestamp': datetime.now(),
            'operations_count': len(self.operation_history),
            'recent_operations': self.get_operation_history(5),
            'system_status': {
                'backup_enabled': self.config['backup']['enabled'],
                'classification_enabled': self.config['classification']['enabled'],
                'renaming_enabled': self.config['renaming']['enabled']
            }
        }
        
        # 统计各类操作数量
        op_counts = {}
        for op in self.operation_history:
            op_type = op['operation']
            op_counts[op_type] = op_counts.get(op_type, 0) + 1
        
        report['operation_stats'] = op_counts
        
        return report

def main():
    """主函数 - 演示完整功能"""
    file_mgr = automatedfilemanager()
    
    print("=== 自动化文件管理系统 ===")
    print("请选择操作:")
    print("1. 整理目录")
    print("2. 批量重命名")
    print("3. 创建备份")
    print("4. 恢复备份")
    print("5. 系统清理")
    print("6. 查看报告")
    print("7. 退出")
    
    while true:
        choice = input("\n请输入选择 (1-7): ").strip()
        
        if choice == '1':
            path = input("请输入要整理的目录路径: ").strip()
            target = input("目标目录(留空则在原目录整理): ").strip()
            dry_run = input("试运行模式?(y/n): ").strip().lower() == 'y'
            
            result = file_mgr.organize_directory(path(path), 
                                               path(target) if target else none, 
                                               dry_run)
            print(f"结果: {result.message}")
        
        elif choice == '2':
            path = input("请输入目录路径: ").strip()
            pattern = input("文件名模式(支持 {number}、{name}、{ext} 等): ").strip()
            strategy = input("排序策略 (sequential/name/size/creation_time): ").strip()
            dry_run = input("试运行模式?(y/n): ").strip().lower() == 'y'
            
            result = file_mgr.bulk_rename(path(path), pattern, strategy, dry_run=dry_run)
            print(f"结果: {result.message}")
        
        elif choice == '3':
            path = input("请输入要备份的路径: ").strip()
            name = input("备份名称(留空则自动生成): ").strip()
            incremental = input("增量备份?(y/n): ").strip().lower() == 'y'
            
            result = file_mgr.create_backup(path(path), name or none, incremental)
            print(f"结果: {result.message}")
        
        elif choice == '4':
            backup_path = input("请输入备份文件路径: ").strip()
            target_path = input("恢复目标路径: ").strip()
            overwrite = input("覆盖现有文件?(y/n): ").strip().lower() == 'y'
            
            result = file_mgr.restore_backup(path(backup_path), path(target_path), overwrite)
            print(f"结果: {result.message}")
        
        elif choice == '5':
            result = file_mgr.cleanup_system()
            print(f"结果: {result.message}")
        
        elif choice == '6':
            report = file_mgr.generate_report()
            print("\n=== 系统报告 ===")
            print(f"总操作次数: {report['operations_count']}")
            print("操作统计:")
            for op_type, count in report['operation_stats'].items():
                print(f"  {op_type}: {count} 次")
            print("最近操作:")
            for op in report['recent_operations']:
                print(f"  {op['timestamp']}: {op['operation']}")
        
        elif choice == '7':
            print("谢谢使用!")
            break
        
        else:
            print("无效选择,请重新输入。")

if __name__ == "__main__":
    main()

7. 代码自查和优化

为确保代码质量和减少bug,我们对所有代码进行了以下自查:

7.1 代码质量检查

  • 异常处理:所有文件操作都包含完善的try-catch异常处理
  • 输入验证:对文件路径、配置参数进行严格验证
  • 资源管理:确保文件句柄、内存等资源正确释放
  • 类型提示:使用类型提示提高代码可读性和可靠性
  • 日志记录:详细的日志记录便于调试和监控

7.2 性能优化

  • 批量操作:对大量文件使用批量处理,减少io操作
  • 缓存策略:对重复读取的数据使用缓存
  • 增量处理:支持增量备份,避免重复处理
  • 进度显示:使用tqdm显示操作进度

7.3 安全性改进

  • 文件权限:合理设置生成文件的访问权限
  • 路径安全:防止路径遍历攻击
  • 输入清理:对用户输入进行适当的清理和验证
  • 加密支持:支持备份文件加密

7.4 健壮性提升

  • 重试机制:对可能失败的操作添加重试逻辑
  • 回滚功能:支持撤销操作
  • 状态检查:在执行操作前检查系统状态
  • 错误恢复:在可能的情况下从错误中恢复

8. 总结

通过本文的详细介绍和代码示例,我们构建了一个功能完整的自动化文件管理系统。这个系统不仅能够智能分类文件、批量重命名,还能提供可靠的备份解决方案,大大提高了文件管理的效率。

8.1 主要收获

  • 完整的文件管理流程:掌握了从分类、重命名到备份的完整文件管理流程
  • 智能分类策略:了解了基于扩展名、内容和学习数据的多种分类方法
  • 灵活的命名系统:学会了使用模式变量和正则表达式进行批量重命名
  • 可靠的备份方案:掌握了完整备份和增量备份的实现技术
  • 健壮的系统设计:理解了错误处理、资源管理和性能优化的最佳实践

8.2 最佳实践建议

  • 定期备份:重要数据要定期备份,并测试恢复流程
  • 渐进式实施:先在小范围测试,确认无误后再大规模应用
  • 版本控制:对配置文件和使用脚本进行版本控制
  • 监控告警:设置监控,及时发现和处理问题
  • 文档维护:保持使用文档和操作记录的更新

8.3 应用前景

自动化文件管理技术在以下领域有着广泛的应用前景:

  • 个人文件管理:自动整理照片、文档、下载文件等
  • 企业数据管理:标准化文件命名、自动化归档备份
  • 媒体资产管理:智能分类图片、视频、音频文件
  • 开发项目管理:自动化整理代码、文档、构建产物
  • 科研数据管理:规范化实验数据、论文、代码的存储

通过掌握这些技术,您可以构建出适合自己需求的智能文件管理系统,无论是个人使用还是企业部署,都能带来显著的效率提升。随着人工智能技术的发展,未来的文件管理系统将会更加智能和自动化,为用户提供更好的使用体验。

以上就是基于python实现自动化文件管理(分类、重命名和备份)的详细内容,更多关于python文件管理的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com