pyqt5 的 pdf 批量水印工具:
本文围绕 pdf添加水印.py,完整讲解如何将单文件水印脚本重构为可交互的 pyqt5 桌面应用,支持拖拽导入、实时预览、子线程处理与输出管理,并给出关键实现细节与结构说明。

一、功能概览
这款工具面向“批量给 pdf 添加文字水印”的场景,核心功能如下:
- 支持拖拽或选择多个 pdf 文件
- 参数可配置:字体、字号、行间距、旋转角度、透明度
- 实时预览旋转与透明度效果
- 使用子线程处理,避免界面卡顿
- 输出目录统一为
./output/并自动重命名 - 常见错误(空列表、字体缺失、权限不足)弹窗提示
二、整体架构与模块拆分
应用主要分为三层:
ui 层(watermarkapp)
负责界面布局、参数采集、按钮交互、预览触发、状态展示等。
处理层(watermarkworker / previewworker)
使用 qthread 执行批量水印与预览生成,避免阻塞主线程。
工具层(字体解析 / 输出重名 / 水印生成)
封装公共逻辑,确保主流程清晰且复用性强。
三、界面布局设计
界面布局遵循“左列表、右参数、底部操作区”的结构。
1. 左侧:pdf 文件列表 + 添加/删除
- 列表控件支持拖拽(继承
qlistwidget重写拖拽事件) - 添加按钮打开文件对话框
- 删除按钮移除选中项
2. 右侧:参数面板(按顺序分组)
- 水印文字:
qlineedit - 字体选择:
qcombobox(系统字体列表) - 行间距:
qspinbox(0–200%) - 字号:
qspinbox(8–200pt) - 角度:
qslider + qspinbox(-90–90°) - 透明度:
qslider + qspinbox(0–100%) - 预览:
qlabel显示动态渲染图
3. 底部:操作按钮 + 进度/状态
- “开始处理”
- “预览效果”
- “打开输出目录”
- 进度条 + 状态提示文本
四、拖拽导入实现
自定义 pdflistwidget,重写拖拽相关事件:
dragenterevent/dragmoveevent:校验 urldropevent:过滤.pdf文件并添加到列表
这样用户可以直接把 pdf 拖进列表,极大提升效率。
五、实时预览机制
预览的核心思想是:
- 使用当前 ui 参数生成一个水印 pdf
- 将水印合成到原 pdf 的第一页
- 保存为临时预览文件并自动打开
实现上通过 previewworker(qthread) 执行生成逻辑,以避免 ui 卡顿。
预览按钮会优先使用“选中项”,若未选中则默认使用第一个 pdf。
六、子线程批量处理
处理逻辑放在 watermarkworker(qthread) 中完成:
- 读取每个 pdf
- 对每一页合成水印
- 保存到
./output目录 - 通过信号更新进度条与状态文本
七、水印生成逻辑
水印生成使用 reportlab,在内存中创建一页 pdf:
- 读取当前页面大小(确保适配不同尺寸 pdf)
- 在中心点平移 + 旋转
- 按固定网格绘制水印文本
- 输出为
bytesio内存对象
关键点:
- 行间距与透明度全部来自 ui 参数
- 字体通过系统字体注册,缺失时抛错
八、字体选择与注册
字体下拉框基于系统字体列表:
qfontdatabase().families()获取可用字体- 默认选“宋体”
字体注册则通过 windows 注册表查询真实字体文件路径,然后用 reportlab 注册:
- 在注册表中查找字体名称
- 拼接字体文件路径
pdfmetrics.registerfont(ttfont(...))
这样既能保证预览字体一致,也能保证 pdf 输出字体可用。
九、输出策略与重名机制
输出统一保存到:
./output/原文件名_watermarked.pdf
如果文件已存在,会自动追加序号:
xxx_watermarked_1.pdf xxx_watermarked_2.pdf
这一逻辑由 unique_output_path 负责,避免覆盖历史结果。
十、错误处理与弹窗提示
以下情况会触发弹窗警告:
- pdf 列表为空
- 字体文件缺失
- 没有读写权限
- 缺少 pypdf2 依赖
这样可以避免用户在无感知情况下操作失败。
十一、运行方式
直接运行脚本即可启动 gui:
python d:\测试\公众号水文\11-pdf添加水印\pdf添加水印.py
如需打包为 exe,可使用 pyinstaller(另行说明)。
十二、总结与扩展建议
当前版本已经具备完整的批量水印能力,并具备以下优势:
- 参数全可配置
- ui 友好
- 处理不卡顿
- 输出安全可靠
后续可考虑的扩展方向:
- 增加颜色选择
- 自定义横向/纵向密度
- 多行预览与缩略图预览
- 输出目录可自定义
如需进一步增强功能或增加更多参数,我可以继续扩展优化。
完整代码
import io
import os
import sys
import winreg
from pyqt5.qtcore import qt, qthread, pyqtsignal
from pyqt5.qtgui import qfont, qfontdatabase, qpainter, qpixmap
from pyqt5.qtwidgets import (
qapplication,
qfiledialog,
qgroupbox,
qhboxlayout,
qlabel,
qlineedit,
qlistwidget,
qlistwidgetitem,
qmessagebox,
qpushbutton,
qprogressbar,
qslider,
qspinbox,
qdoublespinbox,
qvboxlayout,
qwidget,
qcombobox,
qabstractitemview,
qsizepolicy,
)
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import ttfont
try:
from pypdf2 import pdfreader, pdfwriter
except exception:
pdfreader = none
pdfwriter = none
def find_font_file(font_family):
fonts_dir = os.path.join(os.environ.get("windir", "c:\\windows"), "fonts")
try:
with winreg.openkey(
winreg.hkey_local_machine,
r"software\microsoft\windows nt\currentversion\fonts",
) as key:
index = 0
while true:
try:
name, data, _ = winreg.enumvalue(key, index)
index += 1
except oserror:
break
if font_family.lower() in name.lower():
if os.path.isabs(data):
return data
return os.path.join(fonts_dir, data)
except oserror:
pass
return none
def unique_output_path(output_dir, base_name):
output_path = os.path.join(output_dir, base_name)
if not os.path.exists(output_path):
return output_path
name, ext = os.path.splitext(base_name)
counter = 1
while true:
candidate = os.path.join(output_dir, f"{name}_{counter}{ext}")
if not os.path.exists(candidate):
return candidate
counter += 1
def get_reportlab_font(font_family, font_cache):
if font_family in font_cache:
return font_cache[font_family]
font_path = find_font_file(font_family)
if not font_path or not os.path.exists(font_path):
raise filenotfounderror(f"字体文件缺失:{font_family}")
font_key = f"font_{len(font_cache)}"
pdfmetrics.registerfont(ttfont(font_key, font_path))
font_cache[font_family] = font_key
return font_key
def create_watermark_pdf_buffer(width, height, options, font_cache):
buffer = io.bytesio()
watermark_canvas = canvas.canvas(buffer, pagesize=(width, height))
font_name = get_reportlab_font(options["font_family"], font_cache)
watermark_canvas.setfont(font_name, options["font_size"])
watermark_canvas.setfillalpha(options["opacity"] / 100)
watermark_canvas.translate(width / 2, height / 2)
watermark_canvas.rotate(options["angle"])
x_step = width / 5
y_step = height / 10 * (options["line_spacing"] / 100)
for i in range(5):
for j in range(10):
a = (i - 2) * x_step
b = (j - 4) * y_step
watermark_canvas.drawstring(a, b, options["text"])
watermark_canvas.save()
buffer.seek(0)
return buffer
class pdflistwidget(qlistwidget):
def __init__(self, parent=none):
super().__init__(parent)
self.setacceptdrops(true)
self.setselectionmode(qabstractitemview.extendedselection)
def dragenterevent(self, event):
if event.mimedata().hasurls():
event.acceptproposedaction()
else:
event.ignore()
def dragmoveevent(self, event):
if event.mimedata().hasurls():
event.acceptproposedaction()
else:
event.ignore()
def dropevent(self, event):
if not event.mimedata().hasurls():
return
for url in event.mimedata().urls():
path = url.tolocalfile()
if path.lower().endswith(".pdf"):
self.add_pdf_item(path)
def add_pdf_item(self, path):
for i in range(self.count()):
if self.item(i).text() == path:
return
item = qlistwidgetitem(path)
self.additem(item)
def selected_paths(self):
return [item.text() for item in self.selecteditems()]
def all_paths(self):
return [self.item(i).text() for i in range(self.count())]
class watermarkworker(qthread):
progress_changed = pyqtsignal(int)
status_changed = pyqtsignal(str)
error_occurred = pyqtsignal(str)
finished_success = pyqtsignal(str)
def __init__(self, files, options, parent=none):
super().__init__(parent)
self.files = files
self.options = options
self._font_cache = {}
def run(self):
if pdfreader is none or pdfwriter is none:
self.error_occurred.emit("缺少 pypdf2 库,请先安装 pypdf2。")
return
try:
output_dir = os.path.join(os.getcwd(), "output")
os.makedirs(output_dir, exist_ok=true)
except exception as exc:
self.error_occurred.emit(f"无法创建输出目录:{exc}")
return
total = len(self.files)
for index, file_path in enumerate(self.files, start=1):
try:
self.status_changed.emit(f"处理中:{os.path.basename(file_path)}")
output_name = f"{os.path.splitext(os.path.basename(file_path))[0]}_watermarked.pdf"
output_path = unique_output_path(output_dir, output_name)
reader = pdfreader(file_path)
writer = pdfwriter()
for page in reader.pages:
width = float(page.mediabox.width)
height = float(page.mediabox.height)
watermark_pdf = create_watermark_pdf_buffer(
width,
height,
self.options,
self._font_cache,
)
watermark_reader = pdfreader(watermark_pdf)
watermark_page = watermark_reader.pages[0]
page.merge_page(watermark_page)
writer.add_page(page)
with open(output_path, "wb") as output_file:
writer.write(output_file)
progress = int(index / total * 100)
self.progress_changed.emit(progress)
except permissionerror:
self.error_occurred.emit("文件权限不足,无法读写 pdf。")
return
except exception as exc:
self.error_occurred.emit(f"处理失败:{exc}")
return
self.progress_changed.emit(100)
self.status_changed.emit("处理完成")
self.finished_success.emit(output_dir)
class previewworker(qthread):
status_changed = pyqtsignal(str)
error_occurred = pyqtsignal(str)
preview_ready = pyqtsignal(str)
def __init__(self, file_path, options, parent=none):
super().__init__(parent)
self.file_path = file_path
self.options = options
self._font_cache = {}
def run(self):
if pdfreader is none or pdfwriter is none:
self.error_occurred.emit("缺少 pypdf2 库,请先安装 pypdf2。")
return
try:
output_dir = os.path.join(os.getcwd(), "output")
os.makedirs(output_dir, exist_ok=true)
except exception as exc:
self.error_occurred.emit(f"无法创建输出目录:{exc}")
return
try:
self.status_changed.emit("生成预览中")
reader = pdfreader(self.file_path)
if not reader.pages:
self.error_occurred.emit("pdf 内容为空,无法预览。")
return
writer = pdfwriter()
page = reader.pages[0]
width = float(page.mediabox.width)
height = float(page.mediabox.height)
watermark_pdf = create_watermark_pdf_buffer(
width,
height,
self.options,
self._font_cache,
)
watermark_reader = pdfreader(watermark_pdf)
watermark_page = watermark_reader.pages[0]
page.merge_page(watermark_page)
writer.add_page(page)
base_name = os.path.splitext(os.path.basename(self.file_path))[0]
output_name = f"{base_name}_preview.pdf"
output_path = unique_output_path(output_dir, output_name)
with open(output_path, "wb") as output_file:
writer.write(output_file)
self.preview_ready.emit(output_path)
except permissionerror:
self.error_occurred.emit("文件权限不足,无法读写 pdf。")
except exception as exc:
self.error_occurred.emit(f"预览失败:{exc}")
class watermarkapp(qwidget):
def __init__(self):
super().__init__()
self.worker = none
self.preview_worker = none
self.init_ui()
def init_ui(self):
self.setwindowtitle("pdf 水印工具")
self.resize(1100, 650)
main_layout = qvboxlayout(self)
content_layout = qhboxlayout()
main_layout.addlayout(content_layout)
left_layout = qvboxlayout()
content_layout.addlayout(left_layout, 3)
self.pdf_list = pdflistwidget()
self.pdf_list.settooltip("拖拽或通过按钮添加 pdf 文件")
left_layout.addwidget(self.pdf_list)
list_button_layout = qhboxlayout()
left_layout.addlayout(list_button_layout)
self.add_button = qpushbutton("添加 pdf")
self.add_button.settooltip("选择并添加 pdf 文件")
self.add_button.clicked.connect(self.add_files)
list_button_layout.addwidget(self.add_button)
self.remove_button = qpushbutton("删除选中")
self.remove_button.settooltip("删除列表中选中的 pdf 文件")
self.remove_button.clicked.connect(self.remove_selected)
list_button_layout.addwidget(self.remove_button)
right_layout = qvboxlayout()
content_layout.addlayout(right_layout, 4)
self.text_group = qgroupbox("水印文字")
self.text_group.settooltip("设置水印显示的文字内容")
text_layout = qvboxlayout()
self.text_input = qlineedit("python-小庄办公")
self.text_input.settooltip("输入水印文字内容")
self.text_input.textchanged.connect(self.update_preview)
text_layout.addwidget(self.text_input)
self.text_group.setlayout(text_layout)
right_layout.addwidget(self.text_group)
self.font_group = qgroupbox("字体选择器")
self.font_group.settooltip("选择系统已安装字体")
font_layout = qvboxlayout()
self.font_combo = qcombobox()
self.font_combo.settooltip("从系统字体列表中选择字体")
font_layout.addwidget(self.font_combo)
self.font_group.setlayout(font_layout)
right_layout.addwidget(self.font_group)
self.spacing_group = qgroupbox("水印行间距")
self.spacing_group.settooltip("调整水印纵向间距")
spacing_layout = qhboxlayout()
self.spacing_spin = qspinbox()
self.spacing_spin.settooltip("设置水印行间距百分比")
self.spacing_spin.setrange(0, 200)
self.spacing_spin.setsinglestep(1)
self.spacing_spin.setsuffix(" %")
self.spacing_spin.setvalue(100)
self.spacing_spin.valuechanged.connect(self.update_preview)
spacing_layout.addwidget(self.spacing_spin)
self.spacing_group.setlayout(spacing_layout)
right_layout.addwidget(self.spacing_group)
self.size_group = qgroupbox("水印字体大小")
self.size_group.settooltip("设置水印字体大小")
size_layout = qhboxlayout()
self.size_spin = qspinbox()
self.size_spin.settooltip("设置水印字体大小(pt)")
self.size_spin.setrange(8, 200)
self.size_spin.setsinglestep(1)
self.size_spin.setsuffix(" pt")
self.size_spin.setvalue(36)
self.size_spin.valuechanged.connect(self.update_preview)
size_layout.addwidget(self.size_spin)
self.size_group.setlayout(size_layout)
right_layout.addwidget(self.size_group)
self.angle_group = qgroupbox("旋转角度")
self.angle_group.settooltip("调整水印旋转角度")
angle_layout = qhboxlayout()
self.angle_slider = qslider(qt.horizontal)
self.angle_slider.settooltip("拖动调整旋转角度")
self.angle_slider.setrange(-90, 90)
self.angle_slider.setsinglestep(1)
self.angle_slider.setvalue(0)
self.angle_spin = qspinbox()
self.angle_spin.settooltip("输入旋转角度")
self.angle_spin.setrange(-90, 90)
self.angle_spin.setsinglestep(1)
self.angle_spin.setvalue(0)
angle_layout.addwidget(self.angle_slider)
angle_layout.addwidget(self.angle_spin)
self.angle_group.setlayout(angle_layout)
right_layout.addwidget(self.angle_group)
self.opacity_group = qgroupbox("透明度")
self.opacity_group.settooltip("调整水印透明度")
opacity_layout = qhboxlayout()
self.opacity_slider = qslider(qt.horizontal)
self.opacity_slider.settooltip("拖动调整透明度")
self.opacity_slider.setrange(0, 100)
self.opacity_slider.setsinglestep(1)
self.opacity_slider.setvalue(30)
self.opacity_spin = qspinbox()
self.opacity_spin.settooltip("输入透明度百分比")
self.opacity_spin.setrange(0, 100)
self.opacity_spin.setsinglestep(1)
self.opacity_spin.setvalue(30)
opacity_layout.addwidget(self.opacity_slider)
opacity_layout.addwidget(self.opacity_spin)
self.opacity_group.setlayout(opacity_layout)
right_layout.addwidget(self.opacity_group)
self.preview_group = qgroupbox("预览")
self.preview_group.settooltip("实时预览水印效果")
preview_layout = qvboxlayout()
self.preview_label = qlabel()
self.preview_label.settooltip("显示水印旋转与透明度预览")
self.preview_label.setminimumheight(140)
self.preview_label.setsizepolicy(qsizepolicy.expanding, qsizepolicy.expanding)
self.preview_label.setalignment(qt.aligncenter)
preview_layout.addwidget(self.preview_label)
self.preview_group.setlayout(preview_layout)
right_layout.addwidget(self.preview_group)
right_layout.addstretch()
bottom_layout = qhboxlayout()
main_layout.addlayout(bottom_layout)
self.start_button = qpushbutton("开始处理")
self.start_button.settooltip("开始给列表中的 pdf 添加水印")
self.start_button.clicked.connect(self.start_processing)
bottom_layout.addwidget(self.start_button)
self.preview_button = qpushbutton("预览效果")
self.preview_button.settooltip("生成当前参数下的水印预览文件")
self.preview_button.clicked.connect(self.start_preview)
bottom_layout.addwidget(self.preview_button)
self.open_output_button = qpushbutton("打开输出目录")
self.open_output_button.settooltip("打开输出文件夹")
self.open_output_button.clicked.connect(self.open_output_dir)
bottom_layout.addwidget(self.open_output_button)
self.progress_bar = qprogressbar()
self.progress_bar.settooltip("显示处理进度")
self.progress_bar.setvalue(0)
bottom_layout.addwidget(self.progress_bar, 3)
self.status_label = qlabel("就绪")
self.status_label.settooltip("显示当前处理状态")
bottom_layout.addwidget(self.status_label, 2)
self.load_fonts()
self.bind_signals()
self.update_preview()
def load_fonts(self):
db = qfontdatabase()
families = db.families()
self.font_combo.additems(families)
default_font = "宋体"
if default_font in families:
self.font_combo.setcurrenttext(default_font)
elif families:
self.font_combo.setcurrentindex(0)
self.font_combo.currenttextchanged.connect(self.update_preview)
def bind_signals(self):
self.angle_slider.valuechanged.connect(self.angle_spin.setvalue)
self.angle_spin.valuechanged.connect(self.angle_slider.setvalue)
self.angle_slider.valuechanged.connect(self.update_preview)
self.angle_spin.valuechanged.connect(self.update_preview)
self.opacity_slider.valuechanged.connect(self.opacity_spin.setvalue)
self.opacity_spin.valuechanged.connect(self.opacity_slider.setvalue)
self.opacity_slider.valuechanged.connect(self.update_preview)
self.opacity_spin.valuechanged.connect(self.update_preview)
def update_preview(self):
width = max(self.preview_label.width(), 300)
height = max(self.preview_label.height(), 140)
pixmap = qpixmap(width, height)
pixmap.fill(qt.white)
painter = qpainter(pixmap)
painter.setrenderhint(qpainter.antialiasing)
font = qfont(self.font_combo.currenttext(), self.size_spin.value())
painter.setfont(font)
painter.setopacity(self.opacity_spin.value() / 100)
painter.translate(width / 2, height / 2)
painter.rotate(self.angle_spin.value())
painter.drawtext(-width // 4, 0, self.text_input.text())
painter.end()
self.preview_label.setpixmap(pixmap)
def add_files(self):
files, _ = qfiledialog.getopenfilenames(
self,
"选择 pdf 文件",
"",
"pdf 文件 (*.pdf)",
)
for file_path in files:
self.pdf_list.add_pdf_item(file_path)
def remove_selected(self):
for item in self.pdf_list.selecteditems():
row = self.pdf_list.row(item)
self.pdf_list.takeitem(row)
def start_processing(self):
files = self.pdf_list.all_paths()
if not files:
qmessagebox.warning(self, "提示", "请先添加需要处理的 pdf 文件。")
return
if pdfreader is none or pdfwriter is none:
qmessagebox.warning(self, "提示", "未安装 pypdf2,无法处理 pdf。")
return
font_path = find_font_file(self.font_combo.currenttext())
if not font_path or not os.path.exists(font_path):
qmessagebox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
return
options = {
"text": self.text_input.text().strip() or " ",
"font_family": self.font_combo.currenttext(),
"line_spacing": self.spacing_spin.value(),
"font_size": self.size_spin.value(),
"angle": self.angle_spin.value(),
"opacity": self.opacity_spin.value(),
}
self.progress_bar.setvalue(0)
self.status_label.settext("准备处理")
self.start_button.setenabled(false)
self.preview_button.setenabled(false)
self.worker = watermarkworker(files, options)
self.worker.progress_changed.connect(self.progress_bar.setvalue)
self.worker.status_changed.connect(self.status_label.settext)
self.worker.error_occurred.connect(self.handle_error)
self.worker.finished_success.connect(self.handle_success)
self.worker.start()
def handle_error(self, message):
self.start_button.setenabled(true)
self.preview_button.setenabled(true)
self.status_label.settext("处理失败")
qmessagebox.warning(self, "错误", message)
def handle_success(self, output_dir):
self.start_button.setenabled(true)
self.preview_button.setenabled(true)
self.status_label.settext("处理完成")
qmessagebox.information(self, "完成", f"水印处理完成,输出目录:{output_dir}")
def start_preview(self):
files = self.pdf_list.selected_paths() or self.pdf_list.all_paths()
if not files:
qmessagebox.warning(self, "提示", "请先添加需要预览的 pdf 文件。")
return
if pdfreader is none or pdfwriter is none:
qmessagebox.warning(self, "提示", "未安装 pypdf2,无法预览。")
return
font_path = find_font_file(self.font_combo.currenttext())
if not font_path or not os.path.exists(font_path):
qmessagebox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
return
options = {
"text": self.text_input.text().strip() or " ",
"font_family": self.font_combo.currenttext(),
"line_spacing": self.spacing_spin.value(),
"font_size": self.size_spin.value(),
"angle": self.angle_spin.value(),
"opacity": self.opacity_spin.value(),
}
self.status_label.settext("生成预览")
self.preview_button.setenabled(false)
self.start_button.setenabled(false)
self.preview_worker = previewworker(files[0], options)
self.preview_worker.status_changed.connect(self.status_label.settext)
self.preview_worker.error_occurred.connect(self.handle_error)
self.preview_worker.preview_ready.connect(self.handle_preview_ready)
self.preview_worker.start()
def handle_preview_ready(self, preview_path):
self.start_button.setenabled(true)
self.preview_button.setenabled(true)
self.status_label.settext("预览完成")
try:
os.startfile(preview_path)
except exception as exc:
qmessagebox.warning(self, "提示", f"无法打开预览文件:{exc}")
def open_output_dir(self):
output_dir = os.path.join(os.getcwd(), "output")
if not os.path.exists(output_dir):
qmessagebox.warning(self, "提示", "输出目录不存在,请先处理文件。")
return
try:
os.startfile(output_dir)
except exception as exc:
qmessagebox.warning(self, "提示", f"无法打开输出目录:{exc}")
def main():
app = qapplication(sys.argv)
window = watermarkapp()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
以上就是python+pyqt5实现pdf批量水印工具的详细内容,更多关于python pdf批量水印的资料请关注代码网其它相关文章!
发表评论