@mylog.timer 装饰器的详细教学。这通常是指用于测量函数执行时间并记录日志的装饰器。
1. 基础版本:简易计时装饰器
import time
import functools
def timer(func):
"""基础计时装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
print(f"[timer] {func.__name__} 执行时间: {end_time - start_time:.4f} 秒")
return result
return wrapper
# 使用方式
@timer
def slow_function():
time.sleep(1)
return "完成"
slow_function()
# 输出: [timer] slow_function 执行时间: 1.0023 秒2. 进阶版本:集成日志系统(推荐)
实际项目中,我们通常需要专业的日志管理:
import logging
import time
import functools
from typing import callable, any
# 配置日志
logging.basicconfig(
level=logging.info,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%y-%m-%d %h:%m:%s'
)
class mylog:
"""日志工具类,包含计时装饰器"""
def __init__(self, name: str = "mylog"):
self.logger = logging.getlogger(name)
def timer(self, level: int = logging.info):
"""
带日志级别的计时装饰器
用法:
@mylog.timer()
@mylog.timer(logging.debug)
"""
def decorator(func: callable) -> callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> any:
# 记录开始时间
start_time = time.perf_counter()
try:
# 执行函数
result = func(*args, **kwargs)
# 计算耗时
end_time = time.perf_counter()
duration = end_time - start_time
# 获取函数参数信息
args_repr = [repr(a) for a in args]
kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
# 记录日志
self.logger.log(
level,
f"[timer] {func.__name__}({signature}) -> {duration:.4f}s"
)
return result
except exception as e:
# 异常时也记录耗时
end_time = time.perf_counter()
duration = end_time - start_time
self.logger.error(
f"[timer] {func.__name__} 执行失败,耗时: {duration:.4f}s, 错误: {e}"
)
raise
return wrapper
return decorator
# 实例化
mylog = mylog()
# 使用示例
@mylog.timer()
def calculate_sum(n: int):
"""计算 1 到 n 的和"""
return sum(range(1, n + 1))
@mylog.timer(logging.debug)
def fetch_data(url: str):
"""模拟获取数据"""
time.sleep(0.5)
return f"data from {url}"
# 测试
calculate_sum(1000000)
fetch_data("https://api.example.com")3. 高级版本:带配置和统计功能
import logging
import time
import functools
import statistics
from typing import dict, list
from collections import defaultdict
class advancedmylog:
"""高级日志类,支持统计多次调用的性能数据"""
_stats: dict[str, list[float]] = defaultdict(list)
def __init__(self, name: str = "advancedlog"):
self.logger = logging.getlogger(name)
handler = logging.streamhandler()
formatter = logging.formatter(
'%(asctime)s | %(name)s | %(levelname)s | %(message)s'
)
handler.setformatter(formatter)
self.logger.addhandler(handler)
self.logger.setlevel(logging.debug)
def timer(self, log_args: bool = true, alert_threshold: float = none):
"""
高级计时装饰器
args:
log_args: 是否记录函数参数
alert_threshold: 超时警告阈值(秒)
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__qualname__
start = time.perf_counter()
result = func(*args, **kwargs)
duration = time.perf_counter() - start
# 保存统计数据
self._stats[func_name].append(duration)
# 构建日志消息
if log_args and (args or kwargs):
args_str = ", ".join(
[repr(a) for a in args[1:]] + # 排除 self/cls
[f"{k}={v!r}" for k, v in kwargs.items()]
)
msg = f"[timer] {func_name}({args_str}) 耗时: {duration:.4f}s"
else:
msg = f"[timer] {func_name} 耗时: {duration:.4f}s"
# 检查阈值
if alert_threshold and duration > alert_threshold:
self.logger.warning(f"[slow] {msg} (超过阈值 {alert_threshold}s)")
else:
self.logger.info(msg)
return result
return wrapper
return decorator
@classmethod
def get_stats(cls, func_name: str = none):
"""获取性能统计报告"""
if func_name:
times = cls._stats.get(func_name, [])
if not times:
return f"{func_name}: 无数据"
return {
"函数": func_name,
"调用次数": len(times),
"平均耗时": statistics.mean(times),
"最大耗时": max(times),
"最小耗时": min(times),
}
return {name: cls.get_stats(name) for name in cls._stats}
# 使用示例
adv_log = advancedmylog()
class dataprocessor:
@adv_log.timer(alert_threshold=1.0) # 超过1秒警告
def process(self, data: list):
time.sleep(0.8) # 模拟处理
return len(data)
processor = dataprocessor()
processor.process([1, 2, 3])
processor.process([4, 5, 6])
# 查看统计
print(advancedmylog.get_stats("dataprocessor.process"))4. 异步函数支持
现代 python 常使用 async/await,需要特殊处理:
import asyncio
import time
import functools
class asynctimerlog:
def __init__(self):
self.logger = logging.getlogger("asynclog")
def timer(self, func):
"""支持异步函数的计时装饰器"""
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
result = await func(*args, **kwargs)
duration = time.perf_counter() - start
self.logger.info(f"[async timer] {func.__name__} 耗时: {duration:.4f}s")
return result
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
duration = time.perf_counter() - start
self.logger.info(f"[sync timer] {func.__name__} 耗时: {duration:.4f}s")
return result
# 自动判断是否是协程函数
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
# 使用
mylog = asynctimerlog()
@mylog.timer
async def async_fetch():
await asyncio.sleep(1)
return "data"
@mylog.timer
def sync_fetch():
time.sleep(0.5)
return "data"
# 运行
sync_fetch()
asyncio.run(async_fetch())5. 使用场景与最佳实践
5.1 性能监控
# 监控 api 接口响应时间
@mylog.timer(alert_threshold=2.0)
def api_endpoint():
# 数据库查询等操作
pass5.2 算法优化对比
@mylog.timer()
def algorithm_v1(data):
# 旧算法
pass
@mylog.timer()
def algorithm_v2(data):
# 新算法
pass
# 对比两者的执行时间5.3 上下文管理器(替代方案)
import time
from contextlib import contextmanager
@contextmanager
def timer_context(name: str):
start = time.perf_counter()
yield
duration = time.perf_counter() - start
print(f"[block timer] {name}: {duration:.4f}s")
# 使用
with timer_context("代码块 a"):
time.sleep(1)6. 注意事项
| 注意点 | 说明 |
|---|---|
| functools.wraps | 必须保留原函数的元数据(__name__, __doc__) |
| 异常处理 | 确保即使函数抛出异常也能记录耗时 |
| 性能开销 | 日志本身有开销,生产环境建议设置开关 |
| 线程安全 | 多线程环境下统计数据需要加锁 |
完整使用模板
# config.py 或 utils.py
import logging
import time
import functools
class mylog:
_instance = none
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, name="myapp"):
if not hasattr(self, 'logger'):
self.logger = logging.getlogger(name)
self.logger.setlevel(logging.info)
if not self.logger.handlers:
handler = logging.streamhandler()
handler.setformatter(
logging.formatter('%(asctime)s - %(message)s')
)
self.logger.addhandler(handler)
def timer(self, level=logging.info):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
duration = time.perf_counter() - start
self.logger.log(
level,
f"[⏱️ timer] {func.__name__} completed in {duration:.4f}s"
)
return result
except exception as e:
duration = time.perf_counter() - start
self.logger.error(
f"[⏱️ timer] {func.__name__} failed after {duration:.4f}s: {e}"
)
raise
return wrapper
return decorator
# 全局实例
mylog = mylog()
# 业务代码中使用
@mylog.timer()
def your_function():
pass到此这篇关于python @mylog.timer 装饰器的使用的文章就介绍到这了,更多相关python @mylog.timer 装饰器内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论