当前位置: 代码网 > it编程>前端脚本>Python > Python实现结构化日志系统的完整方案和最佳实践

Python实现结构化日志系统的完整方案和最佳实践

2025年12月17日 Python 我要评论
引言在现代软件系统中,日志不仅是调试和问题排查的工具,更是系统可观测性的核心组成部分。随着微服务、分布式系统和云原生架构的普及,传统文本日志已无法满足复杂系统的监控、分析和调试需求。结构化日志应运而生

引言

在现代软件系统中,日志不仅是调试和问题排查的工具,更是系统可观测性的核心组成部分。随着微服务、分布式系统和云原生架构的普及,传统文本日志已无法满足复杂系统的监控、分析和调试需求。结构化日志应运而生,成为现代日志系统的标准实践。

根据2023年devops现状报告显示,采用结构化日志的团队部署频率提高2.6倍,故障恢复时间缩短3.2倍。本文将深入探讨结构化日志系统的设计原理、实现方法和最佳实践,提供完整的python实现方案。

1. 日志系统基础概念

1.1 日志的重要性与价值

日志系统为软件系统提供了以下关键价值:

  • 故障排查:快速定位和解决生产环境问题
  • 性能监控:跟踪系统性能和资源使用情况
  • 安全审计:记录用户操作和安全事件
  • 业务分析:分析用户行为和应用使用模式
  • 合规要求:满足法律和行业规定的日志保留要求

1.2 日志系统的演进历程

1.3 日志质量的金字塔模型

2. 结构化日志基础

2.1 什么是结构化日志

结构化日志是将日志数据以机器可读的格式(通常是json)进行组织,而不是传统的纯文本格式。结构化日志包含:

  • 固定字段:时间戳、级别、消息、来源等
  • 上下文字段:请求id、用户id、会话id等
  • 业务字段:操作类型、资源id、结果状态等
  • 性能字段:耗时、内存使用、请求大小等

2.2 结构化日志 vs 非结构化日志

维度结构化日志非结构化日志
格式json、键值对纯文本
可读性机器友好人类友好
查询能力强大(支持字段筛选)有限(文本搜索)
存储效率较高较低
解析复杂度简单复杂
扩展性容易添加新字段需要修改格式

2.3 结构化日志的数学表示

设日志事件为一个元组: l=(t,l,m,c)

其中:

  • t :时间戳
  • l:日志级别
  • m:消息模板
  • c:上下文键值对集合,c={k1:v1,k2:v2,...,kn:vn}

结构化日志可以表示为:

lstruct=json({timestamp:t,level:l,message:m}c)

日志查询可以形式化为:

query(lstruct,φ)={l∣∀(k,v)φ,l.c[k]=v}

其中 φ 是查询条件的键值对集合。

3. 日志系统架构设计

3.1 现代日志系统架构

3.2 日志处理流水线

典型的日志处理流水线包含以下阶段:

  • 收集:从应用收集原始日志
  • 解析:提取结构化字段
  • 丰富:添加元数据(主机名、环境等)
  • 过滤:移除敏感信息或无用数据
  • 转换:格式转换和标准化
  • 路由:根据规则分发到不同目的地
  • 存储:持久化存储
  • 索引:建立快速检索索引

3.3 分布式日志追踪

在微服务架构中,分布式追踪是结构化日志的关键组成部分。使用以下字段实现追踪:

  • trace_id:整个请求链路的唯一标识
  • span_id:单个操作段的标识
  • parent_span_id:父操作的标识
  • service_name:服务名称
  • operation_name:操作名称

追踪系统的数学表示:

设请求 r经过 n  个服务,则:

t(r)={s1,s2,...,sn}

每个服务操作 si 包含:si=(tstart,tend,trace_id,span_idi,parent_span_idi,metadatai)

请求总耗时:δt=max(tend)min(tstart)

4. python结构化日志实现

4.1 基础结构化日志框架

"""
结构化日志系统实现
设计原则:
1. 结构化优先:所有日志输出为结构化格式
2. 上下文感知:自动捕获和传递上下文
3. 性能友好:异步处理,最小化性能影响
4. 可扩展性:支持自定义处理器和格式器
5. 安全性:内置敏感信息过滤
"""

import json
import logging
import sys
import time
import uuid
import inspect
import threading
from typing import dict, any, optional, list, union, callable
from datetime import datetime
from enum import enum
from dataclasses import dataclass, field, asdict
from abc import abc, abstractmethod
from queue import queue, empty
from concurrent.futures import threadpoolexecutor
from pathlib import path
import traceback
import hashlib
import zlib
from collections import defaultdict

# 类型别名
logdata = dict[str, any]
contextdict = dict[str, any]


class loglevel(enum):
    """日志级别枚举"""
    trace = 0      # 最详细的跟踪信息
    debug = 1      # 调试信息
    info = 2       # 常规信息
    warn = 3       # 警告信息
    error = 4      # 错误信息
    fatal = 5      # 严重错误
    
    @classmethod
    def from_string(cls, level_str: str) -> 'loglevel':
        """从字符串转换日志级别"""
        level_map = {
            'trace': cls.trace,
            'debug': cls.debug,
            'info': cls.info,
            'warn': cls.warn,
            'warning': cls.warn,
            'error': cls.error,
            'fatal': cls.fatal,
            'critical': cls.fatal
        }
        return level_map.get(level_str.lower(), cls.info)
    
    @classmethod
    def to_standard_level(cls, level: 'loglevel') -> int:
        """转换为标准logging级别"""
        mapping = {
            cls.trace: 5,      # 低于debug
            cls.debug: logging.debug,
            cls.info: logging.info,
            cls.warn: logging.warning,
            cls.error: logging.error,
            cls.fatal: logging.critical
        }
        return mapping[level]


@dataclass
class logrecord:
    """结构化日志记录"""
    
    # 基础字段
    timestamp: str
    level: str
    message: str
    logger_name: str
    
    # 上下文字段
    trace_id: optional[str] = none
    span_id: optional[str] = none
    request_id: optional[str] = none
    user_id: optional[str] = none
    session_id: optional[str] = none
    correlation_id: optional[str] = none
    
    # 执行上下文
    filename: optional[str] = none
    function: optional[str] = none
    line_no: optional[int] = none
    thread_id: optional[int] = none
    thread_name: optional[str] = none
    process_id: optional[int] = none
    
    # 应用程序上下文
    app_name: optional[str] = none
    app_version: optional[str] = none
    environment: optional[str] = none
    hostname: optional[str] = none
    service_name: optional[str] = none
    
    # 性能指标
    duration_ms: optional[float] = none
    memory_mb: optional[float] = none
    cpu_percent: optional[float] = none
    
    # 自定义字段
    extra: dict[str, any] = field(default_factory=dict)
    
    # 错误信息
    error_type: optional[str] = none
    error_message: optional[str] = none
    stack_trace: optional[str] = none
    
    def to_dict(self) -> dict[str, any]:
        """转换为字典"""
        result = asdict(self)
        
        # 移除none值以减小体积
        return {k: v for k, v in result.items() if v is not none}
    
    def to_json(self, indent: optional[int] = none) -> str:
        """转换为json字符串"""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=false)
    
    def get_field_hash(self) -> str:
        """获取字段内容的哈希值(用于去重)"""
        # 排除一些动态字段
        excluded_fields = {'timestamp', 'duration_ms', 'memory_mb', 'cpu_percent'}
        data = {k: v for k, v in self.to_dict().items() 
                if k not in excluded_fields and v is not none}
        
        content = json.dumps(data, sort_keys=true, ensure_ascii=false)
        return hashlib.md5(content.encode()).hexdigest()
    
    def is_similar_to(self, other: 'logrecord', threshold: float = 0.9) -> bool:
        """判断两个日志记录是否相似(用于去重)"""
        if self.level != other.level:
            return false
        
        # 计算消息相似度(简化的编辑距离)
        from difflib import sequencematcher
        message_similarity = sequencematcher(
            none, self.message, other.message
        ).ratio()
        
        return message_similarity >= threshold


class logcontext:
    """日志上下文管理器"""
    
    def __init__(self):
        # 线程本地存储
        self._local = threading.local()
        self._global_context = {}
        self._context_stack = []
    
    @property
    def current(self) -> dict[str, any]:
        """获取当前上下文"""
        if not hasattr(self._local, 'context'):
            self._local.context = {}
        return self._local.context
    
    @current.setter
    def current(self, context: dict[str, any]):
        """设置当前上下文"""
        self._local.context = context
    
    def get(self, key: str, default: any = none) -> any:
        """获取上下文值"""
        return self.current.get(key, self._global_context.get(key, default))
    
    def set(self, key: str, value: any, global_scope: bool = false):
        """设置上下文值"""
        if global_scope:
            self._global_context[key] = value
        else:
            self.current[key] = value
    
    def update(self, data: dict[str, any], global_scope: bool = false):
        """批量更新上下文"""
        if global_scope:
            self._global_context.update(data)
        else:
            self.current.update(data)
    
    def clear(self):
        """清除当前线程上下文"""
        if hasattr(self._local, 'context'):
            self._local.context.clear()
    
    def push_context(self, context: dict[str, any]):
        """压入新的上下文层"""
        if not hasattr(self._local, 'context_stack'):
            self._local.context_stack = []
        
        # 保存当前上下文
        current_copy = self.current.copy()
        self._local.context_stack.append(current_copy)
        
        # 更新为新上下文(合并)
        new_context = current_copy.copy()
        new_context.update(context)
        self.current = new_context
    
    def pop_context(self) -> dict[str, any]:
        """弹出上下文层"""
        if not hasattr(self._local, 'context_stack') or not self._local.context_stack:
            old_context = self.current.copy()
            self.clear()
            return old_context
        
        old_context = self.current
        self.current = self._local.context_stack.pop()
        return old_context
    
    def context_manager(self, **kwargs):
        """上下文管理器"""
        return logcontextmanager(self, kwargs)
    
    def get_all_context(self) -> dict[str, any]:
        """获取所有上下文(包括全局)"""
        result = self._global_context.copy()
        result.update(self.current)
        return result


class logcontextmanager:
    """上下文管理器"""
    
    def __init__(self, log_context: logcontext, context_data: dict[str, any]):
        self.log_context = log_context
        self.context_data = context_data
    
    def __enter__(self):
        self.log_context.push_context(self.context_data)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.log_context.pop_context()


class structuredformatter(abc):
    """结构化日志格式化器抽象基类"""
    
    @abstractmethod
    def format(self, record: logrecord) -> str:
        """格式化日志记录"""
        pass


class jsonformatter(structuredformatter):
    """json格式化器"""
    
    def __init__(
        self,
        indent: optional[int] = none,
        ensure_ascii: bool = false,
        sort_keys: bool = false,
        include_metadata: bool = true
    ):
        self.indent = indent
        self.ensure_ascii = ensure_ascii
        self.sort_keys = sort_keys
        self.include_metadata = include_metadata
    
    def format(self, record: logrecord) -> str:
        """格式化为json"""
        data = record.to_dict()
        
        # 添加格式化元数据
        if self.include_metadata:
            data['_metadata'] = {
                'format_version': '1.0',
                'formatter': 'json',
                'timestamp_ns': time.time_ns()
            }
        
        return json.dumps(
            data,
            indent=self.indent,
            ensure_ascii=self.ensure_ascii,
            sort_keys=self.sort_keys
        )


class ndjsonformatter(structuredformatter):
    """ndjson格式化器(每行一个json)"""
    
    def __init__(self, **kwargs):
        self.json_formatter = jsonformatter(**kwargs)
    
    def format(self, record: logrecord) -> str:
        """格式化为ndjson"""
        return self.json_formatter.format(record)


class logfilter(abc):
    """日志过滤器抽象基类"""
    
    @abstractmethod
    def filter(self, record: logrecord) -> bool:
        """过滤日志记录,返回true表示保留"""
        pass


