当前位置: 代码网 > it编程>前端脚本>Python > Python文件批量处理(重命名/备份/同步运维)的实战指南

Python文件批量处理(重命名/备份/同步运维)的实战指南

2026年04月21日 Python 我要评论
​老王在一家小公司管服务器。每天最烦的事,就是开发同事丢来一堆日志文件,文件名乱七八糟——有的叫log1.txt,有的叫1212.log,还有的直接叫新建文本文档(1).log

​老王在一家小公司管服务器。每天最烦的事,就是开发同事丢来一堆日志文件,文件名乱七八糟——有的叫log1.txt,有的叫1212.log,还有的直接叫新建文本文档(1).log。更糟的是,每周五要手动备份一遍配置文件,还得确保备份目录和源目录同步。老王决定写几个python脚本把这些破事自动化。

场景一:批量重命名——让文件名规规矩矩

先拿日志文件开刀。老王想把所有.log文件改成2024-12-01_001.log这种格式,按修改时间排序。

import os
from datetime import datetime

def rename_logs(folder_path):
    log_files = [f for f in os.listdir(folder_path) if f.endswith('.log')]
    
    for index, filename in enumerate(log_files, 1):
        old_path = os.path.join(folder_path, filename)
        # 获取修改时间
        mtime = os.path.getmtime(old_path)
        date_str = datetime.fromtimestamp(mtime).strftime('%y-%m-%d')
        new_name = f"{date_str}_{index:03d}.log"
        new_path = os.path.join(folder_path, new_name)
        os.rename(old_path, new_path)
        print(f"{filename} -> {new_name}")

rename_logs('/var/logs')

运行一次,新建文本文档(1).log变成了2024-12-15_005.log,一目了然。

os.rename有个坑:目标文件已存在时会报错。加上覆盖检查更稳妥:

def safe_rename(old_path, new_path):
    if os.path.exists(new_path):
        base, ext = os.path.splitext(new_path)
        counter = 1
        while os.path.exists(f"{base}_{counter}{ext}"):
            counter += 1
        new_path = f"{base}_{counter}{ext}"
    os.rename(old_path, new_path)
    return new_path

更灵活的模式匹配:用glob模块按规则筛选文件,比如只改report_*.xlsx

import glob

for filepath in glob.glob('/data/report_*.xlsx'):
    dirname, filename = os.path.split(filepath)
    # 提取月份,比如 report_202412.xlsx -> 202412
    month = filename.split('_')[1].split('.')[0]
    new_name = f"财务报告_{month}.xlsx"
    os.rename(filepath, os.path.join(dirname, new_name))

正则表达式批量替换:某些文件命名有规律但混乱,比如img_001 (1).jpgimg_001 (2).jpg这种括号空格让人头疼。

import re

def clean_filenames(folder, pattern, replacement):
    for filename in os.listdir(folder):
        new_name = re.sub(pattern, replacement, filename)
        if new_name != filename:
            old_path = os.path.join(folder, filename)
            new_path = os.path.join(folder, new_name)
            os.rename(old_path, new_path)
            print(f"修正: {filename} -> {new_name}")

# 去掉文件名中的空格和括号
clean_filenames('./photos', r'[(\s)]', '')
# img_001 (1).jpg -> img_0011.jpg  (虽然不太完美,但至少没空格了)

场景二:智能备份——别每次都全量复制

老王之前用cp -r全量备份,几十g的配置文件越来越慢。他需要增量备份——只复制改动过的文件。

import shutil
import hashlib

def md5_file(filepath):
    """计算文件md5,用于判断内容是否变化"""
    hash_md5 = hashlib.md5()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def incremental_backup(src_dir, dst_dir, state_file='backup_state.json'):
    import json
    
    # 加载上次备份的状态
    if os.path.exists(state_file):
        with open(state_file, 'r') as f:
            last_state = json.load(f)
    else:
        last_state = {}
    
    new_state = {}
    copied_count = 0
    
    for root, dirs, files in os.walk(src_dir):
        rel_path = os.path.relpath(root, src_dir)
        dst_root = os.path.join(dst_dir, rel_path)
        os.makedirs(dst_root, exist_ok=true)
        
        for file in files:
            src_file = os.path.join(root, file)
            dst_file = os.path.join(dst_root, file)
            
            file_md5 = md5_file(src_file)
            file_key = os.path.relpath(src_file, src_dir)
            new_state[file_key] = file_md5
            
            # 检查是否需要备份
            if file_key not in last_state or last_state[file_key] != file_md5:
                shutil.copy2(src_file, dst_file)  # copy2保留元数据
                copied_count += 1
                print(f"备份: {file_key}")
    
    # 保存新状态
    with open(state_file, 'w') as f:
        json.dump(new_state, f, indent=2)
    
    print(f"完成,共备份 {copied_count} 个文件")

