Deleting duplicate images with Python (MD5 + pHash + ORB image recognition), final version: edit one directory and run (subfolders / Chinese paths / CSV report / move before delete)
Most guides on deleting duplicate images stop at MD5. That does remove byte-identical files, but it fails in cases like these:
- The same image was re-compressed (quality changed)
- The resolution differs (the image was rescaled)
- A screenshot had its edges slightly cropped (looks alike, but is not the same file)
So this article gives you a more robust dedup pipeline, closer to real image recognition:
✅ MD5: removes exact duplicates whose file contents are identical (fastest and exact)
✅ pHash (perceptual hash): quickly screens for visually similar candidates
✅ ORB image recognition (second pass): keypoint feature matching that cuts down pHash false positives (closer to reverse image search)
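Before the full script, here is a minimal sketch of the pHash idea using the third-party imagehash library (an illustration only, not what the script uses — the script below implements pHash itself with OpenCV; the paths are hypothetical). Two visually similar images give a small Hamming distance even when their MD5 digests differ:
# Quick pHash sanity check (requires: pip install imagehash pillow)
from PIL import Image
import imagehash

h1 = imagehash.phash(Image.open(r"D:\imgs\photo.jpg"))
h2 = imagehash.phash(Image.open(r"D:\imgs\photo_resized.jpg"))
print(h1 - h2)  # Hamming distance: a small value (roughly 0~8) suggests the same picture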
The script also takes care of the practical details:
- Single file: edit one directory and run
- Dependencies declared inside the .py: missing libraries are pip-installed automatically (a PyPI mirror can be configured)
- Safety: nothing is deleted by default; duplicates are first moved to _duplicates/ and a dedup_report.csv is exported for auditing
- Chinese paths: directories and filenames with Chinese characters work on Windows
1) Usage (the quick version)
Step 1: save the script
Save the full code below as: dedup_images_run.py
Step 2: change one line
Open the file and edit:
root_dir = r"D:\imgs"
so that it points at your image directory.
Step 3: run it
python dedup_images_run.py
2) What does the script output?
When the run finishes, you will find in root_dir:
- dedup_report.csv: one row per duplicate decision (kept image, duplicate image, reason, distance/match score, where it was moved)
- _duplicates/: the images judged to be duplicates (moved here by default, preserving the original directory structure)
Once you have verified the results, you can decide whether to set action to delete for direct deletion.
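If you want to skim the report without opening Excel, a few lines of standard-library Python will do (a sketch; it assumes the default report name and the root_dir you configured):
import csv, os

root_dir = r"D:\imgs"  # the same directory configured in the script
with open(os.path.join(root_dir, "dedup_report.csv"), encoding="utf-8-sig", newline="") as f:
    for row in csv.DictReader(f):
        print(row["reason"], "|", row["dup_path"], "->", row["keep_path"])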
3) How to tune the thresholds (the most common question)
pHash threshold phash_th:
- Smaller = stricter (fewer false positives, more misses)
- Typical range: 6~10
- Recommended default: 8
(See the calibration sketch below.)
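To pick a value empirically, compute the distance for a pair you know are duplicates and a pair you know are not, then place phash_th between the two. A sketch that imports the helpers from the script below (importing runs its dependency check but not main(); the paths are placeholders):
from dedup_images_run import phash_64, hamming_distance_64

ha = phash_64(r"D:\imgs\a.jpg")
hb = phash_64(r"D:\imgs\a_small.jpg")
if ha is not None and hb is not None:  # phash_64 returns None if an image cannot be read
    print("distance:", hamming_distance_64(ha, hb))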
ORB image-recognition thresholds (these matter more)
ORB declares two images "similar enough" only when both conditions hold:
- orb_min_good_matches: number of good matches (suggested 15~60)
- orb_similarity_th: the ratio good_matches / min(kp1, kp2) (suggested 0.08~0.20)
If duplicates are slipping through (too strict), loosen:
- orb_min_good_matches: 25 → 15
- orb_similarity_th: 0.12 → 0.08
If you see too many false positives (too loose), tighten:
- orb_min_good_matches: 25 → 40
- orb_similarity_th: 0.12 → 0.18
- and also phash_th: 8 → 6
A quick way to check these numbers on your own images is sketched right below.
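The same empirical approach works for the ORB gate: run orb_similarity() on a known-duplicate pair and a known-distinct pair, then set the two thresholds between the scores you observe (again a sketch with placeholder paths, importing from the script below):
from dedup_images_run import orb_similarity

good, sim = orb_similarity(r"D:\imgs\a.jpg", r"D:\imgs\a_cropped.jpg")
print(f"good_matches={good}, similarity={sim:.3f}")
# Set orb_min_good_matches below the good-match counts you see for true duplicates,
# and orb_similarity_th below their similarity scores.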
4) Full code (single file, runnable, with automatic dependency install + ORB image recognition)
Note: declaring the dependencies "inside the .py" does not mean the script works without a network. On first run, missing libraries are pip-installed automatically; on a fully offline machine, install the dependencies beforehand or keep the dependency wheels locally.
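For a fully offline machine, one workable pattern (an assumption about your setup, using standard pip commands) is to fetch the wheels on a networked machine first, then install from the local folder:
python -m pip download -d wheels opencv-python numpy tqdm
python -m pip install --no-index --find-links wheels opencv-python numpy tqdm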
# -*- coding: utf-8 -*-
"""
dedup_images_run.py
python 批量删除重复图片(md5 + phash + orb 图像识别)—— 单文件可直接运行版
✅ 你只需要改 root_dir,然后 python 运行即可
✅ 不用手动 pip install:脚本会自动检测缺少的库并安装
✅ 支持子文件夹递归
✅ 支持中文路径(windows)
✅ 默认不直接删除:先移动到 _duplicates/(更安全)
✅ 导出 dedup_report.csv,方便回溯
去重策略(推荐):
1) md5:完全重复(内容一模一样)——最快最准
2) phash:近似重复(缩放/压缩/轻微变化)——快速筛选候选
3) orb 图像识别:关键点特征匹配——二次确认,减少误判,更像“识图”
依赖(自动安装):
- opencv-python(cv2)
- numpy
- tqdm
注意:
- 第一次自动安装 opencv-python 可能较慢
- 如在国内网络环境,可设置 pip_index_url 为镜像
"""
import os
import sys
import csv
import time
import shutil
import hashlib
import subprocess
import importlib.util
from dataclasses import dataclass
from typing import Optional  # dict/list/tuple built-in generics used below require Python 3.9+
# =========================================================
# 1) Config: root_dir is the only thing you must change
# =========================================================
root_dir = r"C:\feeday\imgtest"  # <<< change this to your image directory
recursive = True  # recurse into subfolders
# Mode:
# - "md5"            exact duplicates only
# - "phash"          near duplicates only (may over-match a little)
# - "md5+phash"      md5 first, then phash (recommended)
# - "md5+phash+orb"  md5, then phash, then ORB second-pass confirmation (strongly recommended)
mode = "md5+phash+orb"
phash_th = 8  # pHash Hamming-distance threshold: 6~10 typical, default 8
# ORB image-recognition parameters (used to confirm similarity)
use_orb = True  # auto-enabled when mode contains "orb"; kept here as a manual switch
orb_nfeatures = 1200  # number of ORB features: 800~2000 typical
orb_ratio_test = 0.75  # 0.70~0.80, smaller = stricter
orb_min_good_matches = 25  # minimum "good matches" to count as duplicate (suggested 15~60)
orb_similarity_th = 0.12  # threshold on good_matches / min(kp1, kp2) (suggested 0.08~0.20)
orb_max_image_edge = 1200  # downscale so the longest edge is at most this (speeds up matching)
action = "move"  # "move" / "delete" / "none"
move_to = "_duplicates"  # where duplicates go when action == "move" (relative to root_dir)
report_name = "dedup_report.csv"  # report filename (relative to root_dir)
dry_run = False  # True = only write the report; do not move or delete
prefer_keep = "area"  # keep policy: "area" (higher resolution wins) / "size" / "newest" / "oldest"
# Auto-install dependencies
auto_install = True
# Optional: set a PyPI mirror for restricted networks
# e.g. pip_index_url = "https://mirrors.tencent.com/pypi/simple/"
pip_index_url = ""
# =========================================================
# 2) Auto-install dependencies (declared inside the .py)
# =========================================================
def _module_exists(module_name: str) -> bool:
    return importlib.util.find_spec(module_name) is not None
def ensure_packages():
    """
    Detect and auto-install the required libraries: opencv-python / numpy / tqdm
    """
    if not auto_install:
        return
need_install = []
req = [
("cv2", "opencv-python"),
("numpy", "numpy"),
("tqdm", "tqdm"),
]
for mod, pkg in req:
if not _module_exists(mod):
need_install.append(pkg)
if not need_install:
return
print("⚠️ 检测到缺少依赖,将自动安装:", ", ".join(need_install))
cmd = [sys.executable, "-m", "pip", "install", "--upgrade"] + need_install
if pip_index_url.strip():
cmd += ["-i", pip_index_url.strip()]
print("➡️ 执行:", " ".join(cmd))
    try:
        subprocess.check_call(cmd)
        print("✅ Dependencies installed")
    except subprocess.CalledProcessError as e:
        print("❌ Automatic install failed:", e)
        print("You can install manually with:")
        if pip_index_url.strip():
            print(f'  {sys.executable} -m pip install -U {" ".join(need_install)} -i {pip_index_url.strip()}')
        else:
            print(f'  {sys.executable} -m pip install -U {" ".join(need_install)}')
        sys.exit(1)
ensure_packages()
import numpy as np
import cv2
from tqdm import tqdm
# =========================================================
# 3) Main logic
# =========================================================
image_exts = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".tif", ".tiff"}
def imread_unicode(path: str) -> Optional[np.ndarray]:
    """
    Read an image with OpenCV, tolerating non-ASCII (e.g. Chinese) paths.
    """
    try:
        data = np.fromfile(path, dtype=np.uint8)
        img = cv2.imdecode(data, cv2.IMREAD_COLOR)
        return img
    except Exception:
        return None
def get_image_hw(path: str) -> tuple[int, int]:
img = imread_unicode(path)
    if img is None:
        return 0, 0
h, w = img.shape[:2]
return int(h), int(w)
def file_md5(path: str, chunk_size: int = 1024 * 1024) -> str:
md5 = hashlib.md5()
with open(path, "rb") as f:
        while True:
b = f.read(chunk_size)
if not b:
break
md5.update(b)
return md5.hexdigest()
def phash_64(path: str) -> Optional[int]:
    """
    64-bit pHash: grayscale -> 32x32 -> DCT -> 8x8 low-frequency block -> compare to median -> 64 bits
    """
    img = imread_unicode(path)
    if img is None:
        return None
    try:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray, (32, 32), interpolation=cv2.INTER_AREA)
        gray = np.float32(gray)
        dct = cv2.dct(gray)
        dct_low = dct[:8, :8].copy()
        flat = dct_low.flatten()
        flat_no_dc = flat[1:]  # drop the DC term when computing the median
        median = np.median(flat_no_dc)
        bits = (flat > median).astype(np.uint8)
        h = 0
        for b in bits:
            h = (h << 1) | int(b)
        return int(h)
    except Exception:
        return None
def hamming_distance_64(a: int, b: int) -> int:
    return bin(a ^ b).count("1")  # works on Python 3.9 (int.bit_count() needs 3.10+)
# -----------------------------
# ORB image recognition (keypoint feature matching)
# -----------------------------
_orb = None
_bf = None
_orb_cache: dict[str, tuple[int, Optional[np.ndarray]]] = {}
def _get_orb():
    global _orb
    if _orb is None:
        _orb = cv2.ORB_create(nfeatures=int(orb_nfeatures))
    return _orb
def _get_bf():
    global _bf
    if _bf is None:
        _bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
    return _bf
def _resize_for_orb(img: np.ndarray) -> np.ndarray:
h, w = img.shape[:2]
m = max(h, w)
if m <= orb_max_image_edge:
return img
scale = orb_max_image_edge / float(m)
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
    return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
def orb_signature(path: str) -> tuple[int, Optional[np.ndarray]]:
    """
    Returns: (keypoint_count, descriptors)
    descriptors may be None (e.g. a flat-color image, or the read failed)
    """
    if path in _orb_cache:
        return _orb_cache[path]
    img = imread_unicode(path)
    if img is None:
        _orb_cache[path] = (0, None)
        return (0, None)
    try:
        img = _resize_for_orb(img)
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        orb = _get_orb()
        kps, des = orb.detectAndCompute(gray, None)
        kp_count = 0 if kps is None else len(kps)
        _orb_cache[path] = (kp_count, des)
        return _orb_cache[path]
    except Exception:
        _orb_cache[path] = (0, None)
        return (0, None)
def orb_similarity(path_a: str, path_b: str) -> tuple[int, float]:
    """
    Compute ORB similarity:
    - good_matches: number of matches that pass the ratio test
    - similarity: good_matches / min(kp_count_a, kp_count_b)
    """
    kp_a, des_a = orb_signature(path_a)
    kp_b, des_b = orb_signature(path_b)
    if des_a is None or des_b is None:
        return 0, 0.0
    if kp_a <= 0 or kp_b <= 0:
        return 0, 0.0
    try:
        bf = _get_bf()
        matches = bf.knnMatch(des_a, des_b, k=2)
        good = 0
        ratio = float(orb_ratio_test)
        for m_n in matches:
            if len(m_n) < 2:
                continue
            m, n = m_n[0], m_n[1]
            if m.distance < ratio * n.distance:
                good += 1
        denom = max(1, min(kp_a, kp_b))
        sim = good / float(denom)
        return good, float(sim)
    except Exception:
        return 0, 0.0
@dataclass
class ImgInfo:
path: str
size: int
w: int
h: int
@property
def area(self) -> int:
return self.w * self.h
def choose_keep(a: ImgInfo, b: ImgInfo, prefer: str) -> tuple[ImgInfo, ImgInfo]:
if prefer == "area":
if a.area != b.area:
return (a, b) if a.area > b.area else (b, a)
if a.size != b.size:
return (a, b) if a.size > b.size else (b, a)
return (a, b)
if prefer == "size":
if a.size != b.size:
return (a, b) if a.size > b.size else (b, a)
if a.area != b.area:
return (a, b) if a.area > b.area else (b, a)
return (a, b)
if prefer in {"newest", "oldest"}:
ta = os.path.getmtime(a.path)
tb = os.path.getmtime(b.path)
if prefer == "newest":
return (a, b) if ta >= tb else (b, a)
else:
return (a, b) if ta <= tb else (b, a)
return choose_keep(a, b, "area")
def iter_images(root: str, recursive: bool) -> list[str]:
root = os.path.abspath(root)
out = []
if recursive:
for dp, _, fns in os.walk(root):
for fn in fns:
ext = os.path.splitext(fn)[1].lower()
if ext in image_exts:
out.append(os.path.join(dp, fn))
else:
for fn in os.listdir(root):
p = os.path.join(root, fn)
if os.path.isfile(p):
ext = os.path.splitext(fn)[1].lower()
if ext in image_exts:
out.append(p)
return out
def ensure_dir(p: str) -> None:
    os.makedirs(p, exist_ok=True)
def safe_move(src: str, dst: str) -> str:
base, ext = os.path.splitext(dst)
final = dst
i = 1
while os.path.exists(final):
final = f"{base}({i}){ext}"
i += 1
ensure_dir(os.path.dirname(final))
shutil.move(src, final)
return final
def write_report_csv(report_path: str, rows: list[dict]) -> None:
ensure_dir(os.path.dirname(report_path) if os.path.dirname(report_path) else ".")
fieldnames = [
"time", "mode",
"keep_path", "dup_path",
"reason", "value",
"phash_distance",
"orb_good_matches", "orb_similarity",
"keep_w", "keep_h", "keep_size",
"dup_w", "dup_h", "dup_size",
"dup_new_path",
]
with open(report_path, "w", newline="", encoding="utf-8-sig") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
w.writeheader()
for r in rows:
w.writerow(r)
def dedup_by_md5(paths: list[str], prefer_keep: str) -> list[tuple[str, str, str]]:
    """
    Returns duplicate pairs: (dup_path, keep_path, md5)
    """
    md5_map: dict[str, ImgInfo] = {}
    dups: list[tuple[str, str, str]] = []
    for p in tqdm(paths, desc="MD5 scan", unit="img"):
        try:
            h = file_md5(p)
        except Exception:
            continue
        try:
            size = os.path.getsize(p)
        except Exception:
            size = 0
        hh, ww = get_image_hw(p)
        cur = ImgInfo(path=p, size=size, w=ww, h=hh)
if h not in md5_map:
md5_map[h] = cur
else:
old = md5_map[h]
keep, drop = choose_keep(old, cur, prefer_keep)
md5_map[h] = keep
dups.append((drop.path, keep.path, h))
return dups
def dedup_by_phash_with_orb(
paths: list[str],
prefer_keep: str,
phash_th: int,
use_orb: bool
) -> list[tuple[str, str, str, int, int, float]]:
"""
返回重复对:
(dup_path, keep_path, phash_hex, phash_dist, orb_good, orb_sim)
说明:
- 先用 phash 快速筛选候选(phash_dist <= phash_th)
- 若启用 orb,则对候选进行二次“图像识别”确认
"""
hashes: dict[str, int] = {}
infos: dict[str, imginfo] = {}
for p in tqdm(paths, desc="phash 计算", unit="img"):
ph = phash_64(p)
if ph is none:
continue
hashes[p] = ph
try:
size = os.path.getsize(p)
except exception:
size = 0
hh, ww = get_image_hw(p)
infos[p] = imginfo(path=p, size=size, w=ww, h=hh)
    # Bucket by the top 16 bits of the hash to speed up pairwise comparison
buckets: dict[int, list[str]] = {}
for p, ph in hashes.items():
key = (ph >> 48) & 0xffff
buckets.setdefault(key, []).append(p)
keep_map: dict[str, str] = {p: p for p in hashes.keys()}
dups: list[tuple[str, str, str, int, int, float]] = []
    for key in tqdm(list(buckets.keys()), desc="pHash bucket compare", unit="bucket"):
group = buckets[key]
if len(group) <= 1:
continue
for i in range(len(group)):
a = group[i]
a_keep = keep_map.get(a, a)
if a_keep not in hashes:
continue
for j in range(i + 1, len(group)):
b = group[j]
b_keep = keep_map.get(b, b)
if b_keep not in hashes:
continue
ha = hashes[a_keep]
hb = hashes[b_keep]
dist = hamming_distance_64(ha, hb)
                # pHash screens candidates first
                if dist > phash_th:
                    continue
                orb_good = 0
                orb_sim = 0.0
                if use_orb:
                    orb_good, orb_sim = orb_similarity(a_keep, b_keep)
                    # ORB second-pass gate: both conditions must hold to count as duplicate
                    if orb_good < int(orb_min_good_matches):
                        continue
                    if orb_sim < float(orb_similarity_th):
                        continue
                ia = infos.get(a_keep)
                ib = infos.get(b_keep)
                if ia is None or ib is None:
                    continue
keep, drop = choose_keep(ia, ib, prefer_keep)
keep_map[keep.path] = keep.path
keep_map[drop.path] = keep.path
ph_hex = f"{hashes[keep.path]:016x}"
dups.append((drop.path, keep.path, ph_hex, dist, orb_good, orb_sim))
return dups
def main():
    root = os.path.abspath(root_dir)
    if not os.path.isdir(root):
        print(f"❌ Directory does not exist: {root}")
        return
    t0 = time.time()
    paths = iter_images(root, recursive)
    if not paths:
        print("⚠️ No image files found.")
        return
    duplicates_dir = os.path.join(root, move_to)
    report_path = os.path.join(root, report_name)
    # Use local names here: re-assigning the globals would raise UnboundLocalError
    run_mode = mode.strip().lower()
    orb_enabled = ("orb" in run_mode) and use_orb
    print("=================================================")
    print("📁 Root directory:", root)
    print("🧾 Image count:", len(paths))
    print(f"🧠 Mode: {run_mode} | pHash threshold: {phash_th} | keep policy: {prefer_keep}")
    print(f"🧷 Action: {action} | dry-run: {dry_run}")
    if orb_enabled:
        print("🧠 ORB image recognition: enabled")
        print(f"   orb_nfeatures={orb_nfeatures}, ratio={orb_ratio_test}, min_good={orb_min_good_matches}, sim_th={orb_similarity_th}")
    else:
        print("🧠 ORB image recognition: disabled")
    print("=================================================")
report_rows: list[dict] = []
processed = 0
    # 1) MD5
    md5_dups: list[tuple[str, str, str]] = []
    remain_after_md5 = paths
    if "md5" in run_mode:
        md5_dups = dedup_by_md5(paths, prefer_keep)
        dup_set = set(d[0] for d in md5_dups)
        remain_after_md5 = [p for p in paths if p not in dup_set]
    # 2) pHash (+ ORB)
    phash_dups: list[tuple[str, str, str, int, int, float]] = []
    if "phash" in run_mode:
        phash_dups = dedup_by_phash_with_orb(
            remain_after_md5,
            prefer_keep=prefer_keep,
            phash_th=phash_th,
            use_orb=orb_enabled
        )
def do_action(dup_path: str) -> str:
nonlocal processed
if dry_run or action == "none":
return ""
if not os.path.exists(dup_path):
return ""
if action == "delete":
try:
os.remove(dup_path)
processed += 1
return ""
except exception:
return ""
if action == "move":
try:
rel = os.path.relpath(dup_path, root)
dst = os.path.join(duplicates_dir, rel)
newp = safe_move(dup_path, dst)
processed += 1
return newp
except exception:
return ""
return ""
    # Record MD5 duplicates
for dup_path, keep_path, md5v in md5_dups:
if not os.path.exists(dup_path):
continue
keep_h, keep_w = get_image_hw(keep_path)
dup_h, dup_w = get_image_hw(dup_path)
keep_size = os.path.getsize(keep_path) if os.path.exists(keep_path) else 0
dup_size = os.path.getsize(dup_path) if os.path.exists(dup_path) else 0
new_path = do_action(dup_path)
report_rows.append({
"time": time.strftime("%y-%m-%d %h:%m:%s"),
"mode": "md5",
"keep_path": keep_path,
"dup_path": dup_path,
"reason": "md5 完全重复",
"value": md5v,
"phash_distance": "",
"orb_good_matches": "",
"orb_similarity": "",
"keep_w": keep_w, "keep_h": keep_h, "keep_size": keep_size,
"dup_w": dup_w, "dup_h": dup_h, "dup_size": dup_size,
"dup_new_path": new_path,
})
    # Record pHash (+ORB) duplicates
for dup_path, keep_path, ph_hex, ph_dist, orb_good, orb_sim in phash_dups:
if not os.path.exists(dup_path):
continue
keep_h, keep_w = get_image_hw(keep_path)
dup_h, dup_w = get_image_hw(dup_path)
keep_size = os.path.getsize(keep_path) if os.path.exists(keep_path) else 0
dup_size = os.path.getsize(dup_path) if os.path.exists(dup_path) else 0
new_path = do_action(dup_path)
reason = "phash 近似重复"
if use_orb:
reason = "phash + orb 图像识别确认"
report_rows.append({
"time": time.strftime("%y-%m-%d %h:%m:%s"),
"mode": "phash+orb" if use_orb else "phash",
"keep_path": keep_path,
"dup_path": dup_path,
"reason": reason,
"value": ph_hex,
"phash_distance": ph_dist,
"orb_good_matches": orb_good if use_orb else "",
"orb_similarity": f"{orb_sim:.4f}" if use_orb else "",
"keep_w": keep_w, "keep_h": keep_h, "keep_size": keep_size,
"dup_w": dup_w, "dup_h": dup_h, "dup_size": dup_size,
"dup_new_path": new_path,
})
write_report_csv(report_path, report_rows)
dt = time.time() - t0
print("")
print("✅ 完成")
print(f"📄 报告:{report_path}")
if action == "move":
print(f"📦 重复文件目录:{duplicates_dir}(保持原目录结构)")
print(f"🧮 识别重复条目:{len(report_rows)}")
if dry_run or action == "none":
print("🧪 dry-run/none:未移动/未删除任何文件")
else:
print(f"🧹 实际处理文件数(移动/删除):{processed}")
print(f"⏱ 用时:{dt:.2f} 秒")
if __name__ == "__main__":
    main()