class levelfilter(logfilter):
    """级别过滤器"""
    
    def __init__(self, min_level: loglevel):
        self.min_level = min_level
    
    def filter(self, record: logrecord) -> bool:
        """根据级别过滤"""
        record_level = loglevel.from_string(record.level)
        return record_level.value >= self.min_level.value


class ratelimitfilter(logfilter):
    """速率限制过滤器"""
    
    def __init__(self, max_per_second: int = 10, window_seconds: int = 1):
        self.max_per_second = max_per_second
        self.window_seconds = window_seconds
        self.log_counts = defaultdict(int)
        self.window_start = time.time()
    
    def filter(self, record: logrecord) -> bool:
        """速率限制"""
        current_time = time.time()
        
        # 检查是否需要重置窗口
        if current_time - self.window_start >= self.window_seconds:
            self.log_counts.clear()
            self.window_start = current_time
        
        # 获取日志哈希作为键
        log_key = record.get_field_hash()
        current_count = self.log_counts[log_key]
        
        if current_count < self.max_per_second:
            self.log_counts[log_key] = current_count + 1
            return true
        
        return false


class sensitivedatafilter(logfilter):
    """敏感数据过滤器"""
    
    def __init__(self):
        # 敏感数据模式(可以扩展)
        self.sensitive_patterns = [
            r'(?i)(password|passwd|pwd)[=:]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)(api[_-]?key|secret[_-]?key)[=:]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)(token)[=:]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)(credit[_-]?card|cc)[=:]\s*["\']?(\d[ -]*?){13,16}["\']?',
            r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',  # 电话号码
            r'\b[a-za-z0-9._%+-]+@[a-za-z0-9.-]+\.[a-z|a-z]{2,}\b',  # 邮箱
        ]
        
        self.compiled_patterns = [re.compile(pattern) for pattern in self.sensitive_patterns]
    
    def filter(self, record: logrecord) -> bool:
        """过滤敏感信息"""
        # 对消息进行脱敏
        record.message = self._mask_sensitive_data(record.message)
        
        # 对extra字段进行脱敏
        for key, value in record.extra.items():
            if isinstance(value, str):
                record.extra[key] = self._mask_sensitive_data(value)
        
        return true
    
    def _mask_sensitive_data(self, text: str) -> str:
        """脱敏文本中的敏感信息"""
        if not isinstance(text, str):
            return text
        
        masked_text = text
        
        for pattern in self.compiled_patterns:
            masked_text = pattern.sub(self._mask_replacer, masked_text)
        
        return masked_text
    
    def _mask_replacer(self, match) -> str:
        """替换匹配的敏感信息"""
        full_match = match.group(0)
        
        # 根据匹配内容决定脱敏策略
        if '@' in full_match:  # 邮箱
            parts = full_match.split('@')
            if len(parts[0]) > 2:
                return parts[0][:2] + '***@' + parts[1]
            else:
                return '***@' + parts[1]
        elif any(keyword in full_match.lower() for keyword in ['password', 'passwd', 'pwd']):
            return 'password=***'
        elif any(keyword in full_match.lower() for keyword in ['key', 'token', 'secret']):
            return match.group(1) + '=***'
        elif re.match(r'\d', full_match.replace('-', '').replace(' ', '')):
            # 数字类型(信用卡、电话等)
            digits = re.sub(r'[^\d]', '', full_match)
            if 10 <= len(digits) <= 16:
                return digits[:4] + '*' * (len(digits) - 8) + digits[-4:]
        
        return '***'

4.2 高级日志处理器

class loghandler(abc):
    """日志处理器抽象基类"""
    
    def __init__(
        self,
        level: loglevel = loglevel.info,
        formatter: optional[structuredformatter] = none,
        filters: optional[list[logfilter]] = none
    ):
        self.level = level
        self.formatter = formatter or jsonformatter()
        self.filters = filters or []
        
        # 性能统计
        self.processed_count = 0
        self.dropped_count = 0
        self.start_time = time.time()
    
    @abstractmethod
    def emit(self, record: logrecord):
        """输出日志记录"""
        pass
    
    def handle(self, record: logrecord) -> bool:
        """处理日志记录"""
        
        # 检查级别
        record_level = loglevel.from_string(record.level)
        if record_level.value < self.level.value:
            self.dropped_count += 1
            return false
        
        # 应用过滤器
        for filter_obj in self.filters:
            if not filter_obj.filter(record):
                self.dropped_count += 1
                return false
        
        # 格式化
        formatted = self.formatter.format(record)
        
        # 输出
        try:
            self.emit(record)
            self.processed_count += 1
            return true
        except exception as e:
            # 处理器错误处理
            print(f"日志处理器错误: {e}")
            self.dropped_count += 1
            return false
    
    def get_stats(self) -> dict[str, any]:
        """获取处理器统计信息"""
        uptime = time.time() - self.start_time
        return {
            'processed': self.processed_count,
            'dropped': self.dropped_count,
            'uptime_seconds': uptime,
            'rate_per_second': self.processed_count / max(uptime, 0.001),
            'handler_type': self.__class__.__name__
        }


class consolehandler(loghandler):
    """控制台处理器"""
    
    def __init__(
        self,
        level: loglevel = loglevel.info,
        formatter: optional[structuredformatter] = none,
        output_stream: any = sys.stdout,
        use_colors: bool = true
    ):
        super().__init__(level, formatter)
        self.output_stream = output_stream
        self.use_colors = use_colors
        
        # 颜色映射
        self.color_map = {
            'trace': '\033[90m',    # 灰色
            'debug': '\033[36m',    # 青色
            'info': '\033[32m',     # 绿色
            'warn': '\033[33m',     # 黄色
            'error': '\033[31m',    # 红色
            'fatal': '\033[41m\033[37m',  # 红底白字
            'reset': '\033[0m'      # 重置
        }
    
    def emit(self, record: logrecord):
        """输出到控制台"""
        formatted = self.formatter.format(record)
        
        if self.use_colors:
            color = self.color_map.get(record.level.upper(), '')
            reset = self.color_map['reset']
            output = f"{color}{formatted}{reset}"
        else:
            output = formatted
        
        print(output, file=self.output_stream)


class filehandler(loghandler):
    """文件处理器"""
    
    def __init__(
        self,
        filename: union[str, path],
        level: loglevel = loglevel.info,
        formatter: optional[structuredformatter] = none,
        mode: str = 'a',
        encoding: str = 'utf-8',
        buffering: int = 1  # 行缓冲
    ):
        super().__init__(level, formatter)
        self.filename = path(filename)
        self.mode = mode
        self.encoding = encoding
        self.buffering = buffering
        
        # 确保目录存在
        self.filename.parent.mkdir(parents=true, exist_ok=true)
        
        # 打开文件
        self._open_file()
    
    def _open_file(self):
        """打开文件"""
        self.file = open(
            self.filename,
            mode=self.mode,
            encoding=self.encoding,
            buffering=self.buffering
        )
    
    def emit(self, record: logrecord):
        """输出到文件"""
        formatted = self.formatter.format(record)
        self.file.write(formatted + '\n')
        self.file.flush()
    
    def close(self):
        """关闭文件"""
        if hasattr(self, 'file') and self.file:
            self.file.close()
    
    def rotate(self, max_size_mb: float = 100, backup_count: int = 5):
        """日志轮转"""
        if not self.filename.exists():
            return
        
        file_size_mb = self.filename.stat().st_size / (1024 * 1024)
        
        if file_size_mb < max_size_mb:
            return
        
        # 关闭当前文件
        self.close()
        
        # 重命名旧文件
        for i in range(backup_count - 1, 0, -1):
            old_file = self.filename.with_suffix(f".{i}.log")
            new_file = self.filename.with_suffix(f".{i+1}.log")
            
            if old_file.exists():
                old_file.rename(new_file)
        
        # 重命名当前文件
        current_backup = self.filename.with_suffix(".1.log")
        self.filename.rename(current_backup)
        
        # 重新打开文件
        self._open_file()


class rotatingfilehandler(filehandler):
    """自动轮转的文件处理器"""
    
    def __init__(
        self,
        filename: union[str, path],
        level: loglevel = loglevel.info,
        formatter: optional[structuredformatter] = none,
        max_size_mb: float = 100,
        backup_count: int = 5,
        check_interval: int = 10  # 检查间隔(处理的日志条数)
    ):
        super().__init__(filename, level, formatter)
        self.max_size_mb = max_size_mb
        self.backup_count = backup_count
        self.check_interval = check_interval
        self.processed_since_check = 0
    
    def handle(self, record: logrecord) -> bool:
        """处理日志记录(添加轮转检查)"""
        self.processed_since_check += 1
        
        if self.processed_since_check >= self.check_interval:
            self.rotate(self.max_size_mb, self.backup_count)
            self.processed_since_check = 0
        
        return super().handle(record)


class asynchandler(loghandler):
    """异步处理器"""
    
    def __init__(
        self,
        base_handler: loghandler,
        max_queue_size: int = 10000,
        worker_count: int = 1,
        drop_when_full: bool = false
    ):
        super().__init__(base_handler.level, base_handler.formatter, base_handler.filters)
        self.base_handler = base_handler
        
        # 队列设置
        self.max_queue_size = max_queue_size
        self.queue = queue(maxsize=max_queue_size)
        self.drop_when_full = drop_when_full
        
        # 工作线程
        self.worker_count = worker_count
        self.executor = threadpoolexecutor(
            max_workers=worker_count,
            thread_name_prefix="asynclogger"
        )
        
        # 启动消费者
        self.running = true
        for i in range(worker_count):
            self.executor.submit(self._worker_loop)
    
    def emit(self, record: logrecord):
        """异步处理日志记录"""
        try:
            if self.drop_when_full and self.queue.full():
                self.dropped_count += 1
                return
            
            self.queue.put_nowait(record)
            
        except exception as e:
            # 队列满或其他错误
            self.dropped_count += 1
            print(f"异步日志队列错误: {e}")
    
    def _worker_loop(self):
        """工作线程循环"""
        while self.running:
            try:
                # 阻塞获取日志记录(带超时)
                try:
                    record = self.queue.get(timeout=1.0)
                except empty:
                    continue
                
                # 使用基础处理器处理
                self.base_handler.handle(record)
                
                # 标记任务完成
                self.queue.task_done()
                
            except exception as e:
                print(f"异步日志工作线程错误: {e}")
    
    def shutdown(self, timeout: float = 5.0):
        """关闭异步处理器"""
        self.running = false
        
        # 等待队列清空
        self.queue.join()
        
        # 关闭执行器
        self.executor.shutdown(wait=true, timeout=timeout)
        
        # 关闭基础处理器
        if hasattr(self.base_handler, 'close'):
            self.base_handler.close()
    
    def get_stats(self) -> dict[str, any]:
        """获取统计信息(包括队列信息)"""
        base_stats = super().get_stats()
        base_stats.update({
            'queue_size': self.queue.qsize(),
            'queue_max_size': self.max_queue_size,
            'queue_full': self.queue.full(),
            'worker_count': self.worker_count,
            'is_running': self.running,
            'base_handler_stats': self.base_handler.get_stats()
        })
        return base_stats


