1.sleep法(阻塞)
通过 while + sleep 实现定时任务
(1) 存在时间漂移(等待运行时间)
import time def loopmonitor(): while true: monitorsystem() # 1min检查一次 time.sleep(60) loopmonitor()
(2) 维护sleep时间
import time def loopmonitor(): while true: monitorsystem() #2s检查一次 time.sleep(60.0 - ((time.time() - starttime) % 60.0)) loopmonitor()
2.自定义法 (+线程、非阻塞)
import threading import time class repeatedtimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = none self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = false self.next_call = time.time() self.start() def _run(self): self.is_running = false self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self.next_call += self.interval self._timer = threading.timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = true def stop(self): self._timer.cancel() self.is_running = false from time import sleep def hello(name): print "hello %s!" % name print "starting..." rt = repeatedtimer(1, hello, "world") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!
3.twisted
更加健壮,并且实现了许多功能:守护进程、日志记录或异常处理
from twisted.internet import task, reactor timeout = 60.0 # sixty seconds def dowork(): #do work here pass l = task.loopingcall(dowork) l.start(timeout) # call every sixty seconds reactor.run()
4.schedule (简单方便,推荐)
import schedule import time def job(): print("i'm working...") #schedule.every(1)创建job, seconds.do(func)按秒间隔查询并执行 schedule.every(1).seconds.do(func) #添加任务按分执行 schedule.every(1).minutes.do(func) #添加任务按天执行 schedule.every(1).days.do(func) #添加任务按周执行 schedule.every().weeks.do(func) #添加任务每周一执行,执行时间为下周一这一时刻时间 schedule.every().monday.do(func) schedule.every().monday.at("12:00").do(job) # 每隔10分钟执行一次任务 schedule.every(10).minutes.do(job) # 每隔一小时执行一次任务 schedule.every().hour.do(job) # 每天10:30执行一次任务 schedule.every().day.at("10:30").do(job) # 每隔5到10分钟运行一次任务? schedule.every(5).to(10).minutes.do(job) # 每周一的这个时候执行一次任务 schedule.every().monday.do(job) # 每周三 13:15执行一次任务 schedule.every().wednesday.at("13:15").do(job) # # 每分钟的第17秒执行任务 schedule.every().minute.at(":17").do(job) while true: schedule.run_pending() # 检测是否有到期任务 time.sleep(1) # 避免一直查询cpu过高。sleep时间不应该小于执行时间,否则会线程堆积。
5.apscheduler (功能多,推荐)
调度器(scheduler)
blockingscheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。
backgroundscheduler: 调度器在后台线程中运行,不会阻塞当前线程。
asyncioscheduler: 结合asyncio模块一起使用。
geventscheduler: 程序中使用gevent作为io模型和geventexecutor配合使用。
tornadoscheduler: 程序中使用tornado的io模型,用 ioloop.add_timeout 完成定时唤醒。
twistedscheduler: 配合twistedexecutor,用reactor.calllater完成定时唤醒。
qtscheduler: 应用是一个qt应用,需使用qtimer完成定时唤醒。
触发器(trigger)
date是最基本的一种调度,作业任务只会执行一次。
interval触发器,固定时间间隔触发。
cron 触发器,在特定时间周期性地触发,和linux crontab格式兼容。它是功能最强大的触发器。
作业存储(job store)
添加任务,有两种添加方法,一种add_job(), 另一种是scheduled_job()修饰器来修饰函数。
from datetime import datetime from apscheduler.schedulers.blocking import blockingscheduler scheduler = blockingscheduler() # 第一种 @scheduler.scheduled_job(job_func, 'interval', seconds=10) def timed_task(): print(datetime.now().strftime("%y-%m-%d %h:%m:%s")) # 第二种 scheduler.add_job(timed_task, 'interval', seconds=5) scheduler.start()
删除任务,两种方法:remove_job() 和 job.remove()。remove_job()是根据任务的id来移除,所以要在任务创建的时候指定一个 id。job.remove()则是对任务执行remove方法。
scheduler.add_job(job_func, 'interval', seconds=20, id='one') scheduler.remove_job(one) task = add_job(task_func, 'interval', seconds=2, id='job_one') task.remvoe()
获取任务列表,通过scheduler.get_jobs()方法能够获取当前调度器中的所有任务的列表
tasks = scheduler.get_jobs()
关闭任务,使用scheduler.shutdown()默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。
scheduler.shutdown() scheduler.shutdown(wait=false)
注意点
时区问题
报错:valueerror: timezone offset does not match system offset: 0 != 28800. please, check your config files.
分析:通过分析异常日志,发现apscheduler的默认timezone,而“0”是获取的系统环境变量的tz时间28800对应timezone为“asia/shanghai”, 而0对应timezone为“utc”,所以需将系统环境变量的时区与apscheduler的时区设置为一致。
解决:
#!/usr/bin/python # -*- coding: utf-8 -*- from apscheduler.schedulers.background import backgroundscheduler import os os.environ['tz']= "asia/shanghai" scheduler = backgroundscheduler(timezone="asia/shanghai")
其他解决方案:
如果部署应用dockerfile配置,也可以在dockerfile中设定系统时区。
到此这篇关于python快速实现定时器的五种常见方法详解的文章就介绍到这了,更多相关python定时器内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!