incremental_backup('/etc/nginx', '/backup/nginx')

第一次运行会全量备份,第二次只复制改动过的文件。状态文件backup_state.json记录了每个文件的md5,拿掉它就能强制全量。

定时自动备份:结合schedule库设置每天凌晨2点执行。

import schedule
import time

def auto_backup_job():
    print(f"{datetime.now()} 开始自动备份")
    incremental_backup('/data/mysql', '/backup/mysql')
    print("备份完成")

schedule.every().day.at("02:00").do(auto_backup_job)

while true:
    schedule.run_pending()
    time.sleep(60)

场景三:目录同步——像rsync一样工作

老王还负责维护两台服务器之间的文件同步。rsync很好用,但某些环境不允许装额外软件,自己写一个轻量版。

import filecmp
import shutil

def sync_dirs(src, dst, delete_extra=false):
    """
    同步目录,dst会变成和src一模一样
    delete_extra=true时删除dst中多余的文件
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
    
    # 比较两个目录
    comparison = filecmp.dircmp(src, dst)
    
    # 复制src有但dst没有的
    for file in comparison.left_only:
        src_path = os.path.join(src, file)
        dst_path = os.path.join(dst, file)
        if os.path.isdir(src_path):
            shutil.copytree(src_path, dst_path)
        else:
            shutil.copy2(src_path, dst_path)
        print(f"新增: {file}")
    
    # 复制有更新的
    for file in comparison.diff_files:
        src_path = os.path.join(src, file)
        dst_path = os.path.join(dst, file)
        shutil.copy2(src_path, dst_path)
        print(f"更新: {file}")
    
    # 删除dst多余的
    if delete_extra:
        for file in comparison.right_only:
            dst_path = os.path.join(dst, file)
            if os.path.isdir(dst_path):
                shutil.rmtree(dst_path)
            else:
                os.remove(dst_path)
            print(f"删除: {file}")
    
    # 递归处理子目录
    for common_dir in comparison.common_dirs:
        sync_dirs(
            os.path.join(src, common_dir),
            os.path.join(dst, common_dir),
            delete_extra
        )

sync_dirs('/home/user/project', '/backup/project', delete_extra=true)

filecmp.dircmp一次性告诉你三类差异:left_only(源独有)、right_only(目标独有)、diff_files(内容不同)。递归调用就能同步整个目录树。

实时同步:监听文件系统事件,文件一改动立刻同步。

from watchdog.observers import observer
from watchdog.events import filesystemeventhandler

class synchandler(filesystemeventhandler):
    def __init__(self, src, dst):
        self.src = src
        self.dst = dst
    
    def on_modified(self, event):
        if not event.is_directory:
            rel_path = os.path.relpath(event.src_path, self.src)
            dst_path = os.path.join(self.dst, rel_path)
            os.makedirs(os.path.dirname(dst_path), exist_ok=true)
            shutil.copy2(event.src_path, dst_path)
            print(f"实时同步: {rel_path}")

observer = observer()
handler = synchandler('/watch/src', '/watch/dst')
observer.schedule(handler, '/watch/src', recursive=true)
observer.start()

运行后,在/watch/src里新建或修改任何文件,都会瞬间复制到/watch/dst。适合配置文件热同步场景。

场景四:批量清理旧文件

日志文件占满磁盘是家常便饭。写个脚本自动删除30天前的文件。

import time

def clean_old_files(folder, days=30, pattern='*'):
    now = time.time()
    cutoff = now - (days * 86400)
    
    for root, dirs, files in os.walk(folder):
        for file in files:
            if not glob.fnmatch.fnmatch(file, pattern):
                continue
            filepath = os.path.join(root, file)
            mtime = os.path.getmtime(filepath)
            if mtime < cutoff:
                os.remove(filepath)
                print(f"删除旧文件: {filepath} ({datetime.fromtimestamp(mtime).date()})")
        
        # 删除空目录
        for dir in dirs:
            dirpath = os.path.join(root, dir)
            try:
                os.rmdir(dirpath)
                print(f"删除空目录: {dirpath}")
            except oserror:
                pass

clean_old_files('/var/log/nginx', days=7, pattern='*.log*')

更安全的干跑模式:先预览不执行,确认无误再真正删除。

def dry_run_clean(folder, days=30):
    now = time.time()
    cutoff = now - (days * 86400)
    to_delete = []
    
    for root, dirs, files in os.walk(folder):
        for file in files:
            filepath = os.path.join(root, file)
            if os.path.getmtime(filepath) < cutoff:
                to_delete.append(filepath)
    
    print(f"将删除 {len(to_delete)} 个文件:")
    for path in to_delete[:10]:  # 只显示前10个
        print(f"  {path}")
    
    if input("确认删除?(y/n): ").lower() == 'y':
        for path in to_delete:
            os.remove(path)
        print("删除完成")

踩坑与进阶技巧

文件名编码问题:windows和linux的文件名编码不同。跨平台时用os.fsencodeos.fsdecode处理。

def safe_listdir(path):
    try:
        return os.listdir(path)
    except unicodeencodeerror:
        return [os.fsdecode(f) for f in os.listdir(os.fsencode(path))]

大量文件性能优化os.listdir在几十万文件的目录里很慢,用scandir代替。

with os.scandir('/big_folder') as entries:
    for entry in entries:
        if entry.is_file() and entry.name.endswith('.log'):
            print(entry.name, entry.stat().st_size)

scandir一次性返回文件属性,避免额外调用stat,速度提升2-5倍。

移动而不是复制:备份到同一磁盘时,移动文件比复制快得多。

def move_to_archive(src, dst):
    os.makedirs(dst, exist_ok=true)
    for filename in os.listdir(src):
        shutil.move(os.path.join(src, filename), os.path.join(dst, filename))

日志记录:所有操作记下来,出问题能追溯。

import logging

logging.basicconfig(
    filename='file_ops.log',
    level=logging.info,
    format='%(asctime)s - %(message)s'
)

def log_op(op, src, dst=none):
    msg = f"{op}: {src}"
    if dst:
        msg += f" -> {dst}"
    logging.info(msg)
    print(msg)

一键搞定:综合运维脚本

把常用功能打包成命令行工具:

import argparse

def main():
    parser = argparse.argumentparser(description='文件批量处理工具')
    subparsers = parser.add_subparsers(dest='command')
    
    rename_parser = subparsers.add_parser('rename')
    rename_parser.add_argument('folder')
    rename_parser.add_argument('--pattern', default='*')
    rename_parser.add_argument('--template', default='{date}_{index}')
    
    backup_parser = subparsers.add_parser('backup')
    backup_parser.add_argument('src')
    backup_parser.add_argument('dst')
    backup_parser.add_argument('--incremental', action='store_true')
    
    sync_parser = subparsers.add_parser('sync')
    sync_parser.add_argument('src')
    sync_parser.add_argument('dst')
    sync_parser.add_argument('--delete', action='store_true')
    
    clean_parser = subparsers.add_parser('clean')
    clean_parser.add_argument('folder')
    clean_parser.add_argument('--days', type=int, default=30)
    
    args = parser.parse_args()
    
    if args.command == 'rename':
        rename_files(args.folder, args.pattern, args.template)
    elif args.command == 'backup':
        if args.incremental:
            incremental_backup(args.src, args.dst)
        else:
            shutil.copytree(args.src, args.dst)
    elif args.command == 'sync':
        sync_dirs(args.src, args.dst, args.delete)
    elif args.command == 'clean':
        clean_old_files(args.folder, args.days)

# 使用示例:
# python file_tool.py rename ./logs --pattern "*.log" --template "{date}_{index}"
# python file_tool.py backup /data /backup --incremental
# python file_tool.py sync /project /backup/project --delete

老王把这些脚本整合起来,设置好定时任务。现在每天早上一到公司,检查一下日志文件有没有按规范命名,备份有没有成功同步。之前每周五加班备份的日子一去不返,他泡杯茶就能开始一天的工作。

​以上就是python文件批量处理(重命名/备份/同步运维)的实战指南的详细内容,更多关于python文件批量处理的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com