class batchhandler(loghandler):
    """批量处理器"""
    
    def __init__(
        self,
        base_handler: loghandler,
        batch_size: int = 100,
        flush_interval: float = 1.0,  # 秒
        compression: bool = false
    ):
        super().__init__(base_handler.level, base_handler.formatter, base_handler.filters)
        self.base_handler = base_handler
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.compression = compression
        
        # 批处理缓冲区
        self.buffer: list[logrecord] = []
        self.last_flush_time = time.time()
        
        # 启动定时刷新线程
        self.flush_thread = threading.thread(target=self._flush_loop, daemon=true)
        self.running = true
        self.flush_thread.start()
    
    def emit(self, record: logrecord):
        """添加到批处理缓冲区"""
        self.buffer.append(record)
        
        # 检查是否需要刷新
        if (len(self.buffer) >= self.batch_size or 
            (time.time() - self.last_flush_time) >= self.flush_interval):
            self._flush_buffer()
    
    def _flush_buffer(self):
        """刷新缓冲区"""
        if not self.buffer:
            return
        
        # 准备批量数据
        batch_records = self.buffer.copy()
        self.buffer.clear()
        
        try:
            # 批量处理
            if self.compression:
                # 压缩批量数据
                batch_data = self._compress_batch(batch_records)
                # 这里需要基础处理器支持批量数据
                # 简化实现:逐个处理
                for record in batch_records:
                    self.base_handler.handle(record)
            else:
                for record in batch_records:
                    self.base_handler.handle(record)
            
            self.last_flush_time = time.time()
            
        except exception as e:
            print(f"批量日志处理错误: {e}")
            # 错误处理:将记录放回缓冲区(避免丢失)
            self.buffer.extend(batch_records)
    
    def _compress_batch(self, records: list[logrecord]) -> bytes:
        """压缩批量数据"""
        batch_json = json.dumps([r.to_dict() for r in records])
        return zlib.compress(batch_json.encode())
    
    def _flush_loop(self):
        """定时刷新循环"""
        while self.running:
            time.sleep(self.flush_interval)
            self._flush_buffer()
    
    def shutdown(self):
        """关闭批量处理器"""
        self.running = false
        self._flush_buffer()  # 最后一次刷新
        
        if self.flush_thread.is_alive():
            self.flush_thread.join(timeout=2.0)
        
        if hasattr(self.base_handler, 'shutdown'):
            self.base_handler.shutdown()
    
    def get_stats(self) -> dict[str, any]:
        """获取统计信息"""
        base_stats = super().get_stats()
        base_stats.update({
            'buffer_size': len(self.buffer),
            'batch_size': self.batch_size,
            'flush_interval': self.flush_interval,
            'compression_enabled': self.compression,
            'base_handler_stats': self.base_handler.get_stats()
        })
        return base_stats

4.3 完整的日志系统

class structuredlogger:
    """结构化日志记录器"""
    
    def __init__(
        self,
        name: str,
        level: loglevel = loglevel.info,
        handlers: optional[list[loghandler]] = none,
        context: optional[logcontext] = none,
        capture_stacktrace: bool = false,
        enable_performance_stats: bool = false
    ):
        self.name = name
        self.level = level
        self.handlers = handlers or []
        self.context = context or logcontext()
        self.capture_stacktrace = capture_stacktrace
        self.enable_performance_stats = enable_performance_stats
        
        # 性能统计
        self.stats = {
            'log_count': defaultdict(int),
            'last_log_time': none,
            'total_log_time_ns': 0,
            'error_count': 0
        }
        
        # 缓存调用者信息(性能优化)
        self._caller_cache = {}
    
    def _get_caller_info(self, depth: int = 3) -> dict[str, any]:
        """获取调用者信息"""
        try:
            # 使用缓存提高性能
            cache_key = threading.get_ident()
            if cache_key in self._caller_cache:
                return self._caller_cache[cache_key]
            
            # 获取调用堆栈
            frame = inspect.currentframe()
            for _ in range(depth):
                if frame is none:
                    break
                frame = frame.f_back
            
            if frame is none:
                return {}
            
            # 提取信息
            info = {
                'filename': frame.f_code.co_filename,
                'function': frame.f_code.co_name,
                'line_no': frame.f_lineno,
                'module': frame.f_globals.get('__name__', '')
            }
            
            # 缓存
            self._caller_cache[cache_key] = info
            return info
            
        except exception:
            return {}
        finally:
            # 清理引用
            del frame
    
    def _create_record(
        self,
        level: loglevel,
        message: str,
        extra: optional[dict[str, any]] = none,
        error_info: optional[dict[str, any]] = none
    ) -> logrecord:
        """创建日志记录"""
        
        # 基础时间
        now = datetime.utcnow()
        
        # 调用者信息
        caller_info = self._get_caller_info() if self.capture_stacktrace else {}
        
        # 构建记录
        record = logrecord(
            timestamp=now.isoformat() + 'z',
            level=level.name,
            message=message,
            logger_name=self.name,
            **caller_info
        )
        
        # 添加线程信息
        record.thread_id = threading.get_ident()
        record.thread_name = threading.current_thread().name
        record.process_id = os.getpid()
        
        # 添加上下文
        context_data = self.context.get_all_context()
        for key, value in context_data.items():
            if hasattr(record, key):
                setattr(record, key, value)
            else:
                record.extra[key] = value
        
        # 添加额外字段
        if extra:
            record.extra.update(extra)
        
        # 添加错误信息
        if error_info:
            record.error_type = error_info.get('type')
            record.error_message = error_info.get('message')
            record.stack_trace = error_info.get('stack_trace')
        
        return record
    
    def log(
        self,
        level: loglevel,
        message: str,
        extra: optional[dict[str, any]] = none,
        **kwargs
    ):
        """记录日志"""
        
        start_time = time.time_ns() if self.enable_performance_stats else 0
        
        try:
            # 检查级别
            if level.value < self.level.value:
                return
            
            # 合并额外字段
            all_extra = extra.copy() if extra else {}
            all_extra.update(kwargs)
            
            # 错误信息处理
            error_info = none
            if 'exc_info' in kwargs and kwargs['exc_info']:
                exc_type, exc_value, exc_traceback = kwargs['exc_info']
                if exc_type:
                    error_info = {
                        'type': exc_type.__name__,
                        'message': str(exc_value),
                        'stack_trace': traceback.format_exc()
                    }
            
            # 创建记录
            record = self._create_record(level, message, all_extra, error_info)
            
            # 处理记录
            for handler in self.handlers:
                handler.handle(record)
            
            # 更新统计
            self.stats['log_count'][level.name] += 1
            self.stats['last_log_time'] = record.timestamp
            
            if level == loglevel.error or level == loglevel.fatal:
                self.stats['error_count'] += 1
            
        except exception as e:
            # 记录器内部错误处理
            print(f"日志记录错误: {e}")
            self.stats['error_count'] += 1
        
        finally:
            # 性能统计
            if self.enable_performance_stats and start_time:
                duration_ns = time.time_ns() - start_time
                self.stats['total_log_time_ns'] += duration_ns
    
    # 便捷方法
    def trace(self, message: str, **kwargs):
        """记录trace级别日志"""
        self.log(loglevel.trace, message, **kwargs)
    
    def debug(self, message: str, **kwargs):
        """记录debug级别日志"""
        self.log(loglevel.debug, message, **kwargs)
    
    def info(self, message: str, **kwargs):
        """记录info级别日志"""
        self.log(loglevel.info, message, **kwargs)
    
    def warn(self, message: str, **kwargs):
        """记录warn级别日志"""
        self.log(loglevel.warn, message, **kwargs)
    
    def error(self, message: str, **kwargs):
        """记录error级别日志"""
        self.log(loglevel.error, message, **kwargs)
    
    def fatal(self, message: str, **kwargs):
        """记录fatal级别日志"""
        self.log(loglevel.fatal, message, **kwargs)
    
    def exception(self, message: str, exc: optional[exception] = none, **kwargs):
        """记录异常"""
        if exc is none:
            # 捕获当前异常
            exc_info = sys.exc_info()
        else:
            exc_info = (type(exc), exc, exc.__traceback__)
        
        kwargs['exc_info'] = exc_info
        self.log(loglevel.error, message, **kwargs)
    
    def with_context(self, **kwargs):
        """添加上下文"""
        return logcontextmanager(self.context, kwargs)
    
    def add_handler(self, handler: loghandler):
        """添加处理器"""
        self.handlers.append(handler)
    
    def remove_handler(self, handler: loghandler):
        """移除处理器"""
        if handler in self.handlers:
            self.handlers.remove(handler)
    
    def get_stats(self) -> dict[str, any]:
        """获取统计信息"""
        handler_stats = [h.get_stats() for h in self.handlers]
        
        stats = {
            'logger_name': self.name,
            'level': self.level.name,
            'handler_count': len(self.handlers),
            'log_counts': dict(self.stats['log_count']),
            'error_count': self.stats['error_count'],
            'handler_stats': handler_stats
        }
        
        if self.enable_performance_stats:
            total_logs = sum(self.stats['log_count'].values())
            if total_logs > 0:
                avg_time_ns = self.stats['total_log_time_ns'] / total_logs
                stats['performance'] = {
                    'total_time_ns': self.stats['total_log_time_ns'],
                    'avg_time_ns': avg_time_ns,
                    'avg_time_ms': avg_time_ns / 1_000_000
                }
        
        return stats


class logmanager:
    """日志管理器"""
    
    _instance = none
    _lock = threading.lock()
    
    def __new__(cls):
        with cls._lock:
            if cls._instance is none:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = false
            return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        
        self._loggers: dict[str, structuredlogger] = {}
        self._default_config: dict[str, any] = {}
        self._global_context = logcontext()
        self._initialized = true
        
        # 默认配置
        self._setup_defaults()
    
    def _setup_defaults(self):
        """设置默认配置"""
        self._default_config = {
            'level': loglevel.info,
            'handlers': [
                consolehandler(
                    level=loglevel.info,
                    formatter=jsonformatter(indent=none)
                )
            ],
            'capture_stacktrace': false,
            'enable_performance_stats': false
        }
        
        # 设置全局上下文
        import socket
        self._global_context.set('hostname', socket.gethostname(), global_scope=true)
        self._global_context.set('process_id', os.getpid(), global_scope=true)
    
    def get_logger(
        self,
        name: str,
        level: optional[loglevel] = none,
        handlers: optional[list[loghandler]] = none,
        capture_stacktrace: optional[bool] = none,
        enable_performance_stats: optional[bool] = none
    ) -> structuredlogger:
        """获取或创建日志记录器"""
        
        if name in self._loggers:
            return self._loggers[name]
        
        # 使用配置或默认值
        config = self._default_config.copy()
        
        if level is not none:
            config['level'] = level
        if handlers is not none:
            config['handlers'] = handlers
        if capture_stacktrace is not none:
            config['capture_stacktrace'] = capture_stacktrace
        if enable_performance_stats is not none:
            config['enable_performance_stats'] = enable_performance_stats
        
        # 创建日志记录器
        logger = structuredlogger(
            name=name,
            context=self._global_context,
            **config
        )
        
        self._loggers[name] = logger
        return logger
    
    def configure(
        self,
        config: dict[str, any],
        name: optional[str] = none
    ):
        """配置日志记录器"""
        
        if name:
            # 配置特定记录器
            if name in self._loggers:
                logger = self._loggers[name]
                
                if 'level' in config:
                    logger.level = loglevel.from_string(config['level'])
                
                if 'handlers' in config:
                    # 这里需要根据配置创建处理器
                    logger.handlers = self._create_handlers_from_config(config['handlers'])
                
                if 'capture_stacktrace' in config:
                    logger.capture_stacktrace = config['capture_stacktrace']
                
                if 'enable_performance_stats' in config:
                    logger.enable_performance_stats = config['enable_performance_stats']
        
        else:
            # 更新默认配置
            self._default_config.update(config)
            
            # 更新现有记录器
            for logger in self._loggers.values():
                self.configure(config, logger.name)
    
    def _create_handlers_from_config(self, handlers_config: list[dict]) -> list[loghandler]:
        """从配置创建处理器"""
        handlers = []
        
        for handler_config in handlers_config:
            handler_type = handler_config.get('type', 'console')
            
            try:
                if handler_type == 'console':
                    handler = consolehandler(
                        level=loglevel.from_string(handler_config.get('level', 'info')),
                        formatter=self._create_formatter_from_config(
                            handler_config.get('formatter', {})
                        ),
                        use_colors=handler_config.get('use_colors', true)
                    )
                
                elif handler_type == 'file':
                    handler = filehandler(
                        filename=handler_config['filename'],
                        level=loglevel.from_string(handler_config.get('level', 'info')),
                        formatter=self._create_formatter_from_config(
                            handler_config.get('formatter', {})
                        )
                    )
                
                elif handler_type == 'rotating_file':
                    handler = rotatingfilehandler(
                        filename=handler_config['filename'],
                        level=loglevel.from_string(handler_config.get('level', 'info')),
                        formatter=self._create_formatter_from_config(
                            handler_config.get('formatter', {})
                        ),
                        max_size_mb=handler_config.get('max_size_mb', 100),
                        backup_count=handler_config.get('backup_count', 5)
                    )
                
                elif handler_type == 'async':
                    base_handler_config = handler_config.get('base_handler', {})
                    base_handler = self._create_handlers_from_config([base_handler_config])[0]
                    
                    handler = asynchandler(
                        base_handler=base_handler,
                        max_queue_size=handler_config.get('max_queue_size', 10000),
                        worker_count=handler_config.get('worker_count', 1),
                        drop_when_full=handler_config.get('drop_when_full', false)
                    )
                
                else:
                    raise valueerror(f"未知的处理器类型: {handler_type}")
                
                # 添加过滤器
                filters_config = handler_config.get('filters', [])
                for filter_config in filters_config:
                    filter_type = filter_config.get('type', 'level')
                    
                    if filter_type == 'level':
                        handler.filters.append(levelfilter(
                            loglevel.from_string(filter_config.get('min_level', 'info'))
                        ))
                    elif filter_type == 'rate_limit':
                        handler.filters.append(ratelimitfilter(
                            max_per_second=filter_config.get('max_per_second', 10),
                            window_seconds=filter_config.get('window_seconds', 1)
                        ))
                    elif filter_type == 'sensitive_data':
                        handler.filters.append(sensitivedatafilter())
                
                handlers.append(handler)
                
            except exception as e:
                print(f"创建处理器失败 {handler_type}: {e}")
                continue
        
        return handlers
    
    def _create_formatter_from_config(self, formatter_config: dict) -> structuredformatter:
        """从配置创建格式化器"""
        formatter_type = formatter_config.get('type', 'json')
        
        if formatter_type == 'json':
            return jsonformatter(
                indent=formatter_config.get('indent'),
                ensure_ascii=formatter_config.get('ensure_ascii', false),
                sort_keys=formatter_config.get('sort_keys', false)
            )
        elif formatter_type == 'ndjson':
            return ndjsonformatter(
                indent=formatter_config.get('indent'),
                ensure_ascii=formatter_config.get('ensure_ascii', false),
                sort_keys=formatter_config.get('sort_keys', false)
            )
        else:
            # 默认使用json
            return jsonformatter()
    
    def set_global_context(self, **kwargs):
        """设置全局上下文"""
        self._global_context.update(kwargs, global_scope=true)
    
    def get_global_context(self) -> dict[str, any]:
        """获取全局上下文"""
        return self._global_context.get_all_context()
    
    def shutdown(self):
        """关闭所有日志记录器"""
        for logger in self._loggers.values():
            for handler in logger.handlers:
                if hasattr(handler, 'shutdown'):
                    handler.shutdown()
                elif hasattr(handler, 'close'):
                    handler.close()
        
        self._loggers.clear()
    
    def get_all_stats(self) -> dict[str, any]:
        """获取所有统计信息"""
        logger_stats = {}
        total_logs = 0
        total_errors = 0
        
        for name, logger in self._loggers.items():
            stats = logger.get_stats()
            logger_stats[name] = stats
            
            total_logs += sum(stats['log_counts'].values())
            total_errors += stats['error_count']
        
        return {
            'logger_count': len(self._loggers),
            'total_logs': total_logs,
            'total_errors': total_errors,
            'loggers': logger_stats,
            'global_context': self.get_global_context()
        }

