当前位置: 代码网 > it编程>前端脚本>Python > 使用Python实现一个优雅的异步定时器

使用Python实现一个优雅的异步定时器

2025年04月20日 Python 我要评论
需求背景定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:支持异步操作,避免阻塞主线程;单例化事件循环,节省资源;优雅地管

需求背景

定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:

  1. 支持异步操作,避免阻塞主线程;
  2. 单例化事件循环,节省资源;
  3. 优雅地管理定时器的生命周期;
  4. 提供简单的接口,易于使用。

为此,设计了一个 timer 类,结合 asyncio 和 threading,实现了一个高效的定时器。

代码

完整代码

import asyncio
import threading
import time
import sys

class timer:
    _loop = none
    _thread = none
    _lock = threading.lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is none or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=true
                )
                cls._thread.start()

    @classmethod
    def _run_loop(cls, loop):
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        except exception as e:
            print(f"事件循环异常: {e}")
        finally:
            loop.close()

    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if cls._running_timers == 0 and cls._loop is not none and cls._loop.is_running():
                cls._loop.call_soon_threadsafe(cls._loop.stop)
                # 不使用 join,因为守护线程会在主线程退出时自动结束

    def __init__(self):
        self.is_running = false
        self._stop_event = asyncio.event()
        self._task = none

    async def _timer_loop(self, interval, callback):
        try:
            while not self._stop_event.is_set():
                await asyncio.sleep(interval)
                if not self._stop_event.is_set():
                    await asyncio.get_event_loop().run_in_executor(none, callback)
        except asyncio.cancellederror:
            pass  # 正常取消时忽略
        except exception as e:
            print(f"定时器循环异常: {e}")
        finally:
            self.is_running = false
            timer._running_timers -= 1
            timer._shutdown()

    def start(self, interval, callback):
        if not self.is_running:
            timer._ensure_loop()
            self.is_running = true
            self._stop_event.clear()
            self._task = asyncio.run_coroutine_threadsafe(
                self._timer_loop(interval, callback),
                timer._loop
            )
            timer._running_timers += 1
            # print(f"定时器已启动,每{interval}秒执行一次")

    def stop(self):
        if self.is_running:
            self._stop_event.set()
            if self._task:
                timer._loop.call_soon_threadsafe(self._task.cancel)
            self.is_running = false
            # print("定时器已停止")

    def is_active(self):
        return self.is_running

# 使用示例
def callback1():
    print(f"回调1触发: {time.strftime('%h:%m:%s')}")

def callback2():
    print(f"回调2触发: {time.strftime('%h:%m:%s')}")

if __name__ == "__main__":
    timer1 = timer()
    timer2 = timer()

    timer1.start(2, callback1)
    timer2.start(3, callback2)

    try:
        time.sleep(100)
        timer1.stop()
        time.sleep(2)
        timer2.stop()
    except keyboardinterrupt:
        timer1.stop()
        timer2.stop()
    finally:
        # 确保在程序退出时清理
        timer._shutdown()

1. 单例事件循环的实现

为了避免每个定时器都创建一个独立的事件循环,在 timer 类中使用了类变量和类方法来管理全局唯一的事件循环:

class timer:
    _loop = none
    _thread = none
    _lock = threading.lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is none or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=true
                )
                cls._thread.start()
  • _loop:存储全局的 asyncio 事件循环。
  • _thread:将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
  • _lock:线程锁,确保在多线程环境中创建事件循环时的线程安全。
  • _ensure_loop:在需要时创建或重用事件循环,确保只有一个全局循环。

守护线程(daemon=true)的设计使得程序退出时无需显式关闭线程,简化了资源清理。

2. 事件循环的运行与关闭

事件循环的运行逻辑封装在 _run_loop 中:

@classmethod
def _run_loop(cls, loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except exception as e:
        print(f"事件循环异常: {e}")
    finally:
        loop.close()
  • run_forever:让事件循环持续运行,直到被外部停止。
  • 异常处理:捕获可能的错误并打印,便于调试。
  • finally:确保循环关闭时资源被正确释放。

关闭逻辑则由 _shutdown 方法控制:

@classmethod
def _shutdown(cls):
    with cls._lock:
        if cls._running_timers == 0 and cls._loop is not none and cls._loop.is_running():
            cls._loop.call_soon_threadsafe(cls._loop.stop)

当所有定时器都停止时(_running_timers == 0),事件循环会被安全停止。

3. 定时器核心逻辑

每个 timer 实例负责管理一个独立的定时任务:

def __init__(self):
    self.is_running = false
    self._stop_event = asyncio.event()
    self._task = none

async def _timer_loop(self, interval, callback):
    try:
        while not self._stop_event.is_set():
            await asyncio.sleep(interval)
            if not self._stop_event.is_set():
                await asyncio.get_event_loop().run_in_executor(none, callback)
    except asyncio.cancellederror:
        pass  # 正常取消时忽略
    finally:
        self.is_running = false
        timer._running_timers -= 1
        timer._shutdown()
  • _stop_event:一个 asyncio.event 对象,用于控制定时器的停止。
  • _timer_loop:异步协程,每隔 interval 秒执行一次回调函数 callback
  • run_in_executor:将回调函数运行在默认的线程池中,避免阻塞事件循环。

4. 启动与停止

启动和停止方法是用户的主要接口:

def start(self, interval, callback):
    if not self.is_running:
        timer._ensure_loop()
        self.is_running = true
        self._stop_event.clear()
        self._task = asyncio.run_coroutine_threadsafe(
            self._timer_loop(interval, callback),
            timer._loop
        )
        timer._running_timers += 1

def stop(self):
    if self.is_running:
        self._stop_event.set()
        if self._task:
            timer._loop.call_soon_threadsafe(self._task.cancel)
        self.is_running = false
  • start:启动定时器,确保事件循环可用,并记录运行中的定时器数量。
  • stop:通过设置 _stop_event 并取消任务来停止定时器。

5. 使用示例

以下是一个简单的使用示例:

def callback1():
    print(f"回调1触发: {time.strftime('%h:%m:%s')}")

def callback2():
    print(f"回调2触发: {time.strftime('%h:%m:%s')}")

timer1 = timer()
timer2 = timer()

timer1.start(2, callback1)  # 每2秒触发一次
timer2.start(3, callback2)  # 每3秒触发一次

time.sleep(10)  # 运行10秒
timer1.stop()
timer2.stop()

输出可能如下:

回调1触发: 14:30:02
回调2触发: 14:30:03
回调1触发: 14:30:04
回调1触发: 14:30:06
回调2触发: 14:30:06
...

设计亮点

  1. 异步与多线程结合:通过 asyncio 和 threading,实现了非阻塞的定时器,适合高并发场景。
  2. 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
  3. 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
  4. 线程安全:使用锁机制确保多线程环境下的稳定性。

适用场景

  • 周期性任务调度,如数据刷新、状态检查。
  • 后台服务中的定时监控。
  • 游戏或实时应用中的计时器需求。

总结

这个异步定时器实现结合了 python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!

以上就是使用python实现一个优雅的异步定时器的详细内容,更多关于python异步定时器的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com