以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:
1. 经典观察者模式实现
from abc import ABC, abstractmethod
from typing import List, Any
class Observer(ABC):
"""观察者抽象基类"""
@abstractmethod
def update(self, subject: Any) -> None:
pass
class Subject:
"""被观察对象基类"""
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer) -> None:
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self) -> None:
"""同步通知所有观察者"""
for observer in self._observers:
observer.update(self)
# 使用示例
class TemperatureSensor(Subject):
"""具体被观察者:温度传感器"""
def __init__(self):
super().__init__()
self._temperature = 0.0
@property
def temperature(self) -> float:
return self._temperature
@temperature.setter
def temperature(self, value: float) -> None:
self._temperature = value
self.notify() # 温度变化时通知观察者
class Display(Observer):
"""具体观察者:显示屏"""
def update(self, subject: TemperatureSensor) -> None:
print(f"当前温度: {subject.temperature}°C")
# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)
sensor.temperature = 25.5 # 输出: 当前温度: 25.5°C
2. 线程安全增强版
import threading
from typing import List, Any
class ThreadSafeSubject:
"""线程安全的被观察对象"""
def __init__(self):
self._observers: List[Observer] = []
self._lock = threading.RLock()
def attach(self, observer: Observer) -> None:
with self._lock:
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
with self._lock:
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self) -> None:
"""线程安全的通知"""
with self._lock:
observers = self._observers.copy()
for observer in observers:
observer.update(self)
3. 异步观察者模式
import asyncio
from abc import ABC, abstractmethod
from typing import List, Any
class AsyncObserver(ABC):
"""异步观察者接口"""
@abstractmethod
async def update(self, subject: Any) -> None:
pass
class AsyncSubject:
"""支持异步通知的被观察对象"""
def __init__(self):
self._observers: List[AsyncObserver] = []
def attach(self, observer: AsyncObserver) -> None:
if observer not in self._observers:
self._observers.append(observer)
async def notify(self) -> None:
"""异步通知所有观察者"""
await asyncio.gather(
*[observer.update(self) for observer in self._observers]
)
# 使用示例
class AsyncTemperatureSensor(AsyncSubject):
def __init__(self):
super().__init__()
self._temp = 0.0
async def set_temperature(self, value: float) -> None:
self._temp = value
await self.notify()
class CloudLogger(AsyncObserver):
async def update(self, subject: AsyncTemperatureSensor) -> None:
print(f"云端记录温度: {subject._temp}°C")
await asyncio.sleep(0.1) # 模拟网络请求
async def main():
sensor = AsyncTemperatureSensor()
sensor.attach(CloudLogger())
await sensor.set_temperature(28.5) # 输出: 云端记录温度: 28.5°C
asyncio.run(main())
4. 事件总线实现(发布-订阅模式)
from typing import Dict, List, Callable, Any
import inspect
class EventBus:
"""事件总线(高级观察者模式)"""
_instance = None
def __new__(cls):
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance._subscriptions: Dict[str, List[Callable]] = {}
return cls._instance
def subscribe(self, event_type: str, callback: Callable) -> None:
if not inspect.iscoroutinefunction(callback):
callback = self._sync_to_async(callback)
if event_type not in self._subscriptions:
self._subscriptions[event_type] = []
self._subscriptions[event_type].append(callback)
async def publish(self, event_type: str, **data) -> None:
if event_type in self._subscriptions:
await asyncio.gather(
*[callback(**data) for callback in self._subscriptions[event_type]]
)
@staticmethod
def _sync_to_async(func: Callable) -> Callable:
async def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
# 使用示例
bus = EventBus()
@bus.subscribe("temperature_change")
async def log_temp_change(value: float):
print(f"温度变化记录: {value}°C")
async def trigger_events():
await bus.publish("temperature_change", value=30.0)
asyncio.run(trigger_events()) # 输出: 温度变化记录: 30.0°C
5. 带过滤器的观察者模式
from typing import Callable, Any
class FilteredObserver:
"""带条件过滤的观察者"""
def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):
self.callback = callback
self.filter = filter_condition
def update(self, subject: Any) -> None:
if self.filter(subject):
self.callback(subject)
# 使用示例
sensor = TemperatureSensor()
def alert(temp: float):
print(f"警报!当前温度过高: {temp}°C")
# 只接收温度>30的通知
high_temp_observer = FilteredObserver(
callback=alert,
filter_condition=lambda s: s.temperature > 30
)
sensor.attach(high_temp_observer)
sensor.temperature = 25 # 无输出
sensor.temperature = 35 # 输出: 警报!当前温度过高: 35°C
方案对比
实现方式 | 特点 | 适用场景 |
---|---|---|
经典实现 | 简单直接 | 单线程简单场景 |
线程安全版 | 避免竞态条件 | 多线程环境 |
异步实现 | 非阻塞通知 | I/O密集型应用 |
事件总线 | 松耦合,支持多对多 | 复杂事件系统 |
过滤观察者 | 条件触发 | 需要选择性通知的场景 |
最佳实践建议
-
生命周期管理:
# 使用上下文管理器自动取消注册 class ObserverContext: def __init__(self, subject: Subject, observer: Observer): self.subject = subject self.observer = observer def __enter__(self): self.subject.attach(self.observer) return self def __exit__(self, *args): self.subject.detach(self.observer) with ObserverContext(sensor, display): sensor.temperature = 20
-
性能优化:
- 对于高频事件,考虑使用弱引用(
weakref.WeakSet
) - 批量通知时使用
@dataclass
封装事件数据
- 对于高频事件,考虑使用弱引用(
-
异常处理:
def safe_notify(self): for observer in self._observers: try: observer.update(self) except Exception as e: print(f"Observer failed: {e}")
-
与Python生态集成:
- 使用
PyPubSub
等现成库 - 结合
asyncio.Queue
实现生产者-消费者模式
- 使用
根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。