5. 高级特性实现

5.1 分布式追踪集成

class distributedtracecontext:
    """分布式追踪上下文"""
    
    def __init__(self):
        self._local = threading.local()
    
    @property
    def current(self) -> dict[str, any]:
        """获取当前追踪上下文"""
        if not hasattr(self._local, 'trace_context'):
            self._local.trace_context = self._generate_new_context()
        return self._local.trace_context
    
    def _generate_new_context(self) -> dict[str, any]:
        """生成新的追踪上下文"""
        return {
            'trace_id': self._generate_trace_id(),
            'span_id': self._generate_span_id(),
            'parent_span_id': none,
            'sampled': true,
            'flags': 0
        }
    
    def _generate_trace_id(self) -> str:
        """生成追踪id"""
        return uuid.uuid4().hex
    
    def _generate_span_id(self) -> str:
        """生成跨度id"""
        return uuid.uuid4().hex[:16]
    
    def start_span(self, name: str, **attributes) -> 'span':
        """开始新的跨度"""
        parent_context = self.current.copy()
        
        new_context = parent_context.copy()
        new_context['span_id'] = self._generate_span_id()
        new_context['parent_span_id'] = parent_context['span_id']
        new_context['span_name'] = name
        new_context['start_time'] = time.time_ns()
        new_context['attributes'] = attributes
        
        # 保存父上下文
        if not hasattr(self._local, 'trace_stack'):
            self._local.trace_stack = []
        self._local.trace_stack.append(parent_context)
        
        # 设置新上下文
        self._local.trace_context = new_context
        
        return span(self, new_context)
    
    def end_span(self, context: dict[str, any], status: str = "ok", **attributes):
        """结束跨度"""
        if not hasattr(self._local, 'trace_stack') or not self._local.trace_stack:
            return
        
        # 计算持续时间
        end_time = time.time_ns()
        start_time = context.get('start_time', end_time)
        duration_ns = end_time - start_time
        
        # 创建跨度记录
        span_record = {
            'trace_id': context.get('trace_id'),
            'span_id': context.get('span_id'),
            'parent_span_id': context.get('parent_span_id'),
            'name': context.get('span_name', 'unknown'),
            'start_time': start_time,
            'end_time': end_time,
            'duration_ns': duration_ns,
            'status': status,
            'attributes': {**context.get('attributes', {}), **attributes}
        }
        
        # 恢复父上下文
        self._local.trace_context = self._local.trace_stack.pop()
        
        return span_record
    
    def get_current_span_id(self) -> optional[str]:
        """获取当前跨度id"""
        return self.current.get('span_id')
    
    def get_current_trace_id(self) -> optional[str]:
        """获取当前追踪id"""
        return self.current.get('trace_id')


class span:
    """追踪跨度"""
    
    def __init__(self, tracer: distributedtracecontext, context: dict[str, any]):
        self.tracer = tracer
        self.context = context
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        status = "error" if exc_type else "ok"
        self.tracer.end_span(self.context, status)
    
    def set_attribute(self, key: str, value: any):
        """设置跨度属性"""
        if 'attributes' not in self.context:
            self.context['attributes'] = {}
        self.context['attributes'][key] = value
    
    def set_status(self, status: str):
        """设置跨度状态"""
        self.context['status'] = status


class tracinglogger(structuredlogger):
    """集成追踪的日志记录器"""
    
    def __init__(
        self,
        name: str,
        tracer: optional[distributedtracecontext] = none,
        **kwargs
    ):
        super().__init__(name, **kwargs)
        self.tracer = tracer or distributedtracecontext()
        
        # 自动添加上下文
        self.context.set('tracer', self.tracer)
    
    def _create_record(self, *args, **kwargs) -> logrecord:
        """创建记录(添加追踪信息)"""
        record = super()._create_record(*args, **kwargs)
        
        # 添加追踪信息
        record.trace_id = self.tracer.get_current_trace_id()
        record.span_id = self.tracer.get_current_span_id()
        
        return record
    
    def trace_span(self, name: str, **attributes):
        """创建追踪跨度上下文管理器"""
        return self.tracer.start_span(name, **attributes)
    
    def log_with_span(
        self,
        level: loglevel,
        message: str,
        span_name: optional[str] = none,
        **kwargs
    ):
        """在追踪跨度中记录日志"""
        
        if span_name:
            # 创建新跨度
            with self.tracer.start_span(span_name):
                self.log(level, message, **kwargs)
        else:
            # 使用当前跨度
            self.log(level, message, **kwargs)

5.2 性能监控集成

class performancemonitor:
    """性能监控器"""
    
    def __init__(self, logger: structuredlogger):
        self.logger = logger
        self.metrics = defaultdict(list)
        self.thresholds = {}
    
    def measure(self, operation: str):
        """测量操作性能"""
        return performancetimer(self, operation)
    
    def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "ms",
        tags: optional[dict[str, str]] = none
    ):
        """记录性能指标"""
        
        timestamp = time.time_ns()
        metric_record = {
            'name': name,
            'value': value,
            'unit': unit,
            'timestamp': timestamp,
            'tags': tags or {}
        }
        
        # 存储指标
        self.metrics[name].append(metric_record)
        
        # 检查阈值
        if name in self.thresholds:
            threshold = self.thresholds[name]
            if value > threshold:
                self.logger.warn(
                    f"性能阈值超过: {name} = {value}{unit} > {threshold}{unit}",
                    metric=metric_record
                )
        
        # 记录指标日志
        self.logger.debug(
            f"性能指标: {name}",
            metric=metric_record,
            extra={'metric_type': 'performance'}
        )
        
        return metric_record
    
    def set_threshold(self, metric_name: str, threshold: float):
        """设置性能阈值"""
        self.thresholds[metric_name] = threshold
    
    def get_statistics(self, metric_name: str) -> dict[str, float]:
        """获取统计信息"""
        records = self.metrics.get(metric_name, [])
        
        if not records:
            return {}
        
        values = [r['value'] for r in records]
        
        return {
            'count': len(values),
            'mean': sum(values) / len(values),
            'min': min(values),
            'max': max(values),
            'p50': self._percentile(values, 50),
            'p95': self._percentile(values, 95),
            'p99': self._percentile(values, 99)
        }
    
    def _percentile(self, values: list[float], p: float) -> float:
        """计算百分位数"""
        if not values:
            return 0
        
        sorted_values = sorted(values)
        k = (len(sorted_values) - 1) * (p / 100)
        f = int(k)
        c = k - f
        
        if f + 1 < len(sorted_values):
            return sorted_values[f] + c * (sorted_values[f + 1] - sorted_values[f])
        else:
            return sorted_values[f]
    
    def report_summary(self):
        """报告性能摘要"""
        summary = {}
        
        for metric_name in self.metrics:
            stats = self.get_statistics(metric_name)
            summary[metric_name] = stats
        
        self.logger.info(
            "性能监控摘要",
            performance_summary=summary,
            extra={'report_type': 'performance_summary'}
        )
        
        return summary


class performancetimer:
    """性能计时器"""
    
    def __init__(self, monitor: performancemonitor, operation: str):
        self.monitor = monitor
        self.operation = operation
        self.start_time = none
        self.tags = {}
    
    def __enter__(self):
        self.start_time = time.time_ns()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.start_time is none:
            return
        
        end_time = time.time_ns()
        duration_ns = end_time - self.start_time
        duration_ms = duration_ns / 1_000_000
        
        self.monitor.record_metric(
            name=self.operation,
            value=duration_ms,
            unit="ms",
            tags=self.tags
        )
    
    def add_tag(self, key: str, value: str):
        """添加标签"""
        self.tags[key] = value
        return self

5.3 日志采样与聚合

