前言:脚本虽好,但总不能一直“盯着”它吧?
你是否已经编写了许多强大的python自动化脚本,例如我们之前做的文件整理、excel数据处理、图片批量压缩,甚至打包成了exe文件?它们极大地提升了你的办公自动化效率!
然而,这些自动化工具依然需要你“亲自”去启动它们。我们需要的是
让自动化脚本在特定时间自动运行?
让它每隔一段时间就重复执行一次,无需干预?

彻底告别手动启动**,实现真正的**“无人值守”自动运行**,解放双手?
恭喜你!今天,我将带你进入python自动化任务调度的奇妙世界!我们将手把手教你如何利用python的schedule和apscheduler库,让你的自动化脚本拥有“时间观念”,彻底告别手动启动!
轻量级调度: 使用schedule库,快速实现简单的定时任务。
企业级调度: 使用apscheduler,打造更强大、更健壮、不怕掉线的定时任务系统。
1.自动化任务调度核心:让脚本自己调用自己
在办公自动化中,仅仅编写好脚本是第一步。如何让这些脚本在需要的时候自动运行,无需人工干预,是实现真正无人值守自动化的关键。
1.1 什么是任务调度?为什么需要无人值守?
任务调度: 指的是在预设的时间点、或按照设定的时间间隔,自动执行某个程序或脚本的过程。
“无人值守”: 意味着任务可以在后台运行,即使你没有打开终端,甚至电脑重启后也能继续工作,无需你手动启动或关闭任何窗口
为什么你需要它?
告别重复: 清理垃圾、备份文件、发送日报等重复性任务,无需再手动提醒自己。
提升效率: 确保任务在最合适的时间(如夜间、下班后)自动完成,不影响你的工作。
数据及时性: 报表、数据抓取等任务可以每天定时更新,确保你总是获取最新信息。
系统维护: 自动进行电脑优化、磁盘空间清理,让你的电脑始终保持最佳状态。
python社区提供了多种任务调度库,让我们能够轻松实现这些功能。
1.2 python定时任务基础:schedule库的基本用法
场景: 你想让你的python清理脚本每天凌晨1点自动运行,或者每隔10分钟就检查一次文件状态。
方案: schedule库是一个轻量级、api简洁的定时任务库。它让你用接近自然语言的方式来定义任务,非常适合python定时任务的快速入门!
安装schedule:
pip install schedule
代码:
import schedule
import time # 用于暂停程序,让调度器有机会运行
import os
import datetime
# 假设这个函数就是你之前写的清理函数,这里为了演示简化为打印日志
def perform_cleanup_job(folder_path, days_threshold):
"""
模拟一个文件清理任务。
在实际应用中,这里会调用你之前编写的 auto_clean_folder_once 函数。
"""
print(f"⏰ [{datetime.datetime.now().strftime('%h:%m:%s')}] 正在执行定时清理任务...")
print(f" - 清理目录: {folder_path}")
print(f" - 删除 {days_threshold} 天前的过期文件。")
# 在实际代码中,这里会调用 auto_clean_folder_once(folder_path, days_threshold)
print(" ✨ 本次清理任务完成!")
if __name__ == "__main__":
# 请替换为你要定时清理的文件夹路径
target_folder_for_cleanup = os.path.expanduser("~/desktop/每日自动清理")
os.makedirs(target_folder_for_cleanup, exist_ok=true) # 确保文件夹存在
print("🚀 正在启动python自动化任务调度器 (schedule库演示)...")
print("--- 请注意:此脚本需要持续运行才能保持调度! ---")
# 示例1:每隔5秒执行一次任务
schedule.every(5).seconds.do(perform_cleanup_job, target_folder_for_cleanup, 7) # 模拟清理7天前文件
print(f"任务已设置:每5秒执行一次清理任务,目标文件夹:{target_folder_for_cleanup}")
# 示例2:每隔1分钟执行一次
# schedule.every(1).minutes.do(perform_cleanup_job, target_folder_for_cleanup, 14)
# print("任务已设置:每1分钟执行一次清理任务。")
# 示例3:每天的特定时间执行任务 (例如每天凌晨1点)
# schedule.every().day.at("01:00").do(perform_cleanup_job, target_folder_for_cleanup, 30)
# print("任务已设置:每天凌晨01:00执行清理任务。")
# 示例4:每周三的下午14:30执行
# schedule.every().wednesday.at("14:30").do(perform_cleanup_job, target_folder_for_cleanup, 60)
# print("任务已设置:每周三14:30执行清理任务。")
# 主循环:持续检查是否有待执行的任务
while true:
schedule.run_pending() # 运行所有待执行的任务
time.sleep(1) # 等待1秒,避免cpu占用过高
步骤:
安装schedule: 在vs code终端运行 pip install schedule。
修改代码路径: 复制上方代码到vs code,保存为schedule_basic_demo.py。修改 target_folder_for_cleanup。
运行: 在vs code终端运行 python schedule_basic_demo.py。
展示:

2.进阶任务调度:apscheduler,你的小闹钟!
schedule库虽然简单易用,但它需要脚本持续运行在内存中,并且没有内置的持久化、多线程、作业重试等功能。
对于更复杂、更健壮的python自动化任务调度需求,你需要apscheduler(advanced python scheduler)。它是一个功能全面的调度库,能够应对更严苛的生产环境。
作用: apscheduler提供了一个灵活的调度器,可以脱离主程序线程运行,支持多种存储(内存、数据库)和执行方式。
2.1apscheduler安装与核心概念:调度器、作业存储、执行器
安装apscheduler:
pip install apscheduler
常用组件:
scheduler (调度器): 核心组件,负责管理和触发作业(job)。
job store (作业存储): 存储作业的地方,可以是内存、数据库(sqlite, mongodb, postgresql等)。这使得任务可以持久化,即使程序重启也不会丢失。
executor (执行器): 定义作业如何被执行(如线程池、进程池)。
trigger (触发器): 定义作业何时运行(如日期、间隔、cron表达式)。
2.2 定时运行与间隔运行:精准控制你的任务
场景: 你需要一个更可靠的定时任务,或者想在特定的日期执行一次性任务,或更灵活地使用cron表达式。
方案: apscheduler支持多种触发器,让你对任务执行时间有更精准的控制。
代码:
from apscheduler.schedulers.background import backgroundscheduler # 后台调度器
from apscheduler.triggers.interval import intervaltrigger # 间隔触发器
from apscheduler.triggers.date import datetrigger # 日期触发器
from apscheduler.triggers.cron import crontrigger # cron表达式触发器
import os
import datetime
import time
def perform_data_sync_job():
"""
模拟一个数据同步任务。
实际中,这里可以调用你的其他自动化脚本。
"""
print(f"🔄 [{datetime.datetime.now().strftime('%h:%m:%s')}] 正在执行数据同步任务...")
# 模拟任务耗时
time.sleep(2)
print(" ✨ 数据同步任务完成!")
def perform_weekly_report_job():
"""模拟一个每周生成报告的任务"""
print(f"📊 [{datetime.datetime.now().strftime('%h:%m:%s')}] 正在生成每周报告...")
time.sleep(3)
print(" ✨ 每周报告生成完成!")
if __name__ == "__main__":
scheduler = backgroundscheduler() # 创建一个后台调度器实例
print("🚀 正在启动apscheduler任务调度器...")
# 1. 间隔触发器 (intervaltrigger):每隔5秒执行一次
scheduler.add_job(
perform_data_sync_job,
intervaltrigger(seconds=5),
id='data_sync_job' # 为作业指定唯一id
)
print("任务:'data_sync_job' 已设置,每5秒执行一次。")
# 2. 日期触发器 (datetrigger):在指定日期和时间执行一次性任务
# run_date = datetime.datetime.now() + datetime.timedelta(seconds=10)
# scheduler.add_job(
# perform_weekly_report_job,
# datetrigger(run_date=run_date),
# id='one_time_report',
# name='一次性报告任务'
# )
# print(f"任务:'one_time_report' 已设置,将在 {run_date.strftime('%h:%m:%s')} 执行。")
# 3. cron触发器 (crontrigger):使用cron表达式进行复杂调度
# 每天凌晨2点0分执行
scheduler.add_job(
perform_weekly_report_job,
crontrigger(hour=2, minute=0),
id='daily_cleanup_cron',
name='每日凌晨清理任务'
)
print("任务:'daily_cleanup_cron' 已设置,每天凌晨2点执行。")
# 启动调度器
scheduler.start()
print("调度器已启动。程序将持续运行以保持调度。按 ctrl+c 停止。")
# 保持主线程运行,以便后台调度器能够工作
try:
while true:
time.sleep(1)
except (keyboardinterrupt, systemexit):
scheduler.shutdown() # 安全关闭调度器
print("调度器已停止。")
步骤:
安装apscheduler: pip install apscheduler。
修改代码: 复制上方代码到vs code,保存为apscheduler_demo.py。
运行: 在vs code终端运行 python apscheduler_demo.py。
效果展示:

