以下是 Python 中实现方法耗时统计装饰器的完整方案,包含同步/异步支持、多级嵌套调用统计、可视化输出和性能分析等高级功能:
基础版:同步方法计时装饰器
import time
from functools import wraps
def timeit(func):
"""基础版:同步方法计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000 # 毫秒
print(f"⏱️ {func.__module__}.{func.__name__} 耗时: {elapsed:.2f}ms")
return result
return wrapper
# 使用示例
@timeit
def calculate_sum(n):
return sum(range(n+1))
calculate_sum(1_000_000) # 输出: ⏱️ __main__.calculate_sum 耗时: 32.45ms
增强版:支持异步方法
import time
import asyncio
from functools import wraps
from typing import Callable, Union
def benchmark(func: Callable):
"""增强版:自动识别同步/异步方法"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = await func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000
print(f"⏱️ [ASYNC] {func.__name__} 耗时: {elapsed:.2f}ms")
return result
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000
print(f"⏱️ [SYNC] {func.__name__} 耗时: {elapsed:.2f}ms")
return result
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
# 使用示例
@benchmark
async def async_task():
await asyncio.sleep(1)
@benchmark
def sync_task():
time.sleep(0.5)
asyncio.run(async_task()) # 输出: ⏱️ [ASYNC] async_task 耗时: 1000.12ms
sync_task() # 输出: ⏱️ [SYNC] sync_task 耗时: 500.34ms
专业版:带调用层级分析
import time
import inspect
from functools import wraps
class TimeProfiler:
"""专业版:带调用层级和可视化统计"""
_current_level = -1
@classmethod
def measure(cls, prefix: str = ""):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cls._current_level += 1
indent = "│ " * cls._current_level
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000
caller = inspect.stack()[1].function
print(f"{indent}└─ {prefix}[{caller} → {func.__name__}] 耗时: {elapsed:.2f}ms")
cls._current_level -= 1
return result
return wrapper
return decorator
# 使用示例
class DataProcessor:
@TimeProfiler.measure(prefix="📊 ")
def process(self):
self._load_data()
self._clean_data()
return self._analyze()
@TimeProfiler.measure()
def _load_data(self):
time.sleep(0.1)
@TimeProfiler.measure()
def _clean_data(self):
time.sleep(0.2)
@TimeProfiler.measure()
def _analyze(self):
time.sleep(0.3)
DataProcessor().process()
输出示例:
│ └─ 📊 [<module> → process] 耗时: 600.45ms
│ │ └─ [_load_data] 耗时: 100.12ms
│ │ └─ [_clean_data] 耗时: 200.23ms
│ │ └─ [_analyze] 耗时: 300.10ms
生产级:集成日志系统
import time
import logging
from functools import wraps
from contextlib import contextmanager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('performance')
@contextmanager
def time_measure(identifier: str):
"""生产级:带日志记录和阈值告警"""
start_time = time.perf_counter()
try:
yield
finally:
elapsed = (time.perf_counter() - start_time) * 1000
msg = f"{identifier} 耗时: {elapsed:.2f}ms"
if elapsed > 500: # 慢方法警告阈值
logger.warning(f"🚨 {msg} (超过500ms)")
else:
logger.info(msg)
def logged_timeit(scope: str = ""):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
with time_measure(f"{scope}.{func.__name__}"):
return func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@logged_timeit(scope="data_service")
def fetch_from_database():
time.sleep(0.6) # 模拟慢查询
fetch_from_database()
# 日志输出: WARNING:performance:🚨 data_service.fetch_from_database 耗时: 600.15ms (超过500ms)
高级特性扩展
1. 参数感知统计
def benchmark_with_args(func):
@wraps(func)
def wrapper(*args, **kwargs):
arg_str = ", ".join([str(arg) for arg in args[:3]])
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000
print(f"⏱️ {func.__name__}({arg_str}...) 耗时: {elapsed:.2f}ms")
return result
return wrapper
2. 性能分析集成
import cProfile
def profile_deco(func):
"""集成cProfile性能分析"""
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
result = profiler.runcall(func, *args, **kwargs)
profiler.print_stats(sort='cumulative')
return result
return wrapper
3. 统计结果聚合
from collections import defaultdict
stats = defaultdict(list)
def aggregated_timeit(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start_time) * 1000
stats[func.__name__].append(elapsed)
return result
return wrapper
# 调用后可通过 stats 查看所有方法的耗时统计
方案对比表
特性 | 基础版 | 增强版 | 专业版 | 生产级 |
---|---|---|---|---|
同步方法支持 | ✓ | ✓ | ✓ | ✓ |
异步方法支持 | ✗ | ✓ | ✗ | ✓ |
调用层级可视化 | ✗ | ✗ | ✓ | ✗ |
日志系统集成 | ✗ | ✗ | ✗ | ✓ |
性能阈值告警 | ✗ | ✗ | ✗ | ✓ |
适合生产环境 | ✗ | △ | △ | ✓ |
最佳实践建议
- 开发调试:使用专业版获得详细调用树
- 生产环境:使用生产级集成到日志系统
- 性能优化:结合
profile_deco
进行热点分析 - 注意事项:
- 避免在装饰器内进行耗时操作
- 高频调用方法慎用(可采样统计)
- 分布式系统需结合TraceID实现全链路追踪
根据实际需求选择合适的装饰器实现,小型项目用基础版即可,复杂系统推荐生产级方案。