class logsampler:
    """日志采样器"""
    
    def __init__(
        self,
        base_logger: structuredlogger,
        sample_rate: float = 1.0,  # 采样率 0.0-1.0
        adaptive_sampling: bool = false,
        min_sample_rate: float = 0.01,
        max_sample_rate: float = 1.0
    ):
        self.base_logger = base_logger
        self.sample_rate = sample_rate
        self.adaptive_sampling = adaptive_sampling
        self.min_sample_rate = min_sample_rate
        self.max_sample_rate = max_sample_rate
        
        # 采样统计
        self.sampled_count = 0
        self.total_count = 0
        
        # 自适应采样状态
        self.current_rate = sample_rate
        self.last_adjust_time = time.time()
    
    def should_sample(self, level: loglevel) -> bool:
        """决定是否采样"""
        self.total_count += 1
        
        # 高等级日志总是采样
        if level in [loglevel.error, loglevel.fatal]:
            self.sampled_count += 1
            return true
        
        # 计算当前采样率
        if self.adaptive_sampling:
            self._adjust_sample_rate()
        
        # 随机采样
        import random
        if random.random() <= self.current_rate:
            self.sampled_count += 1
            return true
        
        return false
    
    def _adjust_sample_rate(self):
        """调整采样率"""
        current_time = time.time()
        
        # 每分钟调整一次
        if current_time - self.last_adjust_time < 60:
            return
        
        # 计算当前实际采样率
        if self.total_count == 0:
            actual_rate = 0
        else:
            actual_rate = self.sampled_count / self.total_count
        
        # 调整采样率
        target_rate = self.sample_rate
        
        if actual_rate < target_rate * 0.8:
            # 采样不足,提高采样率
            self.current_rate = min(self.current_rate * 1.2, self.max_sample_rate)
        elif actual_rate > target_rate * 1.2:
            # 采样过多,降低采样率
            self.current_rate = max(self.current_rate * 0.8, self.min_sample_rate)
        
        # 重置统计
        self.sampled_count = 0
        self.total_count = 0
        self.last_adjust_time = current_time
    
    def log(self, level: loglevel, message: str, **kwargs):
        """记录日志(带采样)"""
        if self.should_sample(level):
            self.base_logger.log(level, message, **kwargs)


class logaggregator:
    """日志聚合器"""
    
    def __init__(
        self,
        base_logger: structuredlogger,
        aggregation_window: float = 5.0,  # 聚合窗口(秒)
        max_aggregation_count: int = 1000  # 最大聚合条数
    ):
        self.base_logger = base_logger
        self.aggregation_window = aggregation_window
        self.max_aggregation_count = max_aggregation_count
        
        # 聚合缓冲区
        self.buffer: dict[str, list[logrecord]] = defaultdict(list)
        self.last_flush_time = time.time()
        
        # 启动定时刷新
        self.flush_thread = threading.thread(target=self._flush_loop, daemon=true)
        self.running = true
        self.flush_thread.start()
    
    def _get_aggregation_key(self, record: logrecord) -> str:
        """获取聚合键"""
        # 基于消息和级别聚合
        key_parts = [
            record.level,
            record.message,
            record.logger_name,
            str(record.error_type) if record.error_type else "",
        ]
        
        return hashlib.md5("|".join(key_parts).encode()).hexdigest()
    
    def log(self, level: loglevel, message: str, **kwargs):
        """记录日志(带聚合)"""
        # 创建记录但不立即发送
        record = self.base_logger._create_record(level, message, kwargs.get('extra'))
        
        # 添加到缓冲区
        aggregation_key = self._get_aggregation_key(record)
        self.buffer[aggregation_key].append(record)
        
        # 检查是否达到聚合上限
        total_count = sum(len(records) for records in self.buffer.values())
        if total_count >= self.max_aggregation_count:
            self._flush_buffer()
    
    def _flush_buffer(self):
        """刷新缓冲区"""
        if not self.buffer:
            return
        
        flushed_records = []
        
        for aggregation_key, records in self.buffer.items():
            if not records:
                continue
            
            # 取第一条记录作为模板
            template_record = records[0]
            
            # 创建聚合记录
            aggregated_record = logrecord(
                timestamp=datetime.utcnow().isoformat() + 'z',
                level=template_record.level,
                message=template_record.message + f" (aggregated {len(records)} times)",
                logger_name=template_record.logger_name,
                extra={
                    **template_record.extra,
                    'aggregated_count': len(records),
                    'aggregation_key': aggregation_key,
                    'first_occurrence': records[0].timestamp,
                    'last_occurrence': records[-1].timestamp
                }
            )
            
            flushed_records.append(aggregated_record)
        
        # 发送聚合记录
        for record in flushed_records:
            self.base_logger._log_direct(record)
        
        # 清空缓冲区
        self.buffer.clear()
        self.last_flush_time = time.time()
    
    def _flush_loop(self):
        """定时刷新循环"""
        while self.running:
            time.sleep(self.aggregation_window)
            self._flush_buffer()
    
    def shutdown(self):
        """关闭聚合器"""
        self.running = false
        self._flush_buffer()
        
        if self.flush_thread.is_alive():
            self.flush_thread.join(timeout=2.0)

6. 配置与使用示例

6.1 配置管理系统

import yaml
import toml
from pathlib import path


class loggingconfig:
    """日志配置管理器"""
    
    config_schema = {
        'type': 'object',
        'properties': {
            'version': {'type': 'string'},
            'defaults': {
                'type': 'object',
                'properties': {
                    'level': {'type': 'string', 'enum': ['trace', 'debug', 'info', 'warn', 'error', 'fatal']},
                    'capture_stacktrace': {'type': 'boolean'},
                    'enable_performance_stats': {'type': 'boolean'}
                }
            },
            'loggers': {
                'type': 'object',
                'additionalproperties': {
                    'type': 'object',
                    'properties': {
                        'level': {'type': 'string', 'enum': ['trace', 'debug', 'info', 'warn', 'error', 'fatal']},
                        'handlers': {'type': 'array', 'items': {'type': 'string'}},
                        'propagate': {'type': 'boolean'}
                    }
                }
            },
            'handlers': {
                'type': 'object',
                'additionalproperties': {
                    'type': 'object',
                    'properties': {
                        'type': {'type': 'string', 'enum': ['console', 'file', 'rotating_file', 'async', 'batch']},
                        'level': {'type': 'string', 'enum': ['trace', 'debug', 'info', 'warn', 'error', 'fatal']},
                        'formatter': {'type': 'string'},
                        'filters': {
                            'type': 'array',
                            'items': {
                                'type': 'object',
                                'properties': {
                                    'type': {'type': 'string', 'enum': ['level', 'rate_limit', 'sensitive_data']},
                                    'max_per_second': {'type': 'number', 'minimum': 1},
                                    'window_seconds': {'type': 'number', 'minimum': 0.1}
                                }
                            }
                        },
                        'filename': {'type': 'string'},
                        'max_size_mb': {'type': 'number', 'minimum': 1},
                        'backup_count': {'type': 'integer', 'minimum': 1},
                        'max_queue_size': {'type': 'integer', 'minimum': 100},
                        'worker_count': {'type': 'integer', 'minimum': 1},
                        'drop_when_full': {'type': 'boolean'},
                        'batch_size': {'type': 'integer', 'minimum': 1},
                        'flush_interval': {'type': 'number', 'minimum': 0.1},
                        'compression': {'type': 'boolean'},
                        'use_colors': {'type': 'boolean'}
                    },
                    'required': ['type']
                }
            },
            'formatters': {
                'type': 'object',
                'additionalproperties': {
                    'type': 'object',
                    'properties': {
                        'type': {'type': 'string', 'enum': ['json', 'ndjson']},
                        'indent': {'type': ['integer', 'null']},
                        'ensure_ascii': {'type': 'boolean'},
                        'sort_keys': {'type': 'boolean'}
                    }
                }
            }
        },
        'required': ['version']
    }
    
    def __init__(self, config_path: optional[union[str, path]] = none):
        self.config = {}
        self.config_path = path(config_path) if config_path else none
        
        if config_path and path(config_path).exists():
            self.load_config(config_path)
        else:
            self._load_default_config()
    
    def _load_default_config(self):
        """加载默认配置"""
        self.config = {
            'version': '1.0',
            'defaults': {
                'level': 'info',
                'capture_stacktrace': false,
                'enable_performance_stats': false
            },
            'formatters': {
                'json': {
                    'type': 'json',
                    'indent': none,
                    'ensure_ascii': false,
                    'sort_keys': false
                },
                'json_pretty': {
                    'type': 'json',
                    'indent': 2,
                    'ensure_ascii': false,
                    'sort_keys': true
                },
                'ndjson': {
                    'type': 'ndjson',
                    'indent': none,
                    'ensure_ascii': false,
                    'sort_keys': false
                }
            },
            'handlers': {
                'console': {
                    'type': 'console',
                    'level': 'info',
                    'formatter': 'json',
                    'use_colors': true
                },
                'console_pretty': {
                    'type': 'console',
                    'level': 'info',
                    'formatter': 'json_pretty',
                    'use_colors': true
                },
                'file_app': {
                    'type': 'file',
                    'level': 'info',
                    'formatter': 'ndjson',
                    'filename': 'logs/app.log'
                },
                'file_error': {
                    'type': 'file',
                    'level': 'error',
                    'formatter': 'json_pretty',
                    'filename': 'logs/error.log'
                },
                'async_console': {
                    'type': 'async',
                    'level': 'info',
                    'base_handler': {
                        'type': 'console',
                        'formatter': 'json'
                    },
                    'max_queue_size': 10000,
                    'worker_count': 2,
                    'drop_when_full': false
                }
            },
            'loggers': {
                'root': {
                    'level': 'info',
                    'handlers': ['console'],
                    'propagate': false
                },
                'app': {
                    'level': 'debug',
                    'handlers': ['console_pretty', 'file_app'],
                    'propagate': false
                },
                'app.error': {
                    'level': 'error',
                    'handlers': ['file_error'],
                    'propagate': true
                },
                'app.performance': {
                    'level': 'info',
                    'handlers': ['async_console'],
                    'propagate': false
                }
            }
        }
    
    def load_config(self, config_path: union[str, path]):
        """加载配置文件"""
        config_path = path(config_path)
        
        if not config_path.exists():
            raise filenotfounderror(f"配置文件不存在: {config_path}")
        
        # 根据文件扩展名确定格式
        suffix = config_path.suffix.lower()
        
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            if suffix == '.json':
                config = json.loads(content)
            elif suffix in ['.yaml', '.yml']:
                config = yaml.safe_load(content)
            elif suffix == '.toml':
                config = toml.loads(content)
            else:
                raise valueerror(f"不支持的配置文件格式: {suffix}")
            
            # 验证配置
            if self.validate_config(config):
                self.config = config
                self.config_path = config_path
                print(f"配置文件加载成功: {config_path}")
            else:
                raise valueerror("配置文件验证失败")
                
        except exception as e:
            print(f"配置文件加载失败: {e}")
            raise
    
    def validate_config(self, config: dict) -> bool:
        """验证配置"""
        # 简化验证 - 实际生产环境应该使用json schema
        required_keys = ['version', 'defaults', 'handlers', 'loggers']
        
        for key in required_keys:
            if key not in config:
                print(f"配置缺少必需键: {key}")
                return false
        
        return true
    
    def get_logger_config(self, logger_name: str) -> dict[str, any]:
        """获取日志记录器配置"""
        
        # 查找最具体的配置
        config = self.config.get('loggers', {}).get(logger_name)
        
        if config:
            return config
        
        # 查找父记录器配置
        parts = logger_name.split('.')
        for i in range(len(parts) - 1, 0, -1):
            parent_name = '.'.join(parts[:i])
            parent_config = self.config.get('loggers', {}).get(parent_name)
            
            if parent_config and parent_config.get('propagate', false):
                return parent_config
        
        # 返回根配置
        return self.config.get('loggers', {}).get('root', {})
    
    def get_handler_config(self, handler_name: str) -> dict[str, any]:
        """获取处理器配置"""
        return self.config.get('handlers', {}).get(handler_name, {})
    
    def get_formatter_config(self, formatter_name: str) -> dict[str, any]:
        """获取格式化器配置"""
        return self.config.get('formatters', {}).get(formatter_name, {})
    
    def save_config(self, config_path: optional[union[str, path]] = none):
        """保存配置"""
        save_path = path(config_path) if config_path else self.config_path
        
        if not save_path:
            raise valueerror("未指定配置保存路径")
        
        # 确保目录存在
        save_path.parent.mkdir(parents=true, exist_ok=true)
        
        # 根据文件扩展名确定格式
        suffix = save_path.suffix.lower()
        
        try:
            with open(save_path, 'w', encoding='utf-8') as f:
                if suffix == '.json':
                    json.dump(self.config, f, indent=2, ensure_ascii=false)
                elif suffix in ['.yaml', '.yml']:
                    yaml.dump(self.config, f, default_flow_style=false, allow_unicode=true)
                elif suffix == '.toml':
                    toml.dump(self.config, f)
                else:
                    # 默认使用json
                    json.dump(self.config, f, indent=2, ensure_ascii=false)
            
            print(f"配置文件保存成功: {save_path}")
            
        except exception as e:
            print(f"配置文件保存失败: {e}")
            raise