2.3 异常处理与持久化:让你的任务更健壮、不怕掉线
对于生产环境的python自动化任务调度,任务的健壮性和可靠性至关重要。
apscheduler提供了:
异常处理: 你可以为作业注册监听器,捕获任务执行中的异常,进行日志记录或邮件通知,确保即使任务失败也能及时知晓。
持久化存储: 使用sqlalchemyjobstore等,将作业信息存储到数据库中。这样,即使程序崩溃或重启,apscheduler也能从数据库中恢复未完成或计划中的任务,实现无人值守自动运行**的真正持久性。
代码:
from apscheduler.schedulers.background import backgroundscheduler
from apscheduler.jobstores.sqlalchemy import sqlalchemyjobstore # 用于持久化
from apscheduler.triggers.interval import intervaltrigger
import os
import datetime
import time
# 定义一个会失败的模拟任务
def failing_job():
print(f"🔴 [{datetime.datetime.now().strftime('%h:%m:%s')}] 尝试执行一个可能失败的任务...")
if datetime.datetime.now().second % 10 == 0: # 每10秒模拟失败一次
raise valueerror("模拟任务执行失败!")
print(" ✅ 任务成功完成!")
# 定义一个错误监听器
def my_listener(event):
if event.exception:
print(f"🔥 作业 '{event.job_id}' 执行失败:{event.exception}")
else:
print(f"✨ 作业 '{event.job_id}' 正常完成。")
if __name__ == "__main__":
# 配置作业存储:这里使用sqlite作为示例数据库
jobstores = {
'default': sqlalchemyjobstore(url='sqlite:///jobs.sqlite')
}
# 配置执行器 (默认是threadpoolexecutor)
executors = {
'default': {'type': 'threadpool', 'max_workers': 20}
}
scheduler = backgroundscheduler(jobstores=jobstores, executors=executors)
print("🚀 正在启动apscheduler调度器 (持久化与异常处理演示)...")
# 添加一个每5秒执行的任务
scheduler.add_job(
failing_job,
intervaltrigger(seconds=5),
id='resilient_task',
name='弹性任务'
)
print("任务:'resilient_task' 已设置,每5秒执行一次。")
# 注册错误监听器
from apscheduler.events import event_job_error, event_job_executed
scheduler.add_listener(my_listener, event_job_error | event_job_executed)
# 启动调度器
scheduler.start()
print("调度器已启动。可以尝试 ctrl+c 中断程序,再重新运行,任务会从上次中断的地方继续。")
try:
while true:
time.sleep(1)
except (keyboardinterrupt, systemexit):
scheduler.shutdown()
print("调度器已停止。")
步骤:
安装库: pip install apscheduler sqlalchemy。
修改代码: 复制上方代码到vs code,保存为apscheduler_advanced.py。
运行: 运行 python apscheduler_advanced.py。观察日志。尝试在任务执行中按 ctrl+c 中断程序,然后重新运行脚本,看任务是否能继续。
展示:

3.你的无人值守自动化工具集!
通过本篇文章,你已经掌握了python自动化任务调度的各项核心魔法,亲手打造了一个能够让你的脚本无人值守自动运行的python自动化工具集!
我们深入学习了schedule库和apscheduler,它们是实现python定时任务的强大工具,
实现了:
schedule的轻量级调度: 快速实现简单的、基于时间间隔或特定时间点的定时执行。
apscheduler的高级调度: 支持多种触发器(间隔、日期、cron),实现后台运行、持久化存储和异常处理,让你的任务更加健壮可靠。
现在,你不再需要为手动启动自动化脚本而烦恼。
4. 尾声:任务调度,实现自动化工具的“终极自由”!
通过本篇文章,你已经掌握了python自动化任务调度的终极奥秘,为你的办公自动化之旅又增添了一个重量级技能!你学会了如何利用schedule和apscheduler,高效地让python脚本无人值守自动运行。
表格:schedule vs. apscheduler对比
| 特性 | schedule | apscheduler |
|---|---|---|
| 易用性 | 极简api,快速上手 | 功能全面,api稍复杂,但更灵活 |
| 运行方式 | 依赖主程序while true循环,非真正后台 | 可在后台线程/进程运行,不阻塞主程序 |
| 持久化 | ❌ 不支持 | ✅ 支持 (可存数据库,不怕重启) |
| 触发器 | 简单时间间隔/定点/周几 | 间隔、日期、cron表达式等多种 |
| 错误处理 | 需手动捕获并处理 | 内置监听器,可定制错误事件处理 |
| 适用场景 | 简单个人脚本,短期运行 | 生产环境,复杂调度,需健壮性任务 |
| 图注:python两大任务调度库schedule与apscheduler对比,选择适合你需求的工具! | ||
现在,你的python自动化工具已经拥有了“终极自由”!它可以独立运行,无需你的关注,默默地提升你的效率提升和电脑优化。
到此这篇关于一文带你深入了解python中的自动化任务调度的文章就介绍到这了,更多相关python任务调度内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论