Python高级函数特性详解 🚀
目录
- 匿名函数(Lambda)
- 装饰器的使用
- 生成器与迭代器
- 递归函数应用
- 实战案例:文件批处理工具
1. 匿名函数(Lambda)深入解析 🎯
1.1 Lambda函数基础与进阶
1.1.1 基本语法与类型注解
from typing import Callable, List, Any, Dict, Optional, Union
# 基础lambda函数(带类型注解)
square: Callable[[int], int] = lambda x: x ** 2
greeting: Callable[[str, Optional[str]], str] = lambda name, msg="你好": f"{msg}, {name}!"
# 带类型注解的复杂lambda函数
process_data: Callable[[Dict[str, Any]], List[Any]] = lambda d: [v for k, v in d.items() if isinstance(v, (int, float))]
1.1.2 函数式编程应用
# 1. 组合多个lambda函数
compose = lambda f, g: lambda x: f(g(x))
double = lambda x: x * 2
increment = lambda x: x + 1
# 先加1再乘2
double_after_increment = compose(double, increment)
print(double_after_increment(3)) # 输出: 8
# 2. 柯里化示例
curry = lambda f: lambda x: lambda y: f(x, y)
add = lambda x, y: x + y
curried_add = curry(add)
increment = curried_add(1)
print(increment(5)) # 输出: 6
# 3. 偏函数应用
from functools import partial
multiply = lambda x, y: x * y
double = partial(multiply, 2)
print(double(4)) # 输出: 8
1.2 Lambda函数实战应用
1.2.1 数据处理与转换
# 1. 复杂数据结构处理
users = [
{"name": "张三", "age": 25, "salary": 8000, "department": "技术"},
{"name": "李四", "age": 30, "salary": 12000, "department": "销售"},
{"name": "王五", "age": 28, "salary": 15000, "department": "技术"},
{"name": "赵六", "age": 35, "salary": 20000, "department": "管理"}
]
# 按多个条件排序(先按部门,再按薪资降序)
sorted_users = sorted(
users,
key=lambda u: (u["department"], -u["salary"])
)
# 数据转换和筛选
tech_salaries = list(map(
lambda u: u["salary"],
filter(lambda u: u["department"] == "技术", users)
))
# 2. 数据聚合
from collections import defaultdict
# 按部门统计平均薪资
dept_avg_salary = defaultdict(list)
for user in users:
dept_avg_salary[user["department"]].append(user["salary"])
dept_averages = {
dept: sum(salaries) / len(salaries)
for dept, salaries in dept_avg_salary.items()
}
1.2.2 事件处理与回调
class EventSystem:
def __init__(self):
self.handlers: Dict[str, List[Callable]] = defaultdict(list)
def register(self, event_name: str, handler: Callable) -> None:
self.handlers[event_name].append(handler)
def trigger(self, event_name: str, *args, **kwargs) -> None:
for handler in self.handlers[event_name]:
handler(*args, **kwargs)
# 使用lambda作为事件处理器
event_system = EventSystem()
# 注册多个简单的事件处理器
event_system.register("user_login", lambda user: print(f"用户 {user} 登录"))
event_system.register("user_login", lambda user: print(f"发送欢迎邮件给 {user}"))
# 注册带条件判断的处理器
event_system.register(
"payment",
lambda amount, user: print(f"大额支付警告: {user} 支付了 {amount}元")
if amount > 10000 else None
)
1.3 Lambda函数最佳实践与性能考虑
1.3.1 性能优化技巧
from timeit import timeit
import operator
# 1. 使用operator模块替代简单lambda
numbers = list(range(1000))
# 不推荐
lambda_time = timeit(lambda: list(map(lambda x: x + 1, numbers)), number=1000)
# 推荐
operator_time = timeit(lambda: list(map(operator.add, numbers, [1]*len(numbers))), number=1000)
# 2. 缓存计算结果
from functools import lru_cache
# 使用缓存装饰器包装lambda
cached_computation = lru_cache(maxsize=128)(lambda x: sum(i * i for i in range(x)))
1.3.2 代码可维护性建议
# ✅ 适合使用Lambda的场景
# 1. 简单的键函数
sorted_items = sorted(items, key=lambda x: x.priority)
# 2. 简单的数据转换
transformed = map(lambda x: x.upper(), items)
# 3. 简单的过滤条件
filtered = filter(lambda x: x > 0, numbers)
# ❌ 不建议使用Lambda的场景
# 1. 复杂的业务逻辑
# 不推荐
process = lambda data: {
k: [i * 2 for i in v if i > 0]
for k, v in data.items()
if isinstance(v, list)
}
# ✅ 推荐使用常规函数
def process_data(data: Dict[str, List[int]]) -> Dict[str, List[int]]:
"""处理数据集合
Args:
data: 输入数据字典
Returns:
处理后的数据字典
"""
result = {}
for key, values in data.items():
if isinstance(values, list):
result[key] = [i * 2 for i in values if i > 0]
return result
1.4 实战示例:数据分析管道
from typing import List, Dict, Any
from datetime import datetime
class DataPipeline:
"""数据处理管道"""
def __init__(self):
self.transforms: List[Callable] = []
def add_transform(self, transform: Callable) -> 'DataPipeline':
"""添加转换步骤"""
self.transforms.append(transform)
return self
def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""执行所有转换"""
result = data
for transform in self.transforms:
result = transform(result)
return result
# 使用示例
# 1. 创建转换函数
filter_active = lambda data: [
item for item in data
if item.get('status') == 'active'
]
calculate_metrics = lambda data: [
{**item, 'efficiency': item['output'] / item['input'] if item['input'] else 0}
for item in data
]
add_timestamp = lambda data: [
{**item, 'processed_at': datetime.now().isoformat()}
for item in data
]
# 2. 构建和使用管道
pipeline = DataPipeline()
pipeline.add_transform(filter_active)\
.add_transform(calculate_metrics)\
.add_transform(add_timestamp)
# 3. 处理数据
sample_data = [
{'id': 1, 'status': 'active', 'input': 100, 'output': 85},
{'id': 2, 'status': 'inactive', 'input': 90, 'output': 70},
{'id': 3, 'status': 'active', 'input': 95, 'output': 80}
]
processed_data = pipeline.process(sample_data)
2. 装饰器的使用 🎨
2.1 基本装饰器
from functools import wraps
import time
from typing import Callable, TypeVar, Any
T = TypeVar('T', bound=Callable[..., Any])
def timing_decorator(func: T) -> T:
"""测量函数执行时间的装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"函数 {func.__name__} 执行时间: {end_time - start_time:.4f} 秒")
return result
return wrapper
@timing_decorator
def slow_function():
time.sleep(1)
return "完成"
2.2 带参数的装饰器
def retry(max_attempts: int = 3, delay: float = 1.0):
"""创建一个重试装饰器
Args:
max_attempts: 最大重试次数
delay: 重试间隔(秒)
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempts += 1
if attempts == max_attempts:
raise e
time.sleep(delay)
return None
return wrapper
return decorator
@retry(max_attempts=3, delay=2.0)
def unstable_network_call():
"""模拟不稳定的网络调用"""
if random.random() < 0.7: # 70%的失败率
raise ConnectionError("网络连接失败")
return "成功"
2.3 多个装饰器的组合
def log_decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
print(f"调用函数: {func.__name__}")
return func(*args, **kwargs)
return wrapper
def validate_inputs(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if not args and not kwargs:
raise ValueError("函数参数不能为空")
return func(*args, **kwargs)
return wrapper
@log_decorator
@validate_inputs
@timing_decorator
def process_data(data: List[Any]) -> List[Any]:
"""处理数据的函数"""
return [item for item in data if item is not None]
3. 生成器与迭代器 🔄
3.1 生成器函数
from typing import Generator, Iterator, List
def fibonacci_generator(n: int) -> Generator[int, None, None]:
"""生成斐波那契数列的生成器
Args:
n: 要生成的数字个数
Yields:
斐波那契数列中的下一个数字
"""
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# 内存效率对比
def get_numbers_list(n: int) -> List[int]:
"""返回列表的方式"""
return [i ** 2 for i in range(n)]
def get_numbers_generator(n: int) -> Generator[int, None, None]:
"""生成器方式"""
for i in range(n):
yield i ** 2
3.2 生成器表达式
# 列表推导式 vs 生成器表达式
numbers = [1, 2, 3, 4, 5]
# 列表推导式(立即计算)
squares_list = [x ** 2 for x in numbers] # 创建新列表
# 生成器表达式(惰性计算)
squares_gen = (x ** 2 for x in numbers) # 创建生成器对象
# 条件筛选
even_squares = (x ** 2 for x in numbers if x % 2 == 0)
# 多重生成器
matrix = ((i, j) for i in range(3) for j in range(3))
3.3 自定义迭代器
class DataIterator:
"""自定义数据迭代器"""
def __init__(self, data: List[Any]):
self.data = data
self.index = 0
def __iter__(self) -> 'DataIterator':
return self
def __next__(self) -> Any:
if self.index >= len(self.data):
raise StopIteration
value = self.data[self.index]
self.index += 1
return value
# 使用示例
class DataContainer:
def __init__(self, data: List[Any]):
self.data = data
def __iter__(self) -> DataIterator:
return DataIterator(self.data)
4. 递归函数应用 🌳
4.1 基本递归模式
from typing import Any, List, Optional
def factorial(n: int) -> int:
"""计算阶乘的递归实现"""
if n <= 1: # 基本情况
return 1
return n * factorial(n - 1) # 递归情况
def binary_search(arr: List[int], target: int, left: int = 0, right: Optional[int] = None) -> int:
"""递归实现二分查找
Returns:
目标值的索引,如果未找到返回-1
"""
if right is None:
right = len(arr) - 1
if left > right:
return -1
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] > target:
return binary_search(arr, target, left, mid - 1)
else:
return binary_search(arr, target, mid + 1, right)
4.2 高级递归应用
class TreeNode:
def __init__(self, value: Any):
self.value = value
self.left: Optional[TreeNode] = None
self.right: Optional[TreeNode] = None
def tree_traversal(root: Optional[TreeNode], method: str = "inorder") -> Generator[Any, None, None]:
"""通用的树遍历生成器
Args:
root: 树的根节点
method: 遍历方法 ("preorder", "inorder", "postorder")
"""
if not root:
return
if method == "preorder":
yield root.value
yield from tree_traversal(root.left, method)
yield from tree_traversal(root.right, method)
elif method == "inorder":
yield from tree_traversal(root.left, method)
yield root.value
yield from tree_traversal(root.right, method)
elif method == "postorder":
yield from tree_traversal(root.left, method)
yield from tree_traversal(root.right, method)
yield root.value
5. 实战案例:文件批处理工具 🛠️
5.1 文件处理工具类
from pathlib import Path
from typing import Generator, List, Dict, Callable
import os
import shutil
class FileBatchProcessor:
"""文件批处理工具"""
def __init__(self, source_dir: str):
self.source_dir = Path(source_dir)
self.processors: Dict[str, Callable] = {}
def register_processor(self, extension: str, processor: Callable) -> None:
"""注册文件处理器"""
self.processors[extension] = processor
def find_files(self, pattern: str = "*") -> Generator[Path, None, None]:
"""递归查找文件"""
for item in self.source_dir.rglob(pattern):
if item.is_file():
yield item
def process_files(self) -> Dict[str, List[str]]:
"""处理所有文件"""
results = {"success": [], "error": []}
for file_path in self.find_files():
try:
extension = file_path.suffix.lower()
if extension in self.processors:
self.processors[extension](file_path)
results["success"].append(str(file_path))
except Exception as e:
results["error"].append(f"{file_path}: {str(e)}")
return results
5.2 实际应用示例
# 文件处理器函数
def process_text_file(file_path: Path) -> None:
"""处理文本文件"""
with file_path.open('r', encoding='utf-8') as f:
content = f.read()
# 处理文本内容
processed_content = content.upper()
# 保存处理后的文件
backup_path = file_path.with_suffix(file_path.suffix + '.bak')
shutil.copy2(file_path, backup_path)
with file_path.open('w', encoding='utf-8') as f:
f.write(processed_content)
def process_image_file(file_path: Path) -> None:
"""处理图片文件(示例)"""
# 这里可以添加图片处理逻辑
pass
# 使用示例
def main():
# 创建处理器实例
processor = FileBatchProcessor("./documents")
# 注册文件处理器
processor.register_processor(".txt", process_text_file)
processor.register_processor(".md", process_text_file)
processor.register_processor(".jpg", process_image_file)
processor.register_processor(".png", process_image_file)
# 执行批处理
results = processor.process_files()
# 输出处理结果
print(f"成功处理的文件: {len(results['success'])}")
print(f"处理失败的文件: {len(results['error'])}")
if results['error']:
print("\n错误详情:")
for error in results['error']:
print(f"- {error}")
if __name__ == "__main__":
main()
本文详细介绍了Python的高级函数特性,从匿名函数到装饰器,从生成器到递归函数,最后通过一个实战案例展示了这些特性的实际应用。
希望这些内容对你的Python编程之旅有所帮助!
如果你觉得这篇文章有帮助,欢迎点赞转发,也期待在评论区看到你的想法和建议!👇
咱们下一期见!