一、背景
开发的qt工具需要给不同的部门间使用,工具版本迭代需要经历打包->压缩->上传到共享目录->下载解压,协作十分不爽。且pyqt5不像原生的qt有自己的更新框架,因此需要自己实现一套更新逻辑。
二、需要关注的核心问题
- qt应用需要运行起来后,才能有可交互的界面,即优先执行
app = qapplication(sys.argv) - 更新时,需要替换掉有修改的模块,但是在qt程序运行时,某些模块是被占用的,无法被替换删除,因此需要在qt程序停止后才能替换。
三、实现思路
- 另启动一个监控程序,专门负责更新。
- 通过编写执行.bat文件,负责qt程序停止运行后的更新、替换、重新启动等一系列操作。 很明显,第二种方法看起来更简洁,迅速。
四、具体实现
# main.py
import argparse
import os
import sys
from pyqt5.qtwidgets import qapplication, qmessagebox
from packaging import version
from main import mainform
from updates_scripts.updater import update_main
def check_is_need_update():
"""检查应用程序是否需要更新"""
try:
remote_path = r'\\10.10.10.10' # 我这里是共享文件服务器,根据需求自己替换
# 读取本地版本
with open('version.bin', 'r', encoding='utf-8') as f:
local_version = f.readline().strip()
# 遍历远程文件夹查找最新版本
max_version = version.parse('0')
update_path = ''
for root, dirs, files in os.walk(remote_path):
for file in files:
if file.startswith('debugtool_v') and file.endswith('.zip'):
remote_version = file[file.find('_v') + 2:file.rfind('.')]
try:
remote_ver = version.parse(remote_version)
if remote_ver > max_version:
max_version = remote_ver
update_path = os.path.join(root, file)
except version.invalidversion:
continue
# 比较版本
local_ver = version.parse(local_version)
if max_version > local_ver:
# 必须在主线程中显示qt对话框
reply = qmessagebox.question(
none,
"更新提示",
f"发现新版本 {max_version} (当前版本 {local_version}),是否更新?",
qmessagebox.yes | qmessagebox.no,
qmessagebox.yes
)
if reply == qmessagebox.yes:
print("用户选择更新")
root_path = os.path.dirname(os.path.abspath(sys.argv[0]))
return {
'update': true,
'root_dir': root_path,
'version': f"debugtool_v{str(max_version)}",
'remote_file': update_path
}
else:
print("用户取消更新")
return {'update': false}
except exception as e:
print(f"检查更新时出错: {str(e)}")
return {'update': false, 'error': str(e)}
if __name__ == "__main__":
# 初始化qt应用
app = qapplication(sys.argv)
parser = argparse.argumentparser()
parser.add_argument('--check', action='store_true', help='是否需要检查更新')
args = parser.parse_args()
# 检查更新
if not args.check:
update_info = check_is_need_update()
if not args.check and update_info.get('update'):
# 执行更新逻辑
print(f"准备更新到: {update_info['version']}")
# 这里添加您的更新逻辑
if update_main(root_dir=update_info['root_dir'], version=update_info['version'],
remote_file=update_info['remote_file']):
root_path = update_info['root_dir']
bat_content = f"""@echo off
rem 等待主程序退出
:loop
tasklist /fi "imagename eq debugtool.exe" 2>nul | find /i "debugtool.exe" >nul
if "%errorlevel%"=="0" (
timeout /t 1 /nobreak >nul
goto loop
)
rem 删除旧文件
del /f /q "{root_path}\*.*"
for /d %%p in ("{root_path}\*") do (
if /i not "%%~nxp"=="debugtool" (
if /i not "%%~nxp"=="plans" (
if /i not "%%~nxp"=="logs" (
if /i not "%%~nxp"=="results" (
rmdir "%%p" /s /q
)
)
)
)
)
rem 拷贝新版本文件(使用robocopy实现可靠拷贝)
robocopy {os.path.join(root_path, 'debugtool')} {root_path} /e
rem 启动新版本
start {root_path}\debugtool\debugtool.exe --check
rem 删除临时文件
del "%~f0"
"""
# 将批处理脚本写入临时文件
bat_path = os.path.join(root_path, 'debugtool', "update.bat")
with open(bat_path, 'w') as f:
f.write(bat_content)
# 启动批处理脚本
os.startfile(bat_path)
# 退出当前程序
qapplication.instance().quit()
else:
# 弹出更新失败提示框
qmessagebox.critical(none, "更新失败", "更新失败,请检查或重新打开选择不更新")
sys.exit(0)
else:
# 正常启动应用
win = mainform()
win.show()
sys.exit(app.exec_())
updater.py
# -*- coding: utf-8 -*-
"""
time : 2025/5/6 20:50
author : jiaqi.wang
"""
# -*- coding: utf-8 -*-
"""
time : 2025/5/6 9:39
author : jiaqi.wang
"""
import json
import os
import shutil
import sys
import zipfile
from datetime import datetime
from pyqt5.qtcore import qsettings, qt
from pyqt5.qtwidgets import (qapplication, qlabel, qmessagebox, qprogressdialog)
def get_resource_path(relative_path):
"""获取资源的绝对路径,兼容开发模式和打包后模式"""
if hasattr(sys, '_meipass'):
return os.path.join(sys._meipass, relative_path)
return os.path.join(os.path.abspath("."), relative_path)
class updateconfig:
"""更新系统配置"""
def __init__(self):
self.settings = qsettings("debugtool", "debugtool")
# 修改为共享文件夹路径
self.config = {
'update_url': r"\\10.10.10.10",
'auto_check': self.settings.value("auto_update", true, bool),
'allow_incremental': true,
'max_rollback_versions': 3,
'app_name': 'debugtool',
'main_executable': 'debugtool.exe' if sys.platform == 'win32' else 'debugtool'
}
def __getitem__(self, key):
return self.config[key]
def set_auto_update(self, enabled):
self.config['auto_check'] = enabled
self.settings.setvalue("auto_update", enabled)
class sharedfiledownloader:
"""从windows共享文件夹下载文件的下载器"""
def __init__(self, config, parent=none):
self.config = config
self.parent = parent
self.temp_dir = os.path.join(os.path.expanduser("~"), "temp", f"{self.config['app_name']}_updates")
os.makedirs(self.temp_dir, exist_ok=true)
def download_update(self, update_info, progress_callback=none):
"""从共享文件夹下载更新包"""
try:
if update_info['update_type'] == 'incremental':
# 增量更新
return self.download_incremental(update_info, progress_callback)
else:
# 全量更新
return self.download_full(update_info, progress_callback)
except exception as e:
raise exception(f"从共享文件夹下载失败: {str(e)}")
def download_full(self, update_info, progress_callback):
"""下载完整更新包"""
local_file = os.path.join(self.temp_dir, f"{update_info['version']}.zip")
self._copy_from_share(update_info['remote_file'], local_file, progress_callback)
return local_file
def download_incremental(self, update_info, progress_callback):
"""下载增量更新包"""
incremental = update_info['incremental']
remote_file = f"incremental/{incremental['file']}"
local_file = os.path.join(self.temp_dir, f"{update_info['version']}.zip")
self._copy_from_share(remote_file, local_file, progress_callback)
# 下载差异文件清单
remote_manifest = f"incremental/{incremental['manifest']}"
local_manifest = os.path.join(self.temp_dir, f"inc_{update_info['version']}.json")
self._copy_from_share(remote_manifest, local_manifest, none)
return local_file
def _copy_from_share(self, remote_path, local_path, progress_callback):
"""从共享文件夹复制文件"""
# 构造完整的共享路径
source_path = remote_path
# 确保使用unc路径格式
if not source_path.startswith('\\\\'):
source_path = '\\\\' + source_path.replace('/', '\\')
# 标准化路径
source_path = os.path.normpath(source_path)
local_path = os.path.normpath(local_path)
# 确保本地目录存在
os.makedirs(os.path.dirname(local_path), exist_ok=true)
if progress_callback:
progress_callback(0, f"准备从共享文件夹 {source_path} 复制...")
qapplication.processevents()
try:
# 获取文件大小用于进度显示
total_size = os.path.getsize(source_path)
# 模拟进度更新(文件复制是原子操作)
if progress_callback:
progress_callback(30, "正在从共享文件夹复制文件...")
qapplication.processevents()
# 执行文件复制
shutil.copy2(source_path, local_path)
# 验证文件
if os.path.getsize(local_path) != total_size:
raise exception("文件大小不匹配,复制可能不完整")
if progress_callback:
progress_callback(100, "文件复制完成!")
except exception as e:
# 清理可能已部分复制的文件
if os.path.exists(local_path):
try:
os.remove(local_path)
except:
pass
raise e
class updateapplier:
"""更新应用器"""
def __init__(self, config, backup_dir):
self.config = config
self.backup_dir = backup_dir
os.makedirs(self.backup_dir, exist_ok=true)
def apply_update(self, update_file, update_info, progress_callback=none):
"""应用更新"""
try:
if progress_callback:
progress_callback(10, "开始应用更新...")
if update_info['update_type'] == 'incremental':
self._apply_incremental_update(update_file, update_info)
else:
self._apply_full_update(update_file, update_info['root_dir'])
if progress_callback:
progress_callback(90, "更新版本信息...")
self._update_version_file(update_info['version'], update_info['root_dir'])
if progress_callback:
progress_callback(100, "更新完成!")
return true
except exception as e:
if progress_callback:
progress_callback(0, f"更新失败: {str(e)}")
raise exception(f"更新失败: {str(e)}")
def _create_backup(self):
"""创建当前版本的备份"""
timestamp = datetime.now().strftime("%y%m%d_%h%m%s")
backup_path = os.path.join(self.backup_dir, f"backup_{timestamp}")
os.makedirs(backup_path, exist_ok=true)
# 备份整个应用目录
app_dir = r'xxx'
for item in os.listdir(app_dir):
src = os.path.join(app_dir, item)
if os.path.isfile(src):
shutil.copy2(src, os.path.join(backup_path, item))
elif os.path.isdir(src) and not item.startswith('_'):
shutil.copytree(src, os.path.join(backup_path, item))
return backup_path
def _apply_full_update(self, update_file, root_dir):
"""应用完整更新"""
# 解压更新包
with zipfile.zipfile(update_file, 'r') as zip_ref:
zip_ref.extractall(root_dir)
# 解压到临时目录
temp_dir = os.path.join(os.path.dirname(update_file), "temp_full_update")
with zipfile.zipfile(update_file, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# 删除解压后的目录
shutil.rmtree(os.path.dirname(os.path.dirname(update_file)))
shutil.rmtree(self.backup_dir)
def _apply_incremental_update(self, update_file, update_info):
"""应用增量更新"""
root_dir = update_info['root_dir']
# 解析差异文件清单
manifest_file = os.path.join(
os.path.dirname(update_file),
f"inc_{update_info['version']}.json"
)
with open(manifest_file, 'r') as f:
manifest = json.load(f)
# 解压增量包
temp_dir = os.path.join(os.path.dirname(update_file), "temp_inc_update")
with zipfile.zipfile(update_file, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# 应用更新
for action in manifest['actions']:
src = os.path.join(temp_dir, action['path'])
dst = os.path.join(root_dir, action['path'])
if action['type'] == 'add' or action['type'] == 'modify':
os.makedirs(os.path.dirname(dst), exist_ok=true)
if os.path.exists(dst):
os.remove(dst)
shutil.move(src, dst)
elif action['type'] == 'delete':
if os.path.exists(dst):
if os.path.isfile(dst):
os.remove(dst)
else:
shutil.rmtree(dst)
def _update_version_file(self, new_version, root_dir):
"""更新版本号文件"""
version_file = os.path.join(root_dir, 'version.txt')
with open(version_file, 'w') as f:
f.write(new_version)
def _rollback_update(self, backup_path):
"""回滚到备份版本"""
if not os.path.exists(backup_path):
return false
app_dir = r'xxx'
# 恢复备份
for item in os.listdir(backup_path):
src = os.path.join(backup_path, item)
dst = os.path.join(app_dir, item)
if os.path.exists(dst):
if os.path.isfile(dst):
os.remove(dst)
else:
shutil.rmtree(dst)
if os.path.isfile(src):
shutil.copy2(src, dst)
else:
shutil.copytree(src, dst)
return true
class updateprogressdialog(qprogressdialog):
"""更新进度对话框"""
def __init__(self, parent=none):
super().__init__("", "取消", 0, 100, parent)
self.setwindowtitle("正在更新")
self.setwindowmodality(qt.windowmodal)
self.setfixedsize(500, 150)
self.detail_label = qlabel()
self.setlabel(self.detail_label)
def update_progress(self, value, message):
self.setvalue(value)
self.detail_label.settext(message)
qapplication.processevents()
def update_main(root_dir: str, version: str, remote_file: str):
try:
config = updateconfig()
update_info = {
"version": version,
"update_type": "full",
"changelog": "修复了一些bug",
'root_dir': root_dir,
'remote_file': remote_file
}
progress = updateprogressdialog()
downloader = sharedfiledownloader(config)
applier = updateapplier(config, os.path.join(os.path.expanduser("~"), f"{config['app_name']}_backups"))
# 下载更新
update_file = downloader.download_update(
update_info,
progress.update_progress
)
# 应用更新
if progress.wascanceled():
sys.exit(1)
success = applier.apply_update(
update_file,
update_info,
progress.update_progress
)
if success:
qmessagebox.information(
none,
"更新完成",
"更新已成功安装,点击ok后,即将重新启动应用程序(10s内)。"
)
return true
return true
except:
return false
五、总结
到此这篇关于pyqt5程序自动更新的实现代码的文章就介绍到这了,更多相关pyqt5程序自动更新内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论