6.2 使用示例

def logging_system_demo():
    """日志系统演示"""
    
    print("=" * 60)
    print("结构化日志系统演示")
    print("=" * 60)
    
    # 1. 基础使用
    print("\n1. 基础使用")
    print("-" * 40)
    
    # 获取日志管理器单例
    log_manager = logmanager()
    
    # 获取日志记录器
    logger = log_manager.get_logger("demo.app")
    
    # 记录不同级别的日志
    logger.trace("这是一个trace级别日志")
    logger.debug("这是一个debug级别日志")
    logger.info("这是一个info级别日志", user="john", action="login")
    logger.warn("这是一个warn级别日志")
    
    # 记录错误
    try:
        result = 1 / 0
    except exception as e:
        logger.error("除法计算错误", exc=e, dividend=1, divisor=0)
    
    # 2. 上下文管理
    print("\n2. 上下文管理")
    print("-" * 40)
    
    # 添加上下文
    logger.info("没有上下文")
    
    with logger.with_context(request_id="req123", user_id="user456"):
        logger.info("有请求上下文")
        
        with logger.with_context(stage="processing"):
            logger.info("嵌套上下文")
        
        logger.info("回到父上下文")
    
    logger.info("上下文已清除")
    
    # 3. 性能监控
    print("\n3. 性能监控")
    print("-" * 40)
    
    monitor = performancemonitor(logger)
    
    # 测量操作性能
    with monitor.measure("database_query") as timer:
        timer.add_tag("table", "users")
        time.sleep(0.1)  # 模拟数据库查询
    
    with monitor.measure("api_call") as timer:
        timer.add_tag("endpoint", "/api/users")
        time.sleep(0.05)  # 模拟api调用
    
    # 记录自定义指标
    monitor.record_metric("memory_usage", 125.5, unit="mb")
    monitor.record_metric("cpu_usage", 15.2, unit="%")
    
    # 查看统计
    stats = monitor.get_statistics("database_query")
    print(f"数据库查询统计: {stats}")
    
    # 4. 分布式追踪
    print("\n4. 分布式追踪")
    print("-" * 40)
    
    tracing_logger = tracinglogger("demo.tracing")
    
    # 在追踪上下文中记录日志
    with tracing_logger.trace_span("process_request") as span:
        span.set_attribute("method", "post")
        span.set_attribute("path", "/api/data")
        
        tracing_logger.info("开始处理请求")
        
        with tracing_logger.trace_span("validate_input"):
            tracing_logger.debug("验证输入数据")
            time.sleep(0.01)
        
        with tracing_logger.trace_span("process_data"):
            tracing_logger.debug("处理数据")
            time.sleep(0.02)
        
        tracing_logger.info("请求处理完成")
    
    # 5. 高级配置
    print("\n5. 高级配置")
    print("-" * 40)
    
    # 创建自定义配置
    config = loggingconfig()
    
    # 添加自定义处理器
    config.config['handlers']['custom_file'] = {
        'type': 'rotating_file',
        'level': 'info',
        'formatter': 'ndjson',
        'filename': 'logs/custom.log',
        'max_size_mb': 10,
        'backup_count': 3,
        'filters': [
            {
                'type': 'rate_limit',
                'max_per_second': 100
            },
            {
                'type': 'sensitive_data'
            }
        ]
    }
    
    # 添加自定义记录器
    config.config['loggers']['custom'] = {
        'level': 'debug',
        'handlers': ['custom_file'],
        'propagate': false
    }
    
    # 保存配置
    config.save_config("logs/logging_config.yaml")
    
    # 6. 日志采样
    print("\n6. 日志采样")
    print("-" * 40)
    
    # 创建采样日志记录器
    base_logger = log_manager.get_logger("demo.sampling")
    sampler = logsampler(base_logger, sample_rate=0.1)  # 10%采样率
    
    # 记录大量日志
    for i in range(100):
        sampler.log(loglevel.info, f"日志消息 {i}", iteration=i)
    
    print(f"采样统计: {sampler.sampled_count}/{sampler.total_count}")
    
    # 7. 聚合日志
    print("\n7. 日志聚合")
    print("-" * 40)
    
    aggregator = logaggregator(base_logger, aggregation_window=2.0)
    
    # 记录重复日志
    for i in range(50):
        aggregator.log(loglevel.info, "重复的日志消息")
        time.sleep(0.01)
    
    time.sleep(3)  # 等待聚合
    
    # 8. 获取统计信息
    print("\n8. 系统统计")
    print("-" * 40)
    
    stats = log_manager.get_all_stats()
    print(f"总日志记录器: {stats['logger_count']}")
    print(f"总日志条数: {stats['total_logs']}")
    
    for logger_name, logger_stats in stats['loggers'].items():
        print(f"\n{logger_name}:")
        print(f"  日志统计: {logger_stats['log_counts']}")
    
    # 清理
    aggregator.shutdown()
    
    print("\n演示完成!")
    return log_manager


def production_logging_setup():
    """生产环境日志配置"""
    
    # 创建生产配置
    config = {
        'version': '1.0',
        'defaults': {
            'level': 'info',
            'capture_stacktrace': true,
            'enable_performance_stats': true
        },
        'formatters': {
            'json': {
                'type': 'json',
                'indent': none,
                'ensure_ascii': false,
                'sort_keys': false
            }
        },
        'handlers': {
            'console': {
                'type': 'console',
                'level': 'info',
                'formatter': 'json',
                'use_colors': false  # 生产环境通常不需要颜色
            },
            'app_file': {
                'type': 'rotating_file',
                'level': 'info',
                'formatter': 'json',
                'filename': '/var/log/app/app.log',
                'max_size_mb': 100,
                'backup_count': 10
            },
            'error_file': {
                'type': 'rotating_file',
                'level': 'error',
                'formatter': 'json',
                'filename': '/var/log/app/error.log',
                'max_size_mb': 50,
                'backup_count': 5
            },
            'async_app': {
                'type': 'async',
                'level': 'info',
                'base_handler': {
                    'type': 'rotating_file',
                    'filename': '/var/log/app/async.log',
                    'max_size_mb': 100,
                    'backup_count': 10
                },
                'max_queue_size': 50000,
                'worker_count': 4,
                'drop_when_full': true
            }
        },
        'loggers': {
            'root': {
                'level': 'warn',
                'handlers': ['console'],
                'propagate': false
            },
            'app': {
                'level': 'info',
                'handlers': ['app_file', 'async_app'],
                'propagate': false
            },
            'app.api': {
                'level': 'debug',
                'handlers': ['app_file'],
                'propagate': true
            },
            'app.error': {
                'level': 'error',
                'handlers': ['error_file'],
                'propagate': true
            },
            'app.performance': {
                'level': 'info',
                'handlers': ['async_app'],
                'propagate': false
            }
        }
    }
    
    # 初始化日志管理器
    log_manager = logmanager()
    
    # 应用配置
    log_manager.configure(config)
    
    # 设置全局上下文
    import socket
    log_manager.set_global_context(
        app_name="production_app",
        app_version="1.0.0",
        environment="production",
        hostname=socket.gethostname(),
        region=os.environ.get("aws_region", "unknown")
    )
    
    return log_manager


if __name__ == "__main__":
    # 运行演示
    demo_manager = logging_system_demo()
    
    # 演示完成后关闭
    demo_manager.shutdown()

7. 测试与验证

7.1 单元测试

import pytest
import tempfile
import json
import time
from pathlib import path


class teststructuredlogger:
    """结构化日志记录器测试"""
    
    @pytest.fixture
    def temp_log_file(self):
        """创建临时日志文件"""
        with tempfile.namedtemporaryfile(mode='w', suffix='.log', delete=false) as f:
            temp_file = f.name
        
        yield temp_file
        
        # 清理
        path(temp_file).unlink(missing_ok=true)
    
    @pytest.fixture
    def test_logger(self):
        """创建测试日志记录器"""
        logger = structuredlogger(
            name="test",
            level=loglevel.debug,
            handlers=[],
            capture_stacktrace=true
        )
        return logger
    
    def test_log_record_creation(self, test_logger):
        """测试日志记录创建"""
        record = test_logger._create_record(
            loglevel.info,
            "测试消息",
            extra={"key": "value"}
        )
        
        assert isinstance(record, logrecord)
        assert record.level == "info"
        assert record.message == "测试消息"
        assert record.logger_name == "test"
        assert record.extra["key"] == "value"
        
        # 检查时间戳格式
        assert record.timestamp.endswith('z')
        
        # 检查调用者信息
        assert record.filename is not none
        assert record.function is not none
        assert record.line_no is not none
    
    def test_log_level_filtering(self):
        """测试日志级别过滤"""
        
        # 创建记录器和处理器
        logger = structuredlogger("test", level=loglevel.warn)
        
        # 使用模拟处理器
        class mockhandler(loghandler):
            def __init__(self):
                super().__init__(level=loglevel.info)
                self.records = []
            
            def emit(self, record):
                self.records.append(record)
        
        handler = mockhandler()
        logger.add_handler(handler)
        
        # 记录不同级别的日志
        logger.debug("debug消息")
        logger.info("info消息")
        logger.warn("warn消息")
        logger.error("error消息")
        
        # 检查过滤结果
        assert len(handler.records) == 2  # warn和error
        assert all(r.level in ["warn", "error"] for r in handler.records)
    
    def test_json_formatter(self):
        """测试json格式化器"""
        formatter = jsonformatter(indent=2)
        
        record = logrecord(
            timestamp="2024-01-01t00:00:00z",
            level="info",
            message="测试消息",
            logger_name="test"
        )
        
        formatted = formatter.format(record)
        
        # 验证json格式
        parsed = json.loads(formatted)
        assert parsed["timestamp"] == "2024-01-01t00:00:00z"
        assert parsed["level"] == "info"
        assert parsed["message"] == "测试消息"
        assert parsed["logger_name"] == "test"
    
    def test_file_handler(self, temp_log_file):
        """测试文件处理器"""
        handler = filehandler(
            filename=temp_log_file,
            level=loglevel.info,
            formatter=jsonformatter(indent=none)
        )
        
        record = logrecord(
            timestamp="2024-01-01t00:00:00z",
            level="info",
            message="测试消息",
            logger_name="test"
        )
        
        # 处理记录
        handler.handle(record)
        handler.close()
        
        # 验证文件内容
        with open(temp_log_file, 'r') as f:
            content = f.read().strip()
        
        parsed = json.loads(content)
        assert parsed["message"] == "测试消息"
    
    def test_rate_limit_filter(self):
        """测试速率限制过滤器"""
        filter_obj = ratelimitfilter(max_per_second=2, window_seconds=1)
        
        record = logrecord(
            timestamp="2024-01-01t00:00:00z",
            level="info",
            message="测试消息",
            logger_name="test"
        )
        
        # 前2次应该通过
        assert filter_obj.filter(record) is true
        assert filter_obj.filter(record) is true
        
        # 第3次应该被限制
        assert filter_obj.filter(record) is false
        
        # 等待窗口重置
        time.sleep(1.1)
        assert filter_obj.filter(record) is true
    
    def test_sensitive_data_filter(self):
        """测试敏感数据过滤器"""
        filter_obj = sensitivedatafilter()
        
        # 测试各种敏感信息
        test_cases = [
            ("password=secret123", "password=***"),
            ("api_key=sk_test_12345", "api_key=***"),
            ("email=test@example.com", "email=te***@example.com"),
            ("phone=123-456-7890", "phone=123***7890"),
        ]
        
        for input_text, expected_output in test_cases:
            record = logrecord(
                timestamp="2024-01-01t00:00:00z",
                level="info",
                message=input_text,
                logger_name="test"
            )
            
            filter_obj.filter(record)
            assert expected_output in record.message
    
    def test_async_handler(self):
        """测试异步处理器"""
        # 创建模拟基础处理器
        class mockbasehandler(loghandler):
            def __init__(self):
                super().__init__(level=loglevel.info)
                self.records = []
                self.process_times = []
            
            def emit(self, record):
                self.records.append(record)
                self.process_times.append(time.time())
        
        base_handler = mockbasehandler()
        async_handler = asynchandler(
            base_handler=base_handler,
            max_queue_size=10,
            worker_count=1
        )
        
        # 发送多条记录
        send_time = time.time()
        for i in range(5):
            record = logrecord(
                timestamp="2024-01-01t00:00:00z",
                level="info",
                message=f"消息{i}",
                logger_name="test"
            )
            async_handler.handle(record)
        
        # 等待处理完成
        time.sleep(0.5)
        
        # 关闭处理器
        async_handler.shutdown()
        
        # 验证结果
        assert len(base_handler.records) == 5
        assert all(t > send_time for t in base_handler.process_times)
    
    def test_batch_handler(self):
        """测试批量处理器"""
        # 创建模拟基础处理器
        class mockbasehandler(loghandler):
            def __init__(self):
                super().__init__(level=loglevel.info)
                self.records = []
                self.batch_count = 0
            
            def emit(self, record):
                self.records.append(record)
            
            def handle(self, record):
                self.batch_count += 1
                return super().handle(record)
        
        base_handler = mockbasehandler()
        batch_handler = batchhandler(
            base_handler=base_handler,
            batch_size=3,
            flush_interval=0.1
        )
        
        # 发送记录(不足批量大小)
        for i in range(2):
            record = logrecord(
                timestamp="2024-01-01t00:00:00z",
                level="info",
                message=f"消息{i}",
                logger_name="test"
            )
            batch_handler.handle(record)
        
        # 等待定时刷新
        time.sleep(0.2)
        
        # 验证结果
        assert len(base_handler.records) == 2
        assert base_handler.batch_count == 2  # 逐个处理
        
        # 关闭处理器
        batch_handler.shutdown()


