欢迎来到徐庆高(Tea)的个人博客网站
磨难很爱我,一度将我连根拔起。从惊慌失措到心力交瘁,我孤身一人,但并不孤独无依。依赖那些依赖我的人,信任那些信任我的人,帮助那些给予我帮助的人。如果我愿意,可以分裂成无数面镜子,让他们看见我,就像看见自己。察言观色和模仿学习是我的领域。像每个深受创伤的人那样,最终,我学会了随遇而安。
当前位置: 日志文章 > 详细内容

Python快速实现定时器的五种常见方法详解

2025年07月30日 Python
1.sleep法(阻塞)通过 while + sleep 实现定时任务 (1) 存在时间漂移(等待运行时间)import timedef loopmonitor(): while true:

1.sleep法(阻塞)

通过 while + sleep 实现定时任务

(1) 存在时间漂移(等待运行时间)

import time
def loopmonitor():
    while true:
        monitorsystem()
        # 1min检查一次
        time.sleep(60)
loopmonitor()

(2) 维护sleep时间

import time
def loopmonitor():
    while true:
        monitorsystem()
        #2s检查一次
        time.sleep(60.0 - ((time.time() - starttime) % 60.0))
loopmonitor()

2.自定义法 (+线程、非阻塞)

import threading 
import time
 
class repeatedtimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = none
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = false
    self.next_call = time.time()
    self.start()
 
  def _run(self):
    self.is_running = false
    self.start()
    self.function(*self.args, **self.kwargs)
 
  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = true
 
  def stop(self):
    self._timer.cancel()
    self.is_running = false
    
    
from time import sleep
def hello(name):
    print "hello %s!" % name
 
print "starting..."
rt = repeatedtimer(1, hello, "world") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

3.twisted

更加健壮,并且实现了许多功能:守护进程、日志记录或异常处理

from twisted.internet import task, reactor
 
timeout = 60.0 # sixty seconds
 
def dowork():
    #do work here
    pass
 
l = task.loopingcall(dowork)
l.start(timeout) # call every sixty seconds
 
reactor.run()

4.schedule (简单方便,推荐)

import schedule
import time
 
def job():
    print("i'm working...")
 
#schedule.every(1)创建job, seconds.do(func)按秒间隔查询并执行
schedule.every(1).seconds.do(func)
#添加任务按分执行
schedule.every(1).minutes.do(func)
#添加任务按天执行
schedule.every(1).days.do(func)
#添加任务按周执行
schedule.every().weeks.do(func)
#添加任务每周一执行,执行时间为下周一这一时刻时间
schedule.every().monday.do(func)
 
 
schedule.every().monday.at("12:00").do(job)
# 每隔10分钟执行一次任务
schedule.every(10).minutes.do(job)
# 每隔一小时执行一次任务
schedule.every().hour.do(job)
# 每天10:30执行一次任务
schedule.every().day.at("10:30").do(job)
# 每隔5到10分钟运行一次任务?
schedule.every(5).to(10).minutes.do(job)
# 每周一的这个时候执行一次任务
schedule.every().monday.do(job)
# 每周三 13:15执行一次任务
schedule.every().wednesday.at("13:15").do(job)
# # 每分钟的第17秒执行任务  
schedule.every().minute.at(":17").do(job)
 
while true:
    schedule.run_pending()   # 检测是否有到期任务
    time.sleep(1)  # 避免一直查询cpu过高。sleep时间不应该小于执行时间,否则会线程堆积。

5.apscheduler (功能多,推荐)

调度器(scheduler)

blockingscheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。

backgroundscheduler: 调度器在后台线程中运行,不会阻塞当前线程。

asyncioscheduler: 结合asyncio模块一起使用。

geventscheduler: 程序中使用gevent作为io模型和geventexecutor配合使用。

tornadoscheduler: 程序中使用tornado的io模型,用 ioloop.add_timeout 完成定时唤醒。

twistedscheduler: 配合twistedexecutor,用reactor.calllater完成定时唤醒。

qtscheduler: 应用是一个qt应用,需使用qtimer完成定时唤醒。

触发器(trigger)

date是最基本的一种调度,作业任务只会执行一次。

interval触发器,固定时间间隔触发。

cron 触发器,在特定时间周期性地触发,和linux crontab格式兼容。它是功能最强大的触发器。

作业存储(job store)

添加任务,有两种添加方法,一种add_job(), 另一种是scheduled_job()修饰器来修饰函数。

from datetime import datetime
from apscheduler.schedulers.blocking import blockingscheduler 
scheduler = blockingscheduler() 
 
# 第一种 
@scheduler.scheduled_job(job_func, 'interval', seconds=10) 
def timed_task():     
    print(datetime.now().strftime("%y-%m-%d %h:%m:%s")) 
    
# 第二种 
scheduler.add_job(timed_task, 'interval', seconds=5) 
scheduler.start() 

删除任务,两种方法:remove_job() 和 job.remove()。remove_job()是根据任务的id来移除,所以要在任务创建的时候指定一个 id。job.remove()则是对任务执行remove方法。

scheduler.add_job(job_func, 'interval', seconds=20, id='one') 
scheduler.remove_job(one) 
 
task = add_job(task_func, 'interval', seconds=2, id='job_one') 
task.remvoe() 

获取任务列表,通过scheduler.get_jobs()方法能够获取当前调度器中的所有任务的列表

tasks = scheduler.get_jobs() 

关闭任务,使用scheduler.shutdown()默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。

scheduler.shutdown() 
scheduler.shutdown(wait=false) 

注意点

时区问题

报错:valueerror: timezone offset does not match system offset: 0 != 28800. please, check your config files.

分析:通过分析异常日志,发现apscheduler的默认timezone,而“0”是获取的系统环境变量的tz时间28800对应timezone为“asia/shanghai”, 而0对应timezone为“utc”,所以需将系统环境变量的时区与apscheduler的时区设置为一致。

解决:

#!/usr/bin/python
# -*- coding: utf-8 -*-
 
from apscheduler.schedulers.background import backgroundscheduler    
import os
 
os.environ['tz']= "asia/shanghai"
scheduler = backgroundscheduler(timezone="asia/shanghai")

其他解决方案:

如果部署应用dockerfile配置,也可以在dockerfile中设定系统时区。

到此这篇关于python快速实现定时器的五种常见方法详解的文章就介绍到这了,更多相关python定时器内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!