Features
This is a scheduled task scheduler for automating repetitive jobs. It provides the following core capabilities:
Flexible scheduling configuration:
- Cron-expression based scheduling
- Interval execution (every N seconds/minutes/hours)
- One-off tasks
- Task dependencies
Multi-task management:
- Parallel execution of multiple tasks
- Task priorities
- Task execution status monitoring
- Task execution history
Supported task types:
- Shell command execution
- Python script execution
- HTTP request tasks
- File operation tasks
- Custom function tasks
Monitoring and alerting:
- Real-time task status monitoring
- Automatic alerts on failure
- Detailed execution logs
- Performance statistics
Configuration management:
- YAML/JSON configuration files
- Dynamic task loading and unloading
- Hot configuration reload
- Task templates
Application Scenarios
1. System operations automation
- Scheduled backups of databases and important files
- Periodic cleanup of system logs and temporary files
- Monitoring system resource usage
- Automated application deployment and updates
2. Data processing automation
- Scheduled data synchronization and ETL processing
- Periodic report and statistics generation
- Automated data cleansing and validation
- Batch processing of files and data
3. Monitoring and alerting automation
- Scheduled service status and availability checks
- Monitoring website and API response times
- Detecting anomalies in system performance metrics
- Automatically sending monitoring reports
4. Business process automation
- Scheduled emails and notifications
- Automated handling of user requests
- Periodic execution of business logic
- Batch processing of orders and transactions
Error Handling
1. Task execution exceptions
try:
    result = task.execute()
    if not result.success:
        logger.error(f"Task execution failed: {result.error_message}")
        send_alert(f"Task {task.name} failed: {result.error_message}")
except Exception as e:
    logger.error(f"Task execution raised an exception: {str(e)}")
    send_alert(f"Task {task.name} raised an exception: {str(e)}")
2. Configuration file exceptions
try:
    with open(config_file, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
except yaml.YAMLError as e:
    logger.error(f"Invalid configuration file format: {str(e)}")
    raise TaskSchedulerError(f"Invalid configuration file: {str(e)}")
except FileNotFoundError:
    logger.error(f"Configuration file not found: {config_file}")
    raise TaskSchedulerError(f"Configuration file not found: {config_file}")
3. Scheduler exceptions
try:
    scheduler.start()
except SchedulerAlreadyRunningError:
    logger.warning("Scheduler is already running")
except Exception as e:
    logger.error(f"Failed to start scheduler: {str(e)}")
    raise TaskSchedulerError(f"Failed to start scheduler: {str(e)}")
4. Resource limit exceptions
try:
    if len(running_tasks) >= max_concurrent_tasks:
        raise ResourceLimitError(f"Maximum number of concurrent tasks exceeded: {max_concurrent_tasks}")
except ResourceLimitError as e:
    logger.warning(f"Resource limit reached: {str(e)}")
    # Put the task into the waiting queue
    task_queue.put(task)
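How queued tasks are picked up again is not shown above. Below is a minimal sketch that reuses the task_queue, running_tasks and max_concurrent_tasks names from the snippet; dispatch is a hypothetical helper that actually submits the task:

def drain_waiting_queue():
    # Move tasks from the waiting queue back into execution while slots are free.
    while not task_queue.empty() and len(running_tasks) < max_concurrent_tasks:
        queued_task = task_queue.get()
        dispatch(queued_task)  # hypothetical helper, e.g. executor.submit(queued_task.execute)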
Implementation
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Scheduled task scheduler
Purpose: run recurring tasks automatically
Author: cline
Version: 1.0
"""
import argparse
import sys
import json
import yaml
import logging
import os
import time
import threading
import subprocess
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Any, Optional
import schedule
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sqlite3

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('task_scheduler.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


class TaskSchedulerError(Exception):
    """Exception raised by the task scheduler."""
    pass


class TaskExecutionError(Exception):
    """Exception raised during task execution."""
    pass
class Task:
    def __init__(self, config: Dict[str, Any]):
        self.id = config.get('id')
        self.name = config.get('name', 'unnamed task')
        self.description = config.get('description', '')
        self.type = config.get('type', 'shell')
        self.command = config.get('command', '')
        self.schedule = config.get('schedule', '* * * * *')
        self.enabled = config.get('enabled', True)
        self.timeout = config.get('timeout', 300)  # 5-minute timeout
        self.max_retries = config.get('max_retries', 3)
        self.retry_delay = config.get('retry_delay', 5)  # retry delay in seconds
        self.priority = config.get('priority', 0)
        self.dependencies = config.get('dependencies', [])
        self.alert_on_failure = config.get('alert_on_failure', True)
        self.alert_emails = config.get('alert_emails', [])
        # Execution statistics
        self.stats = {
            'executed_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'last_execution': None,
            'last_success': None,
            'last_failure': None,
            'average_duration': 0.0
        }

    def execute(self) -> Dict[str, Any]:
        """Execute the task."""
        if not self.enabled:
            logger.info(f"Task {self.name} is disabled, skipping execution")
            return {'success': True, 'message': 'task disabled'}
        start_time = time.time()
        logger.info(f"Starting task: {self.name}")
        try:
            result = None
            for attempt in range(self.max_retries + 1):
                try:
                    if self.type == 'shell':
                        result = self._execute_shell_command()
                    elif self.type == 'python':
                        result = self._execute_python_script()
                    elif self.type == 'http':
                        result = self._execute_http_request()
                    elif self.type == 'function':
                        result = self._execute_function()
                    else:
                        raise TaskExecutionError(f"Unsupported task type: {self.type}")
                    # Execution succeeded
                    duration = time.time() - start_time
                    self._update_stats(True, duration)
                    logger.info(f"Task {self.name} finished successfully in {duration:.2f} s")
                    return {'success': True, 'duration': duration, 'result': result}
                except Exception as e:
                    if attempt < self.max_retries:
                        logger.warning(f"Task {self.name} attempt {attempt + 1} failed: {str(e)}, retrying in {self.retry_delay} s")
                        time.sleep(self.retry_delay)
                    else:
                        # The final attempt failed as well
                        duration = time.time() - start_time
                        self._update_stats(False, duration)
                        logger.error(f"Task {self.name} failed: {str(e)}")
                        return {'success': False, 'error': str(e), 'duration': duration}
        except Exception as e:
            duration = time.time() - start_time
            self._update_stats(False, duration)
            logger.error(f"Task {self.name} raised an exception: {str(e)}")
            return {'success': False, 'error': str(e), 'duration': duration}
    def _execute_shell_command(self) -> str:
        """Execute a shell command."""
        logger.debug(f"Executing shell command: {self.command}")
        result = subprocess.run(
            self.command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=self.timeout
        )
        if result.returncode != 0:
            raise TaskExecutionError(f"Command failed: {result.stderr}")
        return result.stdout

    def _execute_python_script(self) -> Any:
        """Execute a Python script."""
        logger.debug(f"Executing Python script: {self.command}")
        # The script could be run with exec or subprocess;
        # subprocess is used here for safety.
        script_path = self.command
        if not os.path.exists(script_path):
            raise TaskExecutionError(f"Python script not found: {script_path}")
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            timeout=self.timeout
        )
        if result.returncode != 0:
            raise TaskExecutionError(f"Python script failed: {result.stderr}")
        return result.stdout

    def _execute_http_request(self) -> Dict[str, Any]:
        """Execute an HTTP request."""
        logger.debug(f"Executing HTTP request: {self.command}")
        # Parse the HTTP configuration
        http_config = self.command if isinstance(self.command, dict) else json.loads(self.command)
        method = http_config.get('method', 'get').upper()
        url = http_config.get('url')
        headers = http_config.get('headers', {})
        data = http_config.get('data')
        params = http_config.get('params')
        if not url:
            raise TaskExecutionError("HTTP request is missing a URL")
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            data=data,
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return {
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'content': response.text
        }

    def _execute_function(self) -> Any:
        """Execute a custom function."""
        logger.debug(f"Executing custom function: {self.command}")
        # Calling the function depends on the concrete implementation;
        # getattr or eval could be used, but mind the security implications.
        raise NotImplementedError("Custom function execution is not implemented yet")

    def _update_stats(self, success: bool, duration: float):
        """Update execution statistics."""
        self.stats['executed_count'] += 1
        self.stats['last_execution'] = datetime.now().isoformat()
        if success:
            self.stats['success_count'] += 1
            self.stats['last_success'] = datetime.now().isoformat()
            # Update the running average duration
            current_avg = self.stats['average_duration']
            count = self.stats['success_count']
            self.stats['average_duration'] = (current_avg * (count - 1) + duration) / count
        else:
            self.stats['failed_count'] += 1
            self.stats['last_failure'] = datetime.now().isoformat()
class TaskScheduler:
    def __init__(self, config_file: Optional[str] = None):
        self.config_file = config_file
        self.tasks = {}
        self.running = False
        self.executor = None
        self.max_workers = 10
        self.alert_config = {}
        self.database_path = 'task_scheduler.db'
        # Load configuration
        self.load_config()
        # Initialize the database
        self.init_database()
        # Register signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def load_config(self):
        """Load the configuration file."""
        if not self.config_file or not os.path.exists(self.config_file):
            logger.info("No configuration file specified or file does not exist, using defaults")
            self._create_default_config()
            return
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'):
                    config = yaml.safe_load(f)
                else:
                    config = json.load(f)
            # Load scheduler settings
            scheduler_config = config.get('scheduler', {})
            self.max_workers = scheduler_config.get('max_workers', 10)
            self.alert_config = scheduler_config.get('alerts', {})
            # Load task definitions
            tasks_config = config.get('tasks', [])
            for task_config in tasks_config:
                task = Task(task_config)
                self.tasks[task.id] = task
                logger.info(f"Loaded task: {task.name} (id: {task.id})")
            logger.info(f"Loaded {len(self.tasks)} task(s)")
        except Exception as e:
            logger.error(f"Failed to load configuration file: {str(e)}")
            raise TaskSchedulerError(f"Configuration loading failed: {str(e)}")
    def _create_default_config(self):
        """Create and register a default configuration."""
        default_tasks = [
            {
                "id": "system_monitor",
                "name": "System monitor",
                "description": "Monitor system resource usage",
                "type": "shell",
                "command": "python system_monitor.py",
                "schedule": "*/5 * * * *",  # every 5 minutes
                "enabled": True,
                "timeout": 300,
                "max_retries": 3
            },
            {
                "id": "log_cleanup",
                "name": "Log cleanup",
                "description": "Remove expired log files",
                "type": "shell",
                "command": "find /var/log -name '*.log' -mtime +30 -delete",
                "schedule": "0 2 * * *",  # every day at 02:00
                "enabled": True,
                "timeout": 600
            }
        ]
        default_config = {
            "scheduler": {
                "max_workers": 10,
                "alerts": {
                    "smtp_server": "smtp.example.com",
                    "smtp_port": 587,
                    "sender": "scheduler@example.com",
                    "password": "your_password"
                }
            },
            "tasks": default_tasks
        }
        # Register the default tasks so the scheduler actually uses them
        for task_config in default_tasks:
            task = Task(task_config)
            self.tasks[task.id] = task
        # Save the default configuration
        config_path = self.config_file or 'scheduler_config.json'
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, indent=2, ensure_ascii=False)
        logger.info(f"Created default configuration file: {config_path}")
    def init_database(self):
        """Initialize the database."""
        try:
            conn = sqlite3.connect(self.database_path)
            cursor = conn.cursor()
            # Table for task execution records
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_executions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    task_name TEXT NOT NULL,
                    start_time TEXT NOT NULL,
                    end_time TEXT,
                    duration REAL,
                    success BOOLEAN,
                    error_message TEXT,
                    output TEXT
                )
            ''')
            # Table for task statistics
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_stats (
                    task_id TEXT PRIMARY KEY,
                    executed_count INTEGER DEFAULT 0,
                    success_count INTEGER DEFAULT 0,
                    failed_count INTEGER DEFAULT 0,
                    last_execution TEXT,
                    last_success TEXT,
                    last_failure TEXT,
                    average_duration REAL DEFAULT 0.0
                )
            ''')
            conn.commit()
            conn.close()
            logger.info("Database initialized")
        except Exception as e:
            logger.error(f"Database initialization failed: {str(e)}")
            raise TaskSchedulerError(f"Database initialization failed: {str(e)}")

    def save_task_execution(self, task_id: str, task_name: str, start_time: datetime,
                            end_time: datetime, duration: float, success: bool,
                            error_message: str = None, output: str = None):
        """Persist a task execution record."""
        try:
            conn = sqlite3.connect(self.database_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO task_executions
                (task_id, task_name, start_time, end_time, duration, success, error_message, output)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                task_id, task_name, start_time.isoformat(), end_time.isoformat(),
                duration, success, error_message, output
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to save task execution record: {str(e)}")

    def update_task_stats(self, task: Task):
        """Write task statistics to the database."""
        try:
            conn = sqlite3.connect(self.database_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO task_stats
                (task_id, executed_count, success_count, failed_count,
                 last_execution, last_success, last_failure, average_duration)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                task.id,
                task.stats['executed_count'],
                task.stats['success_count'],
                task.stats['failed_count'],
                task.stats['last_execution'],
                task.stats['last_success'],
                task.stats['last_failure'],
                task.stats['average_duration']
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to update task statistics: {str(e)}")
    def schedule_task(self, task: Task):
        """Schedule a task with the schedule library."""
        if not task.enabled:
            logger.info(f"Task {task.name} is disabled, skipping scheduling")
            return
        try:
            # Parse the cron expression
            cron_parts = task.schedule.split()
            if len(cron_parts) != 5:
                raise TaskSchedulerError(f"Invalid cron expression: {task.schedule}")
            minute, hour, day, month, weekday = cron_parts
            # The schedule library does not parse cron natively, so map the most
            # common patterns onto its API (day-of-month and month are ignored
            # in this simplified mapping).
            weekdays = ['sunday', 'monday', 'tuesday', 'wednesday',
                        'thursday', 'friday', 'saturday']
            if minute.startswith('*/'):
                # e.g. "*/5 * * * *" -> every 5 minutes
                job = schedule.every(int(minute[2:])).minutes
            elif weekday != '*' and hour != '*' and minute != '*':
                # e.g. "0 3 * * 0" -> every Sunday at 03:00
                day_name = weekdays[int(weekday) % 7]
                job = getattr(schedule.every(), day_name).at(f"{int(hour):02d}:{int(minute):02d}")
            elif hour != '*' and minute != '*':
                # e.g. "0 2 * * *" -> every day at 02:00
                job = schedule.every().day.at(f"{int(hour):02d}:{int(minute):02d}")
            elif minute != '*':
                # e.g. "30 * * * *" -> every hour at minute 30
                job = schedule.every().hour.at(f":{int(minute):02d}")
            else:
                # "* * * * *" -> every minute
                job = schedule.every().minute
            # Register the job
            job.do(self._execute_task, task_id=task.id)
            logger.info(f"Task {task.name} scheduled: {task.schedule}")
        except Exception as e:
            logger.error(f"Failed to schedule task {task.name}: {str(e)}")

    def _execute_task(self, task_id: str):
        """Execute a task (internal)."""
        if task_id not in self.tasks:
            logger.error(f"Task does not exist: {task_id}")
            return
        task = self.tasks[task_id]
        start_time = datetime.now()
        try:
            # Run the task
            result = task.execute()
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            # Persist the execution record (serialize non-string output for SQLite)
            output = result.get('result')
            if output is not None and not isinstance(output, str):
                output = json.dumps(output, ensure_ascii=False, default=str)
            self.save_task_execution(
                task_id=task.id,
                task_name=task.name,
                start_time=start_time,
                end_time=end_time,
                duration=duration,
                success=result['success'],
                error_message=result.get('error'),
                output=output
            )
            # Update statistics
            self.update_task_stats(task)
            # Send an alert if required
            if not result['success'] and task.alert_on_failure:
                self.send_alert(task, result.get('error', 'unknown error'))
        except Exception as e:
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            # Persist the failure record
            self.save_task_execution(
                task_id=task.id,
                task_name=task.name,
                start_time=start_time,
                end_time=end_time,
                duration=duration,
                success=False,
                error_message=str(e)
            )
            logger.error(f"Exception while executing task {task.name}: {str(e)}")
    def send_alert(self, task: Task, error_message: str):
        """Send an alert email."""
        if not self.alert_config or not task.alert_emails:
            return
        try:
            # Build the email
            msg = MIMEMultipart()
            msg['From'] = self.alert_config['sender']
            msg['To'] = ', '.join(task.alert_emails)
            msg['Subject'] = f"Task scheduler alert - {task.name}"
            body = f"""
Task scheduler alert notification

Task name: {task.name}
Task id: {task.id}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Error: {error_message}

Please handle it promptly.
---
Task Scheduler
"""
            msg.attach(MIMEText(body, 'plain'))
            # Send the email
            server = smtplib.SMTP(self.alert_config['smtp_server'], self.alert_config['smtp_port'])
            server.starttls()
            server.login(self.alert_config['sender'], self.alert_config['password'])
            server.send_message(msg)
            server.quit()
            logger.info(f"Alert email sent to: {', '.join(task.alert_emails)}")
        except Exception as e:
            logger.error(f"Failed to send alert email: {str(e)}")

    def start(self):
        """Start the scheduler."""
        if self.running:
            logger.warning("Scheduler is already running")
            return
        logger.info("Starting task scheduler...")
        self.running = True
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        # Schedule all tasks
        for task in self.tasks.values():
            self.schedule_task(task)
        # Main loop
        try:
            while self.running:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Interrupt received, stopping scheduler...")
        finally:
            self.stop()

    def stop(self):
        """Stop the scheduler."""
        logger.info("Stopping task scheduler...")
        self.running = False
        if self.executor:
            self.executor.shutdown(wait=True)
        schedule.clear()
        logger.info("Task scheduler stopped")

    def _signal_handler(self, signum, frame):
        """Handle termination signals."""
        logger.info(f"Received signal {signum}, stopping scheduler...")
        self.stop()
        sys.exit(0)
    def add_task(self, task_config: Dict[str, Any]):
        """Add a task."""
        task = Task(task_config)
        self.tasks[task.id] = task
        if self.running:
            self.schedule_task(task)
        logger.info(f"Added task: {task.name}")

    def remove_task(self, task_id: str):
        """Remove a task."""
        if task_id in self.tasks:
            task = self.tasks[task_id]
            del self.tasks[task_id]
            logger.info(f"Removed task: {task.name}")

    def get_task_stats(self, task_id: str = None) -> Dict[str, Any]:
        """Return task statistics."""
        if task_id:
            if task_id in self.tasks:
                return self.tasks[task_id].stats
            else:
                return None
        else:
            return {task_id: task.stats for task_id, task in self.tasks.items()}

    def get_execution_history(self, task_id: str = None, limit: int = 100) -> List[Dict[str, Any]]:
        """Return execution history records."""
        try:
            conn = sqlite3.connect(self.database_path)
            cursor = conn.cursor()
            if task_id:
                cursor.execute('''
                    SELECT * FROM task_executions
                    WHERE task_id = ?
                    ORDER BY start_time DESC
                    LIMIT ?
                ''', (task_id, limit))
            else:
                cursor.execute('''
                    SELECT * FROM task_executions
                    ORDER BY start_time DESC
                    LIMIT ?
                ''', (limit,))
            rows = cursor.fetchall()
            # Convert rows to dictionaries before closing the connection
            columns = [description[0] for description in cursor.description]
            conn.close()
            return [dict(zip(columns, row)) for row in rows]
        except Exception as e:
            logger.error(f"Failed to fetch execution history: {str(e)}")
            return []
def create_sample_config():
    """Create a sample configuration file."""
    sample_config = {
        "scheduler": {
            "max_workers": 5,
            "alerts": {
                "smtp_server": "smtp.gmail.com",
                "smtp_port": 587,
                "sender": "your_email@gmail.com",
                "password": "your_app_password"
            }
        },
        "tasks": [
            {
                "id": "backup_task",
                "name": "Database backup",
                "description": "Daily database backup",
                "type": "shell",
                "command": "mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql",
                "schedule": "0 2 * * *",
                "enabled": True,
                "timeout": 3600,
                "max_retries": 2,
                "alert_on_failure": True,
                "alert_emails": ["admin@example.com"]
            },
            {
                "id": "cleanup_task",
                "name": "Temp file cleanup",
                "description": "Delete temporary files older than 7 days",
                "type": "shell",
                "command": "find /tmp -name '*.tmp' -mtime +7 -delete",
                "schedule": "0 3 * * 0",
                "enabled": True,
                "timeout": 300
            },
            {
                "id": "api_monitor",
                "name": "API monitor",
                "description": "Monitor the availability of key APIs",
                "type": "http",
                "command": json.dumps({
                    "method": "get",
                    "url": "https://api.example.com/health",
                    "timeout": 30
                }),
                "schedule": "*/10 * * * *",
                "enabled": True,
                "timeout": 60,
                "alert_on_failure": True,
                "alert_emails": ["ops@example.com"]
            }
        ]
    }
    with open('scheduler_sample_config.json', 'w', encoding='utf-8') as f:
        json.dump(sample_config, f, indent=2, ensure_ascii=False)
    logger.info("Sample configuration file created: scheduler_sample_config.json")
def main():
    parser = argparse.ArgumentParser(description='Scheduled task scheduler')
    parser.add_argument('-c', '--config', help='path to the configuration file')
    parser.add_argument('--start', action='store_true', help='start the scheduler')
    parser.add_argument('--sample-config', action='store_true', help='create a sample configuration file')
    parser.add_argument('--list-tasks', action='store_true', help='list all tasks')
    parser.add_argument('--task-stats', help='show statistics for the given task')
    parser.add_argument('--history', help='show execution history for the given task')
    args = parser.parse_args()

    if args.sample_config:
        create_sample_config()
        return

    scheduler = TaskScheduler(args.config)

    if args.list_tasks:
        print("Tasks:")
        for task_id, task in scheduler.tasks.items():
            status = "enabled" if task.enabled else "disabled"
            print(f"  - {task.name} ({task_id}) [{status}]")
        return

    if args.task_stats:
        stats = scheduler.get_task_stats(args.task_stats)
        if stats:
            print(f"Statistics for task {args.task_stats}:")
            for key, value in stats.items():
                print(f"  {key}: {value}")
        else:
            print(f"Task {args.task_stats} does not exist")
        return

    if args.history:
        history = scheduler.get_execution_history(args.history)
        print(f"Execution history for task {args.history}:")
        for record in history:
            print(f"  time: {record['start_time']}, success: {record['success']}, duration: {record['duration']:.2f}s")
        return

    if args.start:
        scheduler.start()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()
Usage
1. Install dependencies
pip install schedule requests pyyaml
2. Create a configuration file
python task_scheduler.py --sample-config
3. Start the scheduler
python task_scheduler.py --config scheduler_config.json --start
4. List tasks
python task_scheduler.py --config scheduler_config.json --list-tasks
5. Show task statistics
python task_scheduler.py --config scheduler_config.json --task-stats backup_task
6. Show execution history
python task_scheduler.py --config scheduler_config.json --history backup_task
Configuration File Examples
JSON configuration file
{
  "scheduler": {
    "max_workers": 5,
    "alerts": {
      "smtp_server": "smtp.gmail.com",
      "smtp_port": 587,
      "sender": "your_email@gmail.com",
      "password": "your_app_password"
    }
  },
  "tasks": [
    {
      "id": "backup_task",
      "name": "Database backup",
      "description": "Daily database backup",
      "type": "shell",
      "command": "mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql",
      "schedule": "0 2 * * *",
      "enabled": true,
      "timeout": 3600,
      "max_retries": 2,
      "alert_on_failure": true,
      "alert_emails": ["admin@example.com"]
    },
    {
      "id": "cleanup_task",
      "name": "Temp file cleanup",
      "description": "Delete temporary files older than 7 days",
      "type": "shell",
      "command": "find /tmp -name '*.tmp' -mtime +7 -delete",
      "schedule": "0 3 * * 0",
      "enabled": true,
      "timeout": 300
    }
  ]
}
YAML configuration file
scheduler:
  max_workers: 5
  alerts:
    smtp_server: smtp.gmail.com
    smtp_port: 587
    sender: your_email@gmail.com
    password: your_app_password
tasks:
  - id: backup_task
    name: Database backup
    description: Daily database backup
    type: shell
    command: mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql
    schedule: "0 2 * * *"
    enabled: true
    timeout: 3600
    max_retries: 2
    alert_on_failure: true
    alert_emails:
      - admin@example.com
  - id: cleanup_task
    name: Temp file cleanup
    description: Delete temporary files older than 7 days
    type: shell
    command: find /tmp -name '*.tmp' -mtime +7 -delete
    schedule: "0 3 * * 0"
    enabled: true
    timeout: 300
Advanced Features
1. Task dependency management
Dependencies between tasks are supported so that tasks run in the correct order; a minimal ordering-check sketch follows the example below:
tasks:
  - id: task_a
    name: Task A
    # ... other settings
  - id: task_b
    name: Task B
    dependencies:
      - task_a  # task_b depends on task_a
    # ... other settings
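The implementation shown above stores each task's dependencies list but does not enforce it. Below is a minimal sketch of one way a dependency gate could look, assuming the TaskScheduler.tasks dict and per-task stats from the code above; dependencies_satisfied is a hypothetical helper and its "has succeeded at least once" rule is a simplification:

def dependencies_satisfied(scheduler, task):
    # Require that every dependency has a recorded successful run before this
    # task is allowed to execute (a very coarse ordering check).
    for dep_id in task.dependencies:
        dep = scheduler.tasks.get(dep_id)
        if dep is None or dep.stats['last_success'] is None:
            return False
    return True

# Possible use inside TaskScheduler._execute_task, before task.execute():
# if not dependencies_satisfied(self, task):
#     logger.info(f"Task {task.name} skipped, dependencies not satisfied")
#     return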
2. Dynamic task management
Tasks can be added, removed and modified at runtime without restarting the scheduler, as shown in the example below.
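For instance, using the add_task and remove_task methods defined above (the task definition here is purely illustrative):

# Assuming a running TaskScheduler instance named scheduler
scheduler.add_task({
    "id": "ad_hoc_report",          # illustrative task definition
    "name": "Ad hoc report",
    "type": "shell",
    "command": "python generate_report.py",
    "schedule": "0 6 * * *",
    "enabled": True
})
# ...later, drop it again without restarting the scheduler
scheduler.remove_task("ad_hoc_report")

Note that remove_task as written above does not cancel the job already registered with the schedule library; a fuller implementation would keep the job object returned by schedule and pass it to schedule.cancel_job.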
3. Execution history and statistics
Task execution history and statistics are stored in a built-in SQLite database for later analysis and monitoring; see the query example below.
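A short example of reading that data back through the get_execution_history method shown above:

# Print the ten most recent runs of the backup task
for record in scheduler.get_execution_history("backup_task", limit=10):
    print(record['start_time'], record['success'], record['duration'])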
4. Alert notifications
Alert emails can be sent so that failed task runs are noticed promptly.
5. Resource limits
The number of concurrently running tasks can be capped to keep the scheduler from exhausting system resources.
Best Practices
1. Security
- Do not store sensitive information in plain text in configuration files
- Use environment variables or encrypted storage for sensitive settings (see the sketch below)
- Restrict execution permissions for scripts
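A minimal sketch of pulling the SMTP password from an environment variable instead of the configuration file; SCHEDULER_SMTP_PASSWORD is an assumed variable name:

import os

def resolve_smtp_password(alert_config):
    # Prefer the environment variable; fall back to the config value if unset.
    return os.environ.get('SCHEDULER_SMTP_PASSWORD', alert_config.get('password'))

# Possible use in TaskScheduler.load_config after reading the alerts section:
# self.alert_config['password'] = resolve_smtp_password(self.alert_config)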
2. Performance
- Choose a sensible level of task concurrency
- Set appropriate timeouts for long-running tasks
- Purge old execution history regularly (see the sketch below)
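One way to purge old records from the task_executions table created above, as a sketch; the 30-day retention period is an arbitrary choice:

import sqlite3
from datetime import datetime, timedelta

def purge_old_history(database_path='task_scheduler.db', keep_days=30):
    # start_time is stored as ISO-8601 text, so string comparison orders by date.
    cutoff = (datetime.now() - timedelta(days=keep_days)).isoformat()
    conn = sqlite3.connect(database_path)
    conn.execute("DELETE FROM task_executions WHERE start_time < ?", (cutoff,))
    conn.commit()
    conn.close()

This cleanup could itself be registered as a scheduled task, for example a daily Python task that runs the snippet.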
3. Monitoring and maintenance
- Review task execution logs regularly
- Monitor system resource usage
- Deal with failed tasks promptly
Summary
This scheduled task scheduler offers a complete, easy-to-use solution for task automation. Its flexible configuration and rich feature set cover a wide range of automation needs: whether for system operations, data processing, or business processes, it can significantly reduce repetitive manual work.