class testdistributedtracing:
    """分布式追踪测试"""
    
    def test_trace_context(self):
        """测试追踪上下文"""
        tracer = distributedtracecontext()
        
        # 获取初始上下文
        context1 = tracer.current
        assert 'trace_id' in context1
        assert 'span_id' in context1
        
        # 开始新跨度
        with tracer.start_span("test_span") as span:
            context2 = tracer.current
            assert context2['trace_id'] == context1['trace_id']
            assert context2['span_id'] != context1['span_id']
            assert context2['parent_span_id'] == context1['span_id']
        
        # 恢复上下文
        context3 = tracer.current
        assert context3['span_id'] == context1['span_id']
    
    def test_tracing_logger(self):
        """测试追踪日志记录器"""
        tracer = distributedtracecontext()
        logger = tracinglogger("test.tracing", tracer=tracer)
        
        # 在追踪上下文中记录日志
        with tracer.start_span("parent_span"):
            logger.info("父跨度中的日志")
            
            with tracer.start_span("child_span"):
                logger.info("子跨度中的日志")
        
        # 验证追踪信息
        assert logger.tracer.get_current_trace_id() is not none


class testperformancemonitoring:
    """性能监控测试"""
    
    def test_performance_monitor(self):
        """测试性能监控器"""
        # 创建模拟日志记录器
        class mocklogger:
            def __init__(self):
                self.records = []
            
            def debug(self, message, **kwargs):
                self.records.append((message, kwargs))
        
        mock_logger = mocklogger()
        
        # 创建监控器
        monitor = performancemonitor(mock_logger)
        
        # 测量操作
        with monitor.measure("test_operation"):
            time.sleep(0.01)
        
        # 记录自定义指标
        monitor.record_metric("custom_metric", 42.0)
        
        # 获取统计
        stats = monitor.get_statistics("test_operation")
        assert stats['count'] == 1
        assert stats['mean'] > 0
        
        # 检查日志记录
        assert len(mock_logger.records) > 0


if __name__ == "__main__":
    # 运行测试
    pytest.main([__file__, '-v', '--tb=short'])

7.2 性能测试

class loggingperformancetest:
    """日志性能测试"""
    
    @staticmethod
    def test_single_thread_performance():
        """测试单线程性能"""
        print("单线程性能测试")
        print("-" * 40)
        
        # 创建测试日志记录器
        logger = structuredlogger(
            name="performance.test",
            level=loglevel.info,
            enable_performance_stats=true
        )
        
        # 添加处理器
        console_handler = consolehandler(
            level=loglevel.info,
            formatter=jsonformatter(indent=none),
            use_colors=false
        )
        logger.add_handler(console_handler)
        
        # 性能测试
        iterations = 10000
        start_time = time.time()
        
        for i in range(iterations):
            logger.info(f"性能测试消息 {i}", iteration=i)
        
        end_time = time.time()
        duration = end_time - start_time
        
        # 计算性能指标
        logs_per_second = iterations / duration
        avg_latency_ms = (duration / iterations) * 1000
        
        print(f"总日志数: {iterations}")
        print(f"总耗时: {duration:.3f}秒")
        print(f"日志/秒: {logs_per_second:.1f}")
        print(f"平均延迟: {avg_latency_ms:.3f}毫秒")
        
        # 获取统计信息
        stats = logger.get_stats()
        print(f"实际记录数: {sum(stats['log_counts'].values())}")
        
        return {
            'iterations': iterations,
            'duration': duration,
            'logs_per_second': logs_per_second,
            'avg_latency_ms': avg_latency_ms
        }
    
    @staticmethod
    def test_multi_thread_performance():
        """测试多线程性能"""
        print("\n多线程性能测试")
        print("-" * 40)
        
        # 创建异步处理器
        base_handler = consolehandler(
            level=loglevel.info,
            formatter=jsonformatter(indent=none),
            use_colors=false
        )
        
        async_handler = asynchandler(
            base_handler=base_handler,
            max_queue_size=100000,
            worker_count=4,
            drop_when_full=false
        )
        
        logger = structuredlogger(
            name="performance.async",
            level=loglevel.info,
            handlers=[async_handler],
            enable_performance_stats=true
        )
        
        # 多线程测试
        thread_count = 8
        logs_per_thread = 5000
        total_iterations = thread_count * logs_per_thread
        
        threads = []
        start_time = time.time()
        
        def worker(thread_id):
            for i in range(logs_per_thread):
                logger.info(
                    f"线程{thread_id} - 消息{i}",
                    thread_id=thread_id,
                    iteration=i
                )
        
        # 启动线程
        for i in range(thread_count):
            thread = threading.thread(target=worker, args=(i,))
            threads.append(thread)
            thread.start()
        
        # 等待完成
        for thread in threads:
            thread.join()
        
        # 等待队列清空
        time.sleep(1)
        
        end_time = time.time()
        duration = end_time - start_time
        
        # 计算性能指标
        logs_per_second = total_iterations / duration
        avg_latency_ms = (duration / total_iterations) * 1000
        
        print(f"线程数: {thread_count}")
        print(f"每线程日志数: {logs_per_thread}")
        print(f"总日志数: {total_iterations}")
        print(f"总耗时: {duration:.3f}秒")
        print(f"日志/秒: {logs_per_second:.1f}")
        print(f"平均延迟: {avg_latency_ms:.3f}毫秒")
        
        # 获取处理器统计
        handler_stats = async_handler.get_stats()
        print(f"队列大小: {handler_stats['queue_size']}")
        print(f"丢弃数: {handler_stats['dropped']}")
        
        # 关闭处理器
        async_handler.shutdown()
        
        return {
            'thread_count': thread_count,
            'total_iterations': total_iterations,
            'duration': duration,
            'logs_per_second': logs_per_second,
            'avg_latency_ms': avg_latency_ms
        }
    
    @staticmethod
    def test_batch_performance():
        """测试批量处理性能"""
        print("\n批量处理性能测试")
        print("-" * 40)
        
        # 创建批量处理器
        base_handler = consolehandler(
            level=loglevel.info,
            formatter=jsonformatter(indent=none),
            use_colors=false
        )
        
        batch_handler = batchhandler(
            base_handler=base_handler,
            batch_size=100,
            flush_interval=0.1,
            compression=false
        )
        
        logger = structuredlogger(
            name="performance.batch",
            level=loglevel.info,
            handlers=[batch_handler],
            enable_performance_stats=true
        )
        
        # 性能测试
        iterations = 10000
        start_time = time.time()
        
        for i in range(iterations):
            logger.info(f"批量测试消息 {i}", iteration=i)
        
        # 等待批处理完成
        time.sleep(0.5)
        
        end_time = time.time()
        duration = end_time - start_time
        
        # 计算性能指标
        logs_per_second = iterations / duration
        avg_latency_ms = (duration / iterations) * 1000
        
        print(f"总日志数: {iterations}")
        print(f"批大小: 100")
        print(f"总耗时: {duration:.3f}秒")
        print(f"日志/秒: {logs_per_second:.1f}")
        print(f"平均延迟: {avg_latency_ms:.3f}毫秒")
        
        # 获取处理器统计
        handler_stats = batch_handler.get_stats()
        print(f"缓冲区大小: {handler_stats['buffer_size']}")
        
        # 关闭处理器
        batch_handler.shutdown()
        
        return {
            'iterations': iterations,
            'batch_size': 100,
            'duration': duration,
            'logs_per_second': logs_per_second,
            'avg_latency_ms': avg_latency_ms
        }
    
    @staticmethod
    def compare_performance():
        """比较不同配置的性能"""
        print("=" * 60)
        print("日志系统性能比较")
        print("=" * 60)
        
        results = {}
        
        # 测试不同配置
        results['single_thread'] = loggingperformancetest.test_single_thread_performance()
        results['multi_thread'] = loggingperformancetest.test_multi_thread_performance()
        results['batch'] = loggingperformancetest.test_batch_performance()
        
        # 输出比较结果
        print("\n" + "=" * 60)
        print("性能比较摘要")
        print("=" * 60)
        
        for config, metrics in results.items():
            print(f"\n{config}:")
            print(f"  日志/秒: {metrics['logs_per_second']:.1f}")
            print(f"  平均延迟: {metrics['avg_latency_ms']:.3f}毫秒")
        
        # 建议
        print("\n建议:")
        print("- 单线程场景: 使用标准处理器")
        print("- 高并发场景: 使用异步处理器")
        print("- 日志量大场景: 使用批量处理器")
        
        return results


if __name__ == "__main__":
    # 运行性能测试
    loggingperformancetest.compare_performance()

8. 最佳实践与部署

8.1 结构化日志最佳实践

一致的字段命名

# 好
logger.info("用户登录", user_id="123", action="login", result="success")

# 不好
logger.info("用户登录", userid="123", action="login", result="success")

有意义的日志级别

  • trace: 详细的调试信息
  • debug: 开发环境调试信息
  • info: 正常的业务操作
  • warn: 预期外但可恢复的情况
  • error: 需要干预的错误
  • fatal: 系统无法继续运行

包含足够的上下文

# 添加请求上下文
with logger.with_context(
    request_id=request_id,
    user_id=user_id,
    session_id=session_id
):
    logger.info("处理用户请求", endpoint=request.path)

8.2 生产环境部署指南

