从长视频中批量剪辑出精彩片段。传统的做法是打开笨重的剪辑软件手动操作,效率低下。作为一个开发者,我们自然会想:能不能写个脚本来自动化这个过程?
当然可以。但如果想让这个工具更易用,一个图形用户界面是必不可少的。而这恰恰引入了 gui 编程中最经典的挑战:如何在执行耗时任务(如视频处理)的同时,保持界面的流畅响应?
这篇文章将分享我写的一个app.py单文件小工具,它能根据字幕文件批量剪辑视频。并聊聊如何使用 pyside6框架,通过 qthreadpool 和信号/槽机制,构建一个高性能、不卡顿的桌面应用。

ui线程的“生命不可承受之重”
任何 gui 框架,无论是前端的浏览器、android 的 view 系统还是 qt,都有一个主线程,也叫 ui 线程。它负责处理用户输入、绘制界面、响应事件。如果在这个线程里执行一个耗时任务,比如调用 ffmpeg 命令来剪辑一个 10 秒的视频,那么在这 10 秒内,整个应用界面将完全冻结——无法点击按钮,无法拖动窗口,在用户看来就是“程序卡死了”。
这就是我们首先要解决的问题。解决方案很明确:将所有耗时操作都放到后台工作线程中去执行。
架构设计:qthreadpool与qrunnable
在 qt 中,处理并发任务的首选方式之一就是 qthreadpool 和 qrunnable。
qthreadpool: 一个托管的线程池,负责管理和复用工作线程,避免了手动创建和销毁线程的开销。qrunnable: 一个轻量级的工作任务抽象类。我们只需要继承它并实现run()方法,就可以将具体的业务逻辑(如调用 ffmpeg)封装起来。
在我的代码中,定义了两个这样的任务类:
loadsubtitlestask: 负责读取和解析字幕文件。虽然这通常很快,但对于非常大的字幕文件,也可能造成瞬间卡顿,因此也将其放入后台。cliptask: 核心的剪辑任务,封装了对subprocess.run()的调用来执行ffmpeg命令。
class cliptask(qrunnable):
def __init__(self, video_path, sub, line_num, ...):
super().__init__()
# ... 初始化
def run(self):
# ... 构建 ffmpeg 命令
cmd = ["ffmpeg", "-ss", start_time, "-i", video_path, ...]
try:
# 这是耗时操作
subprocess.run(cmd, check=true, capture_output=true)
# 任务成功了
except exception as e:
# 任务失败了
pass
现在问题来了:任务在后台线程执行,但执行的结果(成功、失败、进度)需要更新到主线程的 ui 控件上。直接在工作线程中操作 ui 控件是绝对禁止的,这会导致线程不安全,轻则界面错乱,重则程序崩溃。
这就是 qt 的精髓——信号与槽(signal/slot)机制——登场的时候了。
用信号与槽实现线程安全通信
信号与槽是 qt 框架的核心特性,它是一种听起来有些复杂但实际还算简单的观察者模式实现,完美地解决了跨线程通信的难题。
- signal(信号): 当某个特定事件发生时,一个对象可以“发射(emit)”一个信号。信号可以携带参数。
- slot(槽): 一个可以接收并处理信号的函数或方法。
当一个对象的信号连接(connect)到另一个对象的槽时,一旦信号被发射,与之关联的槽函数就会被自动调用。最关键的是,如果信号是从工作线程发射,而接收槽位于主线程的对象上,qt 会自动、安全地将这个调用安排到主线程的事件循环中执行。
在我的代码中,创建了一个专门的 signals 类来统一定义所有需要用到的信号:
# 必须继承自 qobject 才能定义信号
class signals(qobject):
# 进度更新信号,携带一个字符串参数
progress = signal(str)
# 所有任务完成的信号,不带参数
finished = signal()
# 字幕加载完成的信号,携带解析后的数据和文件名
subtitles_loaded = signal(list, str)
# 加载出错的信号
load_error = signal(str)
工作流程:
初始化与连接:在主窗口 mainwindow 的 __init__ 方法中,实例化 signals 类,并将信号连接到对应的槽函数。
class mainwindow(qwidget):
def __init__(self):
# ...
self.signals = signals()
# 将 progress 信号连接到 update_progress 槽函数
self.signals.progress.connect(self.update_progress)
self.signals.finished.connect(self.clipping_finished)
# ...
任务分发与信号发射:当用户点击“开始剪辑”按钮时,start_clipping 方法会为每个选中的字幕行创建一个 cliptask 实例,并将我们的 signals 对象传递给它。
# 在 start_clipping 中
for line_num in self.selected_lines:
sub = self.subtitles[line_num - 1]
# 将 signals 对象传入任务
task = cliptask(..., self.signals)
# 提交到线程池执行
self.thread_pool.start(task)
后台发射信号:cliptask 在其 run 方法中执行完 ffmpeg 命令后,根据结果发射不同的信号。
# 在 cliptask.run 中
class cliptask(qrunnable):
def run(self):
try:
subprocess.run(...)
# 任务成功,从工作线程发射 progress 信号
self.signals.progress.emit(f"completed clip {self.line_num}")
except subprocess.calledprocesserror as e:
# 任务失败,同样发射 progress 信号,但内容是错误信息
self.signals.progress.emit(f"failed clip {self.line_num}: {e}")
主线程安全接收并更新 ui:由于我们在第一步中建立了连接,主线程的 update_progress 方法会被自动调用。我们可以在这个方法里安全地更新 qlabel 等 ui 控件。
# 在 mainwindow 中定义的槽函数
def update_progress(self, message: str):
if "completed" in message:
self.completed_clips += 1
elif "failed" in message:
self.failed_clips.append(message)
self.progress_label.settext(
f"共: {self.total_clips}, 完成: {self.completed_clips}, 失败: {len(self.failed_clips)}"
)
# 检查是否所有任务都结束了
if self.completed_clips + len(self.failed_clips) >= self.total_clips:
self.signals.finished.emit()
通过这个机制,我们成功地将耗时的业务逻辑与 ui 更新解耦,实现了应用的流畅响应。即使用户一次性剪辑上百个片段,界面也依然能自如操作。
用uv实现零依赖运行
对于 python 开发者来说,环境和依赖管理常常是个头疼的问题。为了让这个工具对技术小白也足够友好,我采用了最近大火的 uv 工具。
通过在脚本文件头部添加特定的注释,uv 能够读取这些元数据,自动创建一个虚拟环境并安装所需的依赖,然后执行脚本。
# /// script # requires-python = ">=3.10,<=3.12" # dependencies = [ # "pyside6", # "pysubs2" # ] # ///
这意味着用户只需要下载 uv.exe 和 app.py,无需手动 pip install 任何东西,只需在命令行运行 uv run app.py 即可启动程序。这极大地降低了使用门槛,也为代码分发提供了一种轻量级的解决方案。
完整代码
下面是完整的项目代码,涵盖了从 ui 布局、事件处理到并发编程和线程安全通信的各个方面。
# /// script
# requires-python = ">=3.10,<=3.12"
# dependencies = [
# "pyside6",
# "pysubs2"
# ]
#
# [[tool.uv.index]]
# url = "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"
#
# ///
import sys
import os,shutil,platform
from pathlib import path
import subprocess
from pyside6.qtwidgets import (
qapplication, qwidget, qvboxlayout, qhboxlayout, qpushbutton,
qlabel, qfiledialog, qlistwidget, qlistwidgetitem, qcheckbox,
qcombobox
)
from pyside6.qtcore import qt, qthreadpool, qrunnable, signal, qobject, qurl, slot,qsize
from pyside6.qtgui import qdesktopservices
from datetime import timedelta
import pysubs2
# 全局输出文件夹
root_dir=path(os.getcwd()).as_posix()
output_folder = f"{root_dir}/output"
class signals(qobject):
progress = signal(str)
finished = signal()
subtitles_loaded = signal(list, str) # list of (i, text), subtitle_name
load_error = signal(str)
class cliptask(qrunnable):
def __init__(self, video_path, sub, line_num, subtitle_name, signals, mode):
super().__init__()
self.video_path = video_path
self.sub = sub
self.line_num = line_num
self.subtitle_name = subtitle_name
self.signals = signals
self.mode = mode
def run(self):
try:
start_time = self.sub.start / 1000.0
duration = (self.sub.end - self.sub.start) / 1000.0
output_dir = f'{output_folder}/{self.subtitle_name}'
os.makedirs(output_dir, exist_ok=true)
if self.mode == 0: # 默认
output_path = os.path.join(output_dir, f"{self.line_num}.mp4")
cmd = [
"ffmpeg","-y", "-ss", str(start_time), "-t", str(duration),
"-i", self.video_path, "-c:v", "copy", "-c:a", "copy",
output_path
]
subprocess.run(cmd, check=true, capture_output=true)
elif self.mode == 1: # 仅视频
output_path = os.path.join(output_dir, f"{self.line_num}.mp4")
cmd = [
"ffmpeg","-y", "-ss", str(start_time), "-t", str(duration),
"-i", self.video_path, "-c:v", "copy", "-an",
output_path
]
subprocess.run(cmd, check=true, capture_output=true)
elif self.mode == 2: # 仅音频
output_path = os.path.join(output_dir, f"{self.line_num}.wav")
cmd = [
"ffmpeg", "-y","-ss", str(start_time), "-t", str(duration),
"-i", self.video_path, "-vn", "-c:a", "pcm_s16le",
output_path
]
subprocess.run(cmd, check=true, capture_output=true)
elif self.mode == 3: # 分离
# 无声视频
video_path_out = os.path.join(output_dir, f"{self.line_num}.mp4")
cmd_video = [
"ffmpeg","-y", "-ss", str(start_time), "-t", str(duration),
"-i", self.video_path, "-c:v", "copy", "-an",
video_path_out
]
subprocess.run(cmd_video, check=true, capture_output=true)
# 音频
audio_path_out = os.path.join(output_dir, f"{self.line_num}.wav")
cmd_audio = [
"ffmpeg","-y", "-ss", str(start_time), "-t", str(duration),
"-i", self.video_path, "-vn", "-c:a", "pcm_s16le",
audio_path_out
]
subprocess.run(cmd_audio, check=true, capture_output=true)
# 注意:completed 和 failed 不可修改,ui线程据此判断成功与失败,丑陋但简单
self.signals.progress.emit(f"completed clip {self.line_num}")
except subprocess.calledprocesserror as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
self.signals.progress.emit(f"failed clip {self.line_num}: {error_msg}")
except exception as e:
self.signals.progress.emit(f"failed clip {self.line_num}: {str(e)}")
class loadsubtitlestask(qrunnable):
def __init__(self, subtitle_path, signals):
super().__init__()
self.subtitle_path = subtitle_path
self.signals = signals
def _format_time(self,ms):
hours = ms // (1000 * 3600)
minutes = (ms // (1000 * 60)) % 60
seconds = (ms // 1000) % 60
milliseconds = ms % 1000
return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"
def run(self):
try:
subtitles = pysubs2.load(self.subtitle_path)
subtitle_name = os.path.splitext(os.path.basename(self.subtitle_path))[0]
data = []
for i, sub in enumerate(subtitles, 1):
start = self._format_time(sub.start)
end = self._format_time(sub.end)
duration = (sub.end - sub.start) / 1000.0
text = f"{start}->{end}({duration:.2f}s) {sub.text}"
data.append((i, text))
self.signals.subtitles_loaded.emit(data, subtitle_name)
except exception as e:
self.signals.load_error.emit(str(e))
class mainwindow(qwidget):
def __init__(self):
super().__init__()
self.setwindowtitle("按字幕剪辑视频")
self.resize(1000, 600)
self.setwindowflags(self.windowflags() | qt.windowmaximizebuttonhint | qt.windowminimizebuttonhint)
self.video_path = none
self.subtitle_path = none
self.subtitles = none
self.subtitle_name = none
self.selected_lines = []
self.thread_pool = qthreadpool()
self.is_clipping = false
self.signals = signals()
self.signals.progress.connect(self.update_progress)
self.signals.finished.connect(self.clipping_finished)
self.signals.subtitles_loaded.connect(self.on_subtitles_loaded)
self.signals.load_error.connect(self.on_load_error)
self.total_clips = 0
self.completed_clips = 0
self.failed_clips = []
self.open_button = none
self.active_tasks = 0
self.setup_ui()
def setup_ui(self):
layout = qvboxlayout()
# 文件选择
file_layout = qhboxlayout()
self.video_label = qlabel("未选视频")
video_btn = qpushbutton("选择视频")
video_btn.clicked.connect(self.select_video)
video_btn.setminimumsize(qsize(200, 35))
video_btn.setcursor(qt.pointinghandcursor)
file_layout.addwidget(video_btn)
file_layout.addwidget(self.video_label)
self.subtitle_label = qlabel("未选字幕")
subtitle_btn = qpushbutton("选择字幕")
subtitle_btn.setminimumsize(qsize(200, 35))
subtitle_btn.clicked.connect(self.select_subtitle)
subtitle_btn.setcursor(qt.pointinghandcursor)
file_layout.addwidget(subtitle_btn)
file_layout.addwidget(self.subtitle_label)
# 输出模式下拉列表
self.output_mode = qcombobox()
self.output_mode.additems([
"默认(mp4片段有声)",
"仅保留视频(mp4片段无声)",
"仅保留音频(wav音频片段)",
"声画分离(mp4片段无声和wav音频片段)"
])
file_layout.addwidget(self.output_mode)
file_layout.addstretch()
layout.addlayout(file_layout)
# 批量选择按钮
batch_layout = qhboxlayout()
select_all_btn = qpushbutton("全选")
select_all_btn.clicked.connect(self.select_all)
select_all_btn.setcursor(qt.pointinghandcursor)
batch_layout.addwidget(select_all_btn)
deselect_all_btn = qpushbutton("全不选")
deselect_all_btn.clicked.connect(self.deselect_all)
deselect_all_btn.setcursor(qt.pointinghandcursor)
batch_layout.addwidget(deselect_all_btn)
invert_btn = qpushbutton("反选")
invert_btn.clicked.connect(self.invert_selection)
invert_btn.setcursor(qt.pointinghandcursor)
batch_layout.addwidget(invert_btn)
batch_layout.addstretch()
layout.addlayout(batch_layout)
# 字幕列表
self.subtitle_list = qlistwidget()
layout.addwidget(self.subtitle_list)
# 底部按钮
btn_layout = qhboxlayout()
self.clip_btn = qpushbutton("开始剪辑")
self.clip_btn.setcursor(qt.pointinghandcursor)
self.clip_btn.setminimumsize(qsize(200, 35))
self.clip_btn.clicked.connect(self.start_clipping)
btn_layout.addwidget(self.clip_btn)
self.clear_btn = qpushbutton("清除已选")
self.clear_btn.setcursor(qt.pointinghandcursor)
self.clear_btn.setmaximumwidth(150)
self.clear_btn.clicked.connect(self.clear_all)
btn_layout.addwidget(self.clear_btn)
self.open_button = qpushbutton("打开输出目录")
self.open_button.setmaximumwidth(200)
self.open_button.setcursor(qt.pointinghandcursor)
self.open_button.clicked.connect(self.open_output_folder)
self.open_button.hide()
btn_layout.addwidget(self.open_button)
layout.addlayout(btn_layout)
self.progress_label = qlabel("")
self.progress_label.setstylesheet('color:#2196f3;font-size:14px')
layout.addwidget(self.progress_label)
self.setlayout(layout)
def select_video(self):
path, _ = qfiledialog.getopenfilename(self, "选择视频", "", "video files (*.mp4 *.avi *.mkv)")
if path:
self.video_path = path
self.video_label.settext(os.path.basename(path))
def select_subtitle(self):
global output_folder
path, _ = qfiledialog.getopenfilename(self, "选择对应字幕", "", "subtitle files (*.srt *.ass *.vtt)")
if path:
output_folder=path(path).parent.as_posix()
self.subtitle_path = path
self.subtitle_label.settext(os.path.basename(path))
self.subtitle_list.clear()
self.progress_label.settext("正在渲染字幕...")
task = loadsubtitlestask(self.subtitle_path, self.signals)
self.thread_pool.start(task)
@slot(list, str)
def on_subtitles_loaded(self, data, subtitle_name):
self.subtitles = pysubs2.load(self.subtitle_path) # reload if needed
self.subtitle_name = subtitle_name
self.subtitle_list.clear()
for i, text in data:
item = qlistwidgetitem()
check = qcheckbox(f"{i}行: {text}")
self.subtitle_list.additem(item)
self.subtitle_list.setitemwidget(item, check)
self.progress_label.settext(f"字幕渲染完成.剪辑后输出到:{output_folder}/{subtitle_name}")
@slot(str)
def on_load_error(self, error):
self.progress_label.settext(f"字幕渲染出错: {error}")
def select_all(self):
for i in range(self.subtitle_list.count()):
item = self.subtitle_list.item(i)
check = self.subtitle_list.itemwidget(item)
check.setchecked(true)
def deselect_all(self):
for i in range(self.subtitle_list.count()):
item = self.subtitle_list.item(i)
check = self.subtitle_list.itemwidget(item)
check.setchecked(false)
def invert_selection(self):
for i in range(self.subtitle_list.count()):
item = self.subtitle_list.item(i)
check = self.subtitle_list.itemwidget(item)
check.setchecked(not check.ischecked())
def clear_all(self):
self.video_path = none
self.subtitle_path = none
self.subtitles = none
self.subtitle_name = none
self.selected_lines = []
self.video_label.settext("未选视频")
self.subtitle_label.settext("未选字幕")
self.subtitle_list.clear()
self.progress_label.settext("")
self.clip_btn.settext("开始剪辑")
self.is_clipping = false
self.total_clips = 0
self.completed_clips = 0
self.failed_clips = []
self.output_mode.setcurrentindex(0)
self.active_tasks = 0
self.open_button.hide()
def start_clipping(self):
if self.is_clipping:
self.stop_clipping()
return
if not self.video_path or not self.subtitle_name:
self.progress_label.settext("请选择待剪辑视频及对应字幕文件.")
return
self.selected_lines = []
for i in range(self.subtitle_list.count()):
item = self.subtitle_list.item(i)
check = self.subtitle_list.itemwidget(item)
if check.ischecked():
self.selected_lines.append(i + 1) # 1-based
if not self.selected_lines:
self.progress_label.settext("至少请选中一行字幕.")
return
mode = self.output_mode.currentindex()
self.is_clipping = true
self.clip_btn.settext("立即停止")
self.total_clips = len(self.selected_lines)
self.completed_clips = 0
self.failed_clips = []
self.open_button.show()
self.active_tasks = self.total_clips
self.progress_label.settext(f"共: {self.total_clips}, 完成: 0, 失败: 0")
for line_num in self.selected_lines:
sub = self.subtitles[line_num - 1]
task = cliptask(self.video_path, sub, line_num, self.subtitle_name, self.signals, mode)
self.thread_pool.start(task)
def stop_clipping(self):
self.thread_pool.clear()
self.is_clipping = false
self.clip_btn.settext("开始剪辑")
self.progress_label.settext("立即停止.")
self.active_tasks = 0
def update_progress(self, message):
if "completed" in message:
self.completed_clips += 1
elif "failed" in message:
self.failed_clips.append(message)
self.active_tasks -= 1
self.progress_label.settext(
f"共: {self.total_clips}, 完成: {self.completed_clips}, "
f"失败: {len(self.failed_clips)}\n" + "\n".join(self.failed_clips)
)
if self.active_tasks <= 0 and self.is_clipping:
self.signals.finished.emit()
def clipping_finished(self):
self.is_clipping = false
self.clip_btn.settext("开始剪辑")
self.active_tasks = 0
def open_output_folder(self):
output_dir = f'{output_folder}/{self.subtitle_name}'
qdesktopservices.openurl(qurl.fromlocalfile(output_dir))
if __name__ == "__main__":
app = qapplication(sys.argv)
window = mainwindow()
window.show()
sys.exit(app.exec())
希望这个实例能为你下次构建桌面应用时,在处理并发和线程通信方面提供一些有用的参考。
到此这篇关于python+pyside6构建一个响应式视频剪辑工具的文章就介绍到这了,更多相关python视频剪辑内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论