.ibd 文件是 innodb 存储引擎的核心数据文件,存储表的行数据、索引及元信息。在仅有 .ibd 文件的情况下,可通过以下流程恢复数据。
一、.ibd 文件基础认知
- 表空间模式
- 独立表空间(mysql 5.6+ 默认):每张表对应独立
.ibd文件(如user.ibd),包含表的完整数据、索引及元数据,默认存储于datadir/数据库名/目录。 - 系统表空间(旧模式):所有表数据集中存储于
ibdata1等共享文件,无独立.ibd文件。
- 独立表空间(mysql 5.6+ 默认):每张表对应独立
- 核心特性
- 二进制格式,需通过 mysql 命令或专用工具解析;
- 依赖 redo 日志保障事务安全,通过 undo 日志支持 mvcc;
- 最小 i/o 单位为 16kb 页,由区(64 页)和段(索引对应的区集合)组成。
二、恢复工具与流程
使用开源工具 ibd2sql 解析 .ibd 文件,支持提取表结构(ddl)和数据(sql)。
1. 工具准备
# 下载工具 wget https://github.com/ddcw/ibd2sql/archive/refs/heads/ibd2sql-v2.x.zip unzip ibd2sql-v2.x.zip cd ibd2sql-ibd2sql-v2.x/ # 确认 python3 环境 python --version # 需 python 3.x
2. 单文件解析
python3 main.py your_file.ibd --sql --ddl
执行后生成包含表结构和数据的 sql 文件。
三、批量处理脚本
当存在大量 .ibd 文件时,使用以下脚本批量解析:
import os
import sys
import subprocess
from datetime import datetime
from pathlib import path
# ===================== 配置参数(按需修改)=====================
root_dir = path(__file__).resolve().parent
source_ibd_dir = path(os.path.expanduser("./input_ibd")).resolve() # 源 ibd 目录
output_sql_dir = path(os.path.expanduser("./output_sql")).resolve() # 输出 sql 目录
python_cmd = sys.executable or "python3" # 使用当前 python
main_script = root_dir / "main.py"
# ===============================================================
output_sql_dir.mkdir(parents=true, exist_ok=true)
# 日志文件(macos 下默认编码为 utf-8,无需额外设置)
log_file = output_sql_dir / f"parse_log_{datetime.now().strftime('%y%m%d_%h%m%s')}.log"
def log(info, level="info"):
"""日志输出(控制台 + 文件)"""
msg = f"[{level}] [{datetime.now().strftime('%y-%m-%d %h:%m:%s')}] {info}"
print(msg)
with open(log_file, "a", encoding="utf-8") as f:
f.write(msg + "\n")
def parse_single_ibd(ibd_file_path: path) -> bool:
"""调用官方 main.py 解析单个 ibd,并将 stdout 写入同名 .sql 文件"""
table_name = ibd_file_path.stem
sql_file = output_sql_dir / f"{table_name}.sql"
log(f"开始解析表:{table_name}(文件:{ibd_file_path.name})")
if not main_script.exists():
log(f"❌ 未找到 main.py,期望路径:{main_script}", level="error")
return false
cmd = [
python_cmd,
str(main_script),
str(ibd_file_path),
"--sql",
"--ddl",
]
try:
with open(sql_file, "w", encoding="utf-8") as sql_fp:
proc = subprocess.run(
cmd,
stdout=sql_fp,
stderr=subprocess.pipe,
text=true,
cwd=root_dir,
check=false,
)
if proc.returncode != 0:
log(f"❌ 解析失败(退出码 {proc.returncode}):{proc.stderr.strip()}", level="error")
if sql_file.exists():
sql_file.unlink()
return false
if proc.stderr.strip():
log(f"⚠️ 命令警告:{proc.stderr.strip()}", level="warning")
log(f"✅ sql 生成成功:{sql_file}")
return true
except filenotfounderror as exc:
log(f"❌ 命令执行失败:{exc}", level="error")
except exception as exc:
log(f"❌ 未知异常:{exc}", level="error")
return false
if __name__ == "__main__":
log("=" * 60)
log("ibd2sql 批量解析任务(macos 版)启动")
log(f"源 ibd 目录:{source_ibd_dir}")
log(f"输出 sql 目录:{output_sql_dir}")
log("=" * 60 + "\n")
# 遍历所有 .ibd 文件(自动过滤非 ibd 文件)
if not source_ibd_dir.is_dir():
log("⚠️ 源目录不存在,请检查配置!", level="error")
sys.exit(1)
ibd_files = sorted(source_ibd_dir.glob("*.ibd"))
total_count = len(ibd_files)
success_count = 0
fail_count = 0
fail_tables = []
if total_count == 0:
log("⚠️ 未发现 ibd 文件,请检查源目录是否正确!", level="warning")
sys.exit(1)
log(f"发现 {total_count} 个 ibd 文件,开始批量解析...\n")
for ibd_path in ibd_files:
if parse_single_ibd(ibd_path):
success_count += 1
else:
fail_count += 1
fail_tables.append(ibd_path.stem)
log("-" * 40 + "\n")
# 输出统计结果
log("=" * 60)
log("批量解析任务结束")
log(f"总文件数:{total_count}")
log(f"成功解析:{success_count} 个")
log(f"解析失败:{fail_count} 个")
if fail_tables:
log(f"失败表名:{','.join(fail_tables)}", level="error")
log(f"日志文件:{log_file}")
log("=" * 60)
四、sql 文件修正
解析后的 sql 可能存在格式问题,需进一步处理:
1. 移除 json 字段的字符集声明
mysql json 类型为二进制存储,无需指定字符集/排序规则,脚本如下:
import argparse
import sys
from dataclasses import dataclass
from pathlib import path
import re
# 默认输出目录,与 batch_ibd2sql.py 保持一致
default_output_sql_dir = path(__file__).resolve().parent / "output_sql"
json_keyword = re.compile(r"\bjson\b", re.ignorecase)
charset_pattern = re.compile(r"\s+character\s+set\s+\w+", re.ignorecase)
collate_pattern = re.compile(r"\s+collate\s+\w+", re.ignorecase)
column_name_pattern = re.compile(r"`([^`]+)`")
@dataclass
class processresult:
file_path: path
columns: list[str]
modified: bool
def clean_line(line: str) -> tuple[str, str | none]:
"""
若该行定义 json 字段且指定了字符集/排序规则,则移除相关声明。
返回 (新行内容, 字段名或 none)。
"""
if not json_keyword.search(line):
return line, none
has_charset = "character set" in line.upper()
has_collate = "collate" in line.upper()
if not (has_charset or has_collate):
return line, none
new_line = charset_pattern.sub("", line)
new_line = collate_pattern.sub("", new_line)
if new_line == line:
return line, none
match = column_name_pattern.search(line)
column_name = match.group(1) if match else "<unknown>"
return new_line, column_name
def process_sql_file(file_path: path, dry_run: bool = false) -> processresult:
original = file_path.read_text(encoding="utf-8")
lines = original.splitlines()
trailing_newline = original.endswith("\n")
processed_lines = []
touched_columns: list[str] = []
for line in lines:
new_line, column = clean_line(line)
processed_lines.append(new_line)
if column is not none:
touched_columns.append(column)
modified = bool(touched_columns)
if modified and not dry_run:
new_content = "\n".join(processed_lines)
if trailing_newline:
new_content += "\n"
file_path.write_text(new_content, encoding="utf-8")
return processresult(file_path=file_path, columns=touched_columns, modified=modified)
def iter_sql_files(root_dir: path):
for path in sorted(root_dir.rglob("*.sql")):
if path.is_file():
yield path
def parse_args():
parser = argparse.argumentparser(
description="移除 sql 文件中 json 字段的字符集/排序规则声明(mysql json 内置二进制类型无需设定字符集)。"
)
parser.add_argument(
"-t",
"--target",
default=str(default_output_sql_dir),
help="待扫描的 sql 目录,默认为项目 output_sql",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="仅报告将要修改的内容,不实际写回文件",
)
return parser.parse_args()
def main():
args = parse_args()
target_dir = path(args.target).expanduser().resolve()
if not target_dir.is_dir():
print(f"[error] 目录不存在:{target_dir}", file=sys.stderr)
sys.exit(1)
has_changes = false
total_files = 0
total_columns = 0
for sql_file in iter_sql_files(target_dir):
total_files += 1
result = process_sql_file(sql_file, dry_run=args.dry_run)
if result.modified:
has_changes = true
total_columns += len(result.columns)
state = "dry-run" if args.dry_run else "updated"
columns_str = ", ".join(result.columns)
print(f"[{state}] {sql_file} -> {columns_str}")
if not has_changes:
print(f"[info] 未检测到需要处理的 json 字段,扫描文件数:{total_files}")
else:
mode = "dry-run" if args.dry_run else "write"
print(
f"[summary] 模式={mode}, 处理文件数={total_files}, 修正字段数={total_columns}"
)
if __name__ == "__main__":
main()
2. 修复索引注释格式(可选)
部分表可能存在索引注释缺少 comment 关键字或引号的问题,可使用以下脚本修复(未完全测试,少量异常表建议手动修改):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修复 sql 文件中 key/unique key/primary key 注释缺少 comment 关键字的问题
将类似: key `xxx` (`column`) 注释内容
修改为: key `xxx` (`column`) comment '注释内容'
同时处理: unique key, key, primary key 等所有索引定义
"""
import os
import re
import sys
from pathlib import path
from datetime import datetime
from typing import list, tuple, dict
# ===================== 配置参数 =====================
output_sql_dir = path("./output_sql").resolve() # 输出 sql 目录
# ===============================================================
# 日志文件
log_file = output_sql_dir / f"fix_unique_key_comment_log_{datetime.now().strftime('%y%m%d_%h%m%s')}.log"
def log(info: str, level: str = "info"):
"""日志输出(控制台 + 文件)"""
msg = f"[{level}] [{datetime.now().strftime('%y-%m-%d %h:%m:%s')}] {info}"
print(msg)
with open(log_file, "a", encoding="utf-8") as f:
f.write(msg + "\n")
def find_key_without_comment(content: str) -> list[tuple[int, str, str, str]]:
"""
查找所有缺少 comment 关键字的 key 定义(包括 unique key, key, primary key)
返回: [(行号, 原始行, key类型, 匹配的注释内容), ...]
"""
issues = []
lines = content.split('\n')
for line_num, line in enumerate(lines, 1):
# 跳过已经有 comment 关键字的行
if 'comment' in line.upper():
continue
# 检查是否包含 key 定义
if 'key' not in line.upper():
continue
line_upper = line.upper()
match = none
key_type = none
# 按优先级匹配:primary key > unique key > key
if 'primary key' in line_upper:
match = re.search(
r'(primary\s+key\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)',
line,
re.ignorecase
)
if match:
key_type = 'primary key'
elif 'unique key' in line_upper:
match = re.search(
r'(unique\s+key\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)',
line,
re.ignorecase
)
if match:
key_type = 'unique key'
elif re.search(r'\bkey\s+', line, re.ignorecase):
# 普通 key,确保不是 unique key 或 primary key
match = re.search(
r'(\bkey\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)',
line,
re.ignorecase
)
if match:
key_type = 'key'
if match and key_type:
comment_text = match.group(2).strip().rstrip(',').strip()
# 排除空注释
if comment_text:
issues.append((line_num, line, key_type, comment_text))
return issues
def fix_key_comment(content: str) -> tuple[str, list[dict]]:
"""
修复 key 注释问题(包括 unique key, key, primary key)
返回: (修复后的内容, 修复记录列表)
"""
lines = content.split('\n')
fixes = []
new_lines = []
for line_num, line in enumerate(lines, 1):
original_line = line
fixed = false
# 跳过已经有 comment 关键字的行
if 'comment' in line.upper():
new_lines.append(line)
continue
# 检查是否包含 key 定义
if 'key' not in line.upper():
new_lines.append(line)
continue
line_upper = line.upper()
match = none
key_type = none
pattern = none
# 按优先级匹配:primary key > unique key > key
if 'primary key' in line_upper:
pattern = r'(primary\s+key\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)'
match = re.search(pattern, line, re.ignorecase)
if match:
key_type = 'primary key'
elif 'unique key' in line_upper:
pattern = r'(unique\s+key\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)'
match = re.search(pattern, line, re.ignorecase)
if match:
key_type = 'unique key'
elif re.search(r'\bkey\s+', line, re.ignorecase):
# 普通 key,确保不是 unique key 或 primary key
pattern = r'(\bkey\s+[^)]+))\s+([^,\n]+?)(?=\s*[,)]|\s*$)'
match = re.search(pattern, line, re.ignorecase)
if match:
key_type = 'key'
if match and key_type and pattern:
key_def = match.group(1)
comment_text = match.group(2).strip()
# 排除空注释或只是空白字符
if comment_text and comment_text.strip():
# 移除末尾的逗号(如果有)
comment_text = comment_text.rstrip(',').strip()
# 检查注释是否已经用引号包裹
if not (comment_text.startswith("'") and comment_text.endswith("'")):
# 转义单引号
escaped_comment = comment_text.replace("'", "''")
# 用单引号包裹
quoted_comment = f"'{escaped_comment}'"
else:
quoted_comment = comment_text
# 使用正则替换
# 匹配: key ... ) 注释内容 [逗号或行尾]
# 替换为: key ... ) comment '注释内容' [逗号或行尾]
new_line = re.sub(
pattern,
rf'\1 comment {quoted_comment}',
line,
count=1,
flags=re.ignorecase
)
# 确保行尾的逗号被保留(如果原来有的话)
if line.rstrip().endswith(',') and not new_line.rstrip().endswith(','):
new_line = new_line.rstrip() + ','
fixes.append({
'line': line_num,
'key_type': key_type,
'original': original_line.strip(),
'fixed': new_line.strip(),
'comment': quoted_comment
})
new_lines.append(new_line)
fixed = true
if not fixed:
new_lines.append(line)
return '\n'.join(new_lines), fixes
def process_sql_file(sql_file: path) -> dict:
"""处理单个 sql 文件"""
result = {
'file': str(sql_file),
'fixed': false,
'fixes': []
}
try:
with open(sql_file, 'r', encoding='utf-8') as f:
content = f.read()
# 查找问题
issues = find_key_without_comment(content)
if issues:
# 修复问题
new_content, fixes = fix_key_comment(content)
if fixes:
# 写回文件
with open(sql_file, 'w', encoding='utf-8') as f:
f.write(new_content)
result['fixed'] = true
result['fixes'] = fixes
log(f"✅ 修复文件: {sql_file.name}")
for fix in fixes:
key_type = fix.get('key_type', 'key')
log(f" 行 {fix['line']} ({key_type}): {fix['original']}")
log(f" -> {fix['fixed']}")
else:
log(f"⚠️ 发现问题但未修复: {sql_file.name}")
else:
log(f"✓ 无问题: {sql_file.name}")
except exception as e:
log(f"❌ 处理文件失败 {sql_file.name}: {str(e)}", level="error")
result['error'] = str(e)
return result
def main():
"""主函数"""
log("=" * 60)
log("key/unique key/primary key 注释修复任务启动")
log(f"扫描目录:{output_sql_dir}")
log("=" * 60 + "\n")
if not output_sql_dir.exists():
log(f"❌ 目录不存在:{output_sql_dir}", level="error")
sys.exit(1)
# 查找所有 sql 文件
sql_files = sorted(output_sql_dir.glob("*.sql"))
total_count = len(sql_files)
if total_count == 0:
log("⚠️ 未发现 sql 文件", level="warning")
sys.exit(1)
log(f"发现 {total_count} 个 sql 文件,开始扫描...\n")
# 统计信息
fixed_files = []
total_fixes = 0
all_fixes_detail = []
# 处理每个文件
for sql_file in sql_files:
result = process_sql_file(sql_file)
if result['fixed']:
fixed_files.append(result['file'])
total_fixes += len(result['fixes'])
# 为每个修复添加文件信息
for fix in result['fixes']:
fix['file'] = result['file']
all_fixes_detail.extend(result['fixes'])
log("-" * 40 + "\n")
# 输出统计结果
log("=" * 60)
log("修复任务结束")
log(f"总文件数:{total_count}")
log(f"修复文件数:{len(fixed_files)}")
log(f"总修复数:{total_fixes}")
log("=" * 60)
# 输出详细修复信息
if all_fixes_detail:
log("\n详细修复信息:")
log("=" * 60)
for fix in all_fixes_detail:
# 从文件路径中提取文件名
file_name = path(fix.get('file', '')).name if 'file' in fix else 'unknown'
key_type = fix.get('key_type', 'key')
log(f"\n文件: {file_name}")
log(f"行号: {fix['line']}")
log(f"类型: {key_type}")
log(f"原始: {fix['original']}")
log(f"修复: {fix['fixed']}")
log(f"注释: {fix['comment']}")
log("=" * 60)
log(f"\n日志文件:{log_file}")
if __name__ == "__main__":
main()
总结
通过 ibd2sql 工具结合批量处理脚本,可高效恢复 .ibd 文件中的数据。解析后需注意修正 json 字段格式及索引注释问题,少量异常表建议手动调整以确保数据完整性。
以上就是通过.ibd文件恢复mysql数据的全流程的详细内容,更多关于.ibd文件恢复mysql数据的资料请关注代码网其它相关文章!
发表评论