前言
以下是一个使用python编写的mysql数据库备份脚本,包含压缩、日志记录和自动清理旧备份功能:
脚本内容:
import subprocess
import gzip
import os
import logging
import argparse
from datetime import datetime, timedelta
# 配置日志记录
logging.basicconfig(
level=logging.info,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.filehandler('db_backup.log'),
logging.streamhandler()
]
)
def backup_mysql(host, user, password, database, backup_dir, retain_days):
try:
# 创建备份目录
os.makedirs(backup_dir, exist_ok=true)
# 生成备份文件名
timestamp = datetime.now().strftime('%y%m%d%h%m%s')
backup_file = os.path.join(backup_dir, f"{database}_{timestamp}.sql")
compressed_file = f"{backup_file}.gz"
# 执行mysqldump命令
logging.info(f"starting backup for database: {database}")
with open(backup_file, 'w') as f:
cmd = [
'mysqldump',
f'-h{host}',
f'-u{user}',
f'-p{password}',
'--single-transaction',
'--routines',
'--triggers',
database
]
result = subprocess.run(cmd, stdout=f, stderr=subprocess.pipe, text=true)
if result.returncode != 0:
raise exception(f"mysqldump failed: {result.stderr}")
# 压缩备份文件
with open(backup_file, 'rb') as f_in:
with gzip.open(compressed_file, 'wb') as f_out:
f_out.writelines(f_in)
os.remove(backup_file)
logging.info(f"backup compressed: {compressed_file}")
# 清理旧备份
cutoff = datetime.now() - timedelta(days=retain_days)
for filename in os.listdir(backup_dir):
filepath = os.path.join(backup_dir, filename)
if os.path.isfile(filepath) and filename.endswith('.sql.gz'):
file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
if file_time < cutoff:
os.remove(filepath)
logging.info(f"deleted old backup: {filename}")
except exception as e:
logging.error(f"backup failed: {str(e)}")
raise
if __name__ == "__main__":
parser = argparse.argumentparser(description='mysql database backup tool')
parser.add_argument('--host', required=true, help='mysql host')
parser.add_argument('--user', required=true, help='mysql username')
parser.add_argument('--password', required=true, help='mysql password')
parser.add_argument('--database', required=true, help='database name')
parser.add_argument('--backup-dir', default='./backups', help='backup directory')
parser.add_argument('--retain-days', type=int, default=7,
help='number of days to retain backups')
args = parser.parse_args()
try:
backup_mysql(
args.host,
args.user,
args.password,
args.database,
args.backup_dir,
args.retain_days
)
logging.info("backup completed successfully")
except:
logging.error("backup process terminated with errors")功能说明:
- 自动压缩:使用gzip压缩备份文件,节省存储空间
- 日志记录:同时输出到文件和控制台
- 旧备份清理:自动删除超过指定天数的旧备份
- 事务备份:使用–single-transaction保证备份一致性
- 存储过程/触发器:包含–routines和–triggers参数
- 错误处理:完善的异常捕获和错误日志记录
使用方式
bash
python mysql_backup.py \
--host=localhost \
--user=root \
--password=your_password \
--database=your_database \
--backup-dir=/path/to/backups \
--retain-days=7
建议改进方向:
- 添加邮件通知功能
- 支持远程备份到云存储
- 添加备份验证功能
- 支持多数据库并行备份
- 添加备份恢复功能
注意:需要提前安装mysql客户端工具(包含mysqldump),建议在服务器上直接运行。
方法补充
下面小编整理了一些其他python备份mysql数据库的方法,希望对大家有所帮助
python定时按日期备份mysql数据并压缩
#-*- coding:utf-8 -*-
import os
import time
import tarfile
import zipfile
'''
mysqldump
usage: mysqldump [options] database [tables]
or mysqldump [options] --databases [options] db1 [db2 db3...]
or mysqldump [options] --all-databases [options]
for more options, use mysqldump --help
'''
db_host="localhost"
db_user="root"
db_passwd="root"
db_name="crm"
db_charset="utf8"
db_backup_name=r"c:\crm_%s.sql" %(time.strftime("%y%m%d%h%m"))
zip_src = db_backup_name
zip_dest = zip_src + ".zip"
def zip_files():
f = zipfile.zipfile(zip_dest, 'w' ,zipfile.zip_deflated)
f.write(zip_src)
f.close()
if __name__ == "__main__":
print("begin to dump mysql database crm...");
os.system("mysqldump -h%s -u%s -p%s %s --default_character-set=%s > %s" %(db_host, db_user, db_passwd, db_name, db_charset, db_backup_name))
print("begin zip files...")
zip_files()
print("done, pyhon is great!")python 自动备份 mysql数据库
import os
import sys
# import glob
import filecmp
import time
import datetime
import subprocess
import sys
databackup_dir_path = r'e:\databackup'
mysqldump_path = r'e:\program files\mariadb 10.5\bin\mysqldump'
mysql_ip = '127.0.0.1'
mysql_username='root'
mysql_password='root'
mysql_port=3306
backdatabasenames = ["kwjl5","kwjl5c2","kwjl5ga" ]
def deleteoldfile(path,day):
for eachfile in os.listdir(path):
filename = os.path.join(path, eachfile)
# backup_ 开头的文件才会被删除
if os.path.isfile(filename) :
lastmodifytime = os.path.getmtime(filename)
# 设置删除多久之前的文件
endfiletime = time.time() - 3600 * 24 * day
if endfiletime > lastmodifytime:
if filename[0:7] == "backup_" and filename[-4:] == ".sql":
os.remove(filename)
print ("删除文件 %s 成功" % filename)
# 如果是目录则递归调用当前函数
elif os.path.isdir(filename):
deleteoldfile(filename,day)
def backupmysql(dbname):
time1_str = datetime.datetime.now().strftime('%y_%m_%d')
cmd =[]
cmd.append("\"" + mysqldump_path + "\"")
cmd.append("--opt")
cmd.append("--single-transaction=true")
cmd.append("--user="+ mysql_username)
cmd.append("--password="+ mysql_password)
cmd.append("--host="+ mysql_ip)
cmd.append("--protocol=tcp")
cmd.append("--port="+ str(mysql_port))
cmd.append("--default-character-set=utf8")
cmd.append("--single-transaction=true")
cmd.append("--routines")
cmd.append("--events \"%s\" > \"%s\\backup_%s_%s.sql\" " % (dbname, databackup_dir_path, dbname,time1_str) )
#cmd.append(" \n pause")
strcmd = str.join(" ",cmd )
print("cmd=>" + strcmd)
# os.system(strcmd)
try:
retcode = subprocess.call(strcmd, shell=true)
if retcode < 0:
print(sys.stderr, "child was terminated by signal", -retcode )
else:
print(sys.stderr, "child returned", retcode )
except oserror as e:
print(sys.stderr, "execution failed:", e )
# 删除老文件 15天以前的文件会被删掉
deleteoldfile(databackup_dir_path,15)
#逐个数据库备份
for f in backdatabasenames:
backupmysql(f)
print("执行完成")
脚本简单易懂, 就几句话. 改改参数即可.
到此这篇关于基于python编写mysql数据库备份脚本的文章就介绍到这了,更多相关python备份mysql数据库内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论