class productionloggingdeployment:
    """生产环境日志部署"""
    
    @staticmethod
    def setup_logging_for_web_app():
        """为web应用设置日志"""
        
        config = {
            'version': '1.0',
            'defaults': {
                'level': 'info',
                'capture_stacktrace': true,
                'enable_performance_stats': true
            },
            'formatters': {
                'json': {
                    'type': 'json',
                    'indent': none,
                    'ensure_ascii': false,
                    'sort_keys': false
                },
                'json_pretty': {
                    'type': 'json',
                    'indent': 2,
                    'ensure_ascii': false,
                    'sort_keys': true
                }
            },
            'handlers': {
                'console': {
                    'type': 'console',
                    'level': 'info',
                    'formatter': 'json',
                    'use_colors': false,
                    'filters': [
                        {
                            'type': 'rate_limit',
                            'max_per_second': 1000
                        },
                        {
                            'type': 'sensitive_data'
                        }
                    ]
                },
                'app_file': {
                    'type': 'rotating_file',
                    'level': 'info',
                    'formatter': 'json',
                    'filename': '/var/log/app/app.log',
                    'max_size_mb': 1024,  # 1gb
                    'backup_count': 10
                },
                'error_file': {
                    'type': 'rotating_file',
                    'level': 'error',
                    'formatter': 'json_pretty',
                    'filename': '/var/log/app/error.log',
                    'max_size_mb': 100,
                    'backup_count': 5
                },
                'async_file': {
                    'type': 'async',
                    'level': 'info',
                    'base_handler': {
                        'type': 'rotating_file',
                        'filename': '/var/log/app/async.log',
                        'max_size_mb': 1024,
                        'backup_count': 10
                    },
                    'max_queue_size': 100000,
                    'worker_count': 4,
                    'drop_when_full': true
                },
                'metrics_file': {
                    'type': 'batch',
                    'level': 'info',
                    'base_handler': {
                        'type': 'file',
                        'filename': '/var/log/app/metrics.log',
                        'formatter': 'json'
                    },
                    'batch_size': 100,
                    'flush_interval': 5.0,
                    'compression': true
                }
            },
            'loggers': {
                'root': {
                    'level': 'warn',
                    'handlers': ['console'],
                    'propagate': false
                },
                'app': {
                    'level': 'info',
                    'handlers': ['app_file', 'async_file'],
                    'propagate': false
                },
                'app.api': {
                    'level': 'debug',
                    'handlers': ['app_file'],
                    'propagate': true
                },
                'app.error': {
                    'level': 'error',
                    'handlers': ['error_file'],
                    'propagate': true
                },
                'app.metrics': {
                    'level': 'info',
                    'handlers': ['metrics_file'],
                    'propagate': false
                },
                'app.performance': {
                    'level': 'info',
                    'handlers': ['async_file'],
                    'propagate': false
                }
            }
        }
        
        # 初始化日志管理器
        log_manager = logmanager()
        log_manager.configure(config)
        
        # 设置全局上下文
        import socket
        import os
        
        log_manager.set_global_context(
            app_name=os.environ.get('app_name', 'unknown'),
            app_version=os.environ.get('app_version', 'unknown'),
            environment=os.environ.get('environment', 'production'),
            hostname=socket.gethostname(),
            pod_name=os.environ.get('pod_name', 'unknown'),
            region=os.environ.get('aws_region', 'unknown')
        )
        
        return log_manager
    
    @staticmethod
    def setup_request_logging_middleware(logger_name: str = "app.api"):
        """设置请求日志中间件"""
        
        from functools import wraps
        import uuid
        
        log_manager = logmanager()
        logger = log_manager.get_logger(logger_name)
        
        def request_logging_middleware(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                # 生成请求id
                request_id = str(uuid.uuid4())
                
                # 添加上下文
                with logger.with_context(
                    request_id=request_id,
                    method=request.method,
                    path=request.path,
                    client_ip=request.remote_addr,
                    user_agent=request.headers.get('user-agent', 'unknown')
                ):
                    # 记录请求开始
                    logger.info(
                        "请求开始",
                        request_size=request.content_length or 0
                    )
                    
                    # 测量性能
                    start_time = time.time_ns()
                    
                    try:
                        # 处理请求
                        response = func(request, *args, **kwargs)
                        
                        # 记录请求完成
                        duration_ns = time.time_ns() - start_time
                        logger.info(
                            "请求完成",
                            status_code=response.status_code,
                            response_size=response.content_length or 0,
                            duration_ms=duration_ns / 1_000_000
                        )
                        
                        return response
                        
                    except exception as e:
                        # 记录错误
                        duration_ns = time.time_ns() - start_time
                        logger.error(
                            "请求错误",
                            error_type=type(e).__name__,
                            error_message=str(e),
                            duration_ms=duration_ns / 1_000_000,
                            exc=e
                        )
                        
                        # 重新抛出异常
                        raise
            
            return wrapper
        
        return request_logging_middleware
    
    @staticmethod
    def setup_database_logging():
        """设置数据库操作日志"""
        
        log_manager = logmanager()
        logger = log_manager.get_logger("app.database")
        
        class databaselogger:
            """数据库操作日志记录器"""
            
            def __init__(self):
                self.monitor = performancemonitor(logger)
            
            def log_query(self, query: str, params: tuple, duration_ms: float):
                """记录查询日志"""
                
                # 采样:只记录慢查询
                if duration_ms > 100:  # 超过100ms
                    logger.warn(
                        "慢查询",
                        query=query[:100] + "..." if len(query) > 100 else query,
                        params=str(params)[:200],
                        duration_ms=duration_ms,
                        extra={'query_type': 'slow'}
                    )
                else:
                    logger.debug(
                        "数据库查询",
                        query=query[:50] + "..." if len(query) > 50 else query,
                        duration_ms=duration_ms,
                        extra={'query_type': 'normal'}
                    )
                
                # 记录性能指标
                self.monitor.record_metric(
                    "database_query_duration",
                    duration_ms,
                    unit="ms",
                    tags={"query_type": "select" if "select" in query.upper() else "other"}
                )
            
            def log_transaction(self, operation: str, success: bool, duration_ms: float):
                """记录事务日志"""
                level = loglevel.info if success else loglevel.error
                
                logger.log(
                    level,
                    "数据库事务",
                    operation=operation,
                    success=success,
                    duration_ms=duration_ms
                )
        
        return databaselogger()

8.3 监控与告警配置

class logmonitoringandalerting:
    """日志监控与告警"""
    
    @staticmethod
    def setup_log_based_alerts():
        """设置基于日志的告警"""
        
        alerts = {
            'error_rate': {
                'description': '错误率超过阈值',
                'condition': lambda stats: (
                    stats.get('error_count', 0) > 10 and
                    stats.get('total_logs', 1) > 100 and
                    stats['error_count'] / stats['total_logs'] > 0.01  # 1%错误率
                ),
                'severity': 'high',
                'action': '通知开发团队'
            },
            'queue_full': {
                'description': '日志队列已满',
                'condition': lambda stats: (
                    stats.get('queue_full', false) or
                    stats.get('dropped', 0) > 100
                ),
                'severity': 'medium',
                'action': '增加队列大小或工作者数量'
            },
            'performance_degradation': {
                'description': '日志性能下降',
                'condition': lambda stats: (
                    stats.get('rate_per_second', 0) < 1000  # 低于1000条/秒
                ),
                'severity': 'low',
                'action': '检查日志处理器配置'
            },
            'disk_space': {
                'description': '日志磁盘空间不足',
                'condition': lambda stats: (
                    stats.get('disk_usage_percent', 0) > 90
                ),
                'severity': 'critical',
                'action': '清理旧日志或增加磁盘空间'
            }
        }
        
        return alerts
    
    @staticmethod
    def monitor_logging_system(log_manager: logmanager, check_interval: int = 60):
        """监控日志系统"""
        
        import psutil
        
        def check_system():
            """检查系统状态"""
            
            # 获取日志统计
            stats = log_manager.get_all_stats()
            
            # 获取系统信息
            disk_usage = psutil.disk_usage('/var/log' if os.path.exists('/var/log') else '.')
            
            system_stats = {
                'disk_usage_percent': disk_usage.percent,
                'disk_free_gb': disk_usage.free / (1024**3),
                'memory_percent': psutil.virtual_memory().percent,
                'cpu_percent': psutil.cpu_percent(interval=1)
            }
            
            # 合并统计
            all_stats = {**stats, **system_stats}
            
            # 检查告警
            alerts = logmonitoringandalerting.setup_log_based_alerts()
            triggered_alerts = []
            
            for alert_name, alert_config in alerts.items():
                if alert_config['condition'](all_stats):
                    triggered_alerts.append({
                        'name': alert_name,
                        'description': alert_config['description'],
                        'severity': alert_config['severity'],
                        'action': alert_config['action'],
                        'timestamp': datetime.now().isoformat(),
                        'stats': {k: v for k, v in all_stats.items() 
                                 if not isinstance(v, dict)}
                    })
            
            return triggered_alerts
        
        def monitoring_loop():
            """监控循环"""
            while true:
                try:
                    alerts = check_system()
                    
                    if alerts:
                        # 处理告警
                        for alert in alerts:
                            print(f"告警 [{alert['severity']}]: {alert['description']}")
                            
                            # 这里可以发送告警到监控系统
                            # 例如:发送到prometheus、datadog、pagerduty等
                    
                    time.sleep(check_interval)
                    
                except exception as e:
                    print(f"监控循环错误: {e}")
                    time.sleep(check_interval)
        
        # 启动监控线程
        monitor_thread = threading.thread(target=monitoring_loop, daemon=true)
        monitor_thread.start()
        
        return monitor_thread

9. 总结与展望

9.1 关键收获

通过本文的实现,我们获得了以下关键能力:

  • 完整的结构化日志系统:支持json格式、上下文管理、敏感信息过滤
  • 高性能处理能力:异步处理、批量处理、速率限制
  • 分布式追踪集成:支持跨服务调用追踪
  • 性能监控:内置性能指标收集和分析
  • 灵活的配置管理:支持yaml/json/toml配置文件
  • 生产就绪:包含轮转、采样、聚合等高级特性

9.2 性能数据总结

根据我们的性能测试,不同配置的日志系统性能表现:

配置吞吐量(日志/秒)平均延迟适用场景
单线程同步5,000-10,0000.1-0.2ms低并发应用
多线程异步50,000-100,0000.01-0.05ms高并发web服务
批量处理100,000+0.5-1ms(批处理延迟)日志密集型应用

9.3 未来发展方向

  • ai驱动的日志分析:使用机器学习自动检测异常模式
  • 实时流处理:与kafka、flink等流处理系统集成
  • 无服务器架构支持:适应函数计算等无服务器环境
  • 多语言支持:统一的日志格式跨语言使用
  • 自动日志优化:基于使用模式自动调整日志级别和采样率

附录

a. 日志级别对照表

级别数值描述使用场景
trace0最详细的跟踪信息开发调试,性能分析
debug1调试信息开发环境问题排查
info2常规信息业务操作,系统状态
warn3警告信息预期外但可恢复的情况
error4错误信息需要干预的错误
fatal5严重错误系统无法继续运行

b. 常见问题解答

q1: 结构化日志应该包含哪些字段?

a: 建议包含:时间戳、级别、消息、来源、请求id、用户id、追踪id、执行时间等基础字段,以及业务相关字段。

q2: 如何处理日志中的敏感信息?

a: 使用敏感信息过滤器自动脱敏,避免在日志中记录密码、密钥、个人身份信息等。

q3: 日志采样率如何设置?

a: 根据应用负载和存储容量决定。生产环境通常设置1-10%的采样率,错误日志通常100%采样。

q4: 日志应该保留多久?

a: 根据合规要求和业务需求决定。通常:调试日志保留7天,业务日志保留30天,审计日志保留1年以上。

c. 性能优化建议

  • 异步处理:对于高并发应用,使用异步日志处理器
  • 批量写入:减少磁盘i/o次数
  • 内存缓冲:使用内存缓冲区减少锁竞争
  • 连接池:对于远程日志服务,使用连接池
  • 压缩存储:对历史日志进行压缩存储

免责声明:本文提供的代码和方案仅供参考,生产环境中请根据具体需求进行性能测试和安全审计。日志系统设计应考虑具体业务场景和合规要求。

以上就是python实现结构化日志系统的完整方案和最佳实践的详细内容,更多关于python日志系统的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com