Python10天突击--Day 2: 实现观察者模式

news2025/4/16 1:15:16

以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:


1. 经典观察者模式实现

from abc import ABC, abstractmethod
from typing import List, Any

class Observer(ABC):
    """观察者抽象基类"""
    @abstractmethod
    def update(self, subject: Any) -> None:
        pass

class Subject:
    """被观察对象基类"""
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    def notify(self) -> None:
        """同步通知所有观察者"""
        for observer in self._observers:
            observer.update(self)

# 使用示例
class TemperatureSensor(Subject):
    """具体被观察者:温度传感器"""
    def __init__(self):
        super().__init__()
        self._temperature = 0.0

    @property
    def temperature(self) -> float:
        return self._temperature

    @temperature.setter
    def temperature(self, value: float) -> None:
        self._temperature = value
        self.notify()  # 温度变化时通知观察者

class Display(Observer):
    """具体观察者:显示屏"""
    def update(self, subject: TemperatureSensor) -> None:
        print(f"当前温度: {subject.temperature}°C")

# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)

sensor.temperature = 25.5  # 输出: 当前温度: 25.5°C

2. 线程安全增强版

import threading
from typing import List, Any

class ThreadSafeSubject:
    """线程安全的被观察对象"""
    def __init__(self):
        self._observers: List[Observer] = []
        self._lock = threading.RLock()

    def attach(self, observer: Observer) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        with self._lock:
            try:
                self._observers.remove(observer)
            except ValueError:
                pass

    def notify(self) -> None:
        """线程安全的通知"""
        with self._lock:
            observers = self._observers.copy()
        
        for observer in observers:
            observer.update(self)

3. 异步观察者模式

import asyncio
from abc import ABC, abstractmethod
from typing import List, Any

class AsyncObserver(ABC):
    """异步观察者接口"""
    @abstractmethod
    async def update(self, subject: Any) -> None:
        pass

class AsyncSubject:
    """支持异步通知的被观察对象"""
    def __init__(self):
        self._observers: List[AsyncObserver] = []

    def attach(self, observer: AsyncObserver) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    async def notify(self) -> None:
        """异步通知所有观察者"""
        await asyncio.gather(
            *[observer.update(self) for observer in self._observers]
        )

# 使用示例
class AsyncTemperatureSensor(AsyncSubject):
    def __init__(self):
        super().__init__()
        self._temp = 0.0

    async def set_temperature(self, value: float) -> None:
        self._temp = value
        await self.notify()

class CloudLogger(AsyncObserver):
    async def update(self, subject: AsyncTemperatureSensor) -> None:
        print(f"云端记录温度: {subject._temp}°C")
        await asyncio.sleep(0.1)  # 模拟网络请求

async def main():
    sensor = AsyncTemperatureSensor()
    sensor.attach(CloudLogger())
    await sensor.set_temperature(28.5)  # 输出: 云端记录温度: 28.5°C

asyncio.run(main())

4. 事件总线实现(发布-订阅模式)

from typing import Dict, List, Callable, Any
import inspect

class EventBus:
    """事件总线(高级观察者模式)"""
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super().__new__(cls)
            cls._instance._subscriptions: Dict[str, List[Callable]] = {}
        return cls._instance

    def subscribe(self, event_type: str, callback: Callable) -> None:
        if not inspect.iscoroutinefunction(callback):
            callback = self._sync_to_async(callback)
            
        if event_type not in self._subscriptions:
            self._subscriptions[event_type] = []
        self._subscriptions[event_type].append(callback)

    async def publish(self, event_type: str, **data) -> None:
        if event_type in self._subscriptions:
            await asyncio.gather(
                *[callback(**data) for callback in self._subscriptions[event_type]]
            )

    @staticmethod
    def _sync_to_async(func: Callable) -> Callable:
        async def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper

# 使用示例
bus = EventBus()

@bus.subscribe("temperature_change")
async def log_temp_change(value: float):
    print(f"温度变化记录: {value}°C")

async def trigger_events():
    await bus.publish("temperature_change", value=30.0)

asyncio.run(trigger_events())  # 输出: 温度变化记录: 30.0°C

5. 带过滤器的观察者模式

from typing import Callable, Any

class FilteredObserver:
    """带条件过滤的观察者"""
    def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):
        self.callback = callback
        self.filter = filter_condition

    def update(self, subject: Any) -> None:
        if self.filter(subject):
            self.callback(subject)

# 使用示例
sensor = TemperatureSensor()

def alert(temp: float):
    print(f"警报!当前温度过高: {temp}°C")

# 只接收温度>30的通知
high_temp_observer = FilteredObserver(
    callback=alert,
    filter_condition=lambda s: s.temperature > 30
)

sensor.attach(high_temp_observer)
sensor.temperature = 25  # 无输出
sensor.temperature = 35  # 输出: 警报!当前温度过高: 35°C

方案对比

实现方式特点适用场景
经典实现简单直接单线程简单场景
线程安全版避免竞态条件多线程环境
异步实现非阻塞通知I/O密集型应用
事件总线松耦合,支持多对多复杂事件系统
过滤观察者条件触发需要选择性通知的场景

最佳实践建议

  1. 生命周期管理

    # 使用上下文管理器自动取消注册
    class ObserverContext:
        def __init__(self, subject: Subject, observer: Observer):
            self.subject = subject
            self.observer = observer
            
        def __enter__(self):
            self.subject.attach(self.observer)
            return self
            
        def __exit__(self, *args):
            self.subject.detach(self.observer)
    
    with ObserverContext(sensor, display):
        sensor.temperature = 20
    
  2. 性能优化

    • 对于高频事件,考虑使用弱引用(weakref.WeakSet
    • 批量通知时使用@dataclass封装事件数据
  3. 异常处理

    def safe_notify(self):
        for observer in self._observers:
            try:
                observer.update(self)
            except Exception as e:
                print(f"Observer failed: {e}")
    
  4. 与Python生态集成

    • 使用PyPubSub等现成库
    • 结合asyncio.Queue实现生产者-消费者模式

根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2334726.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot框架集成websocket依赖实现物联网设备、前端网页实时通信!

需求: 最近在对接一个物联网里设备,他的通信方式是 websocket 。所以我需要在 springboot框架中集成websocket 依赖,从而实现与设备实时通信! 框架:springboot2.7 java版本:java8 好了,还是直接…

ES6学习03-字符串扩展(unicode、for...of、字符串模板)和新方法()

一、字符串扩展 1. eg: 2.for...of eg: 3. eg: 二。字符串新增方法 1. 2. 3. 4. 5.

目前状况下,计算机和人工智能是什么关系?

目录 一、计算机和人工智能的关系 (一)从学科发展角度看 计算机是基础 人工智能是计算机的延伸和拓展 (二)从技术应用角度看 二、计算机系学生对人工智能的了解程度 (一)基础层面的了解 必备知识 …

Flutter 2025 Roadmap

2025 这个路线图是有抱负的。它主要代表了我们这些在谷歌工作的人收集的内容。到目前为止,非Google贡献者的数量超过了谷歌雇佣的贡献者,所以这并不是一个详尽的列表,列出了我们希望今年Flutter能够出现的所有令人兴奋的新事物!在…

[数据结构]排序 --2

目录 8、快速排序 8.1、Hoare版 8.2、挖坑法 8.3、前后指针法 9、快速排序优化 9.1、三数取中法 9.2、采用插入排序 10、快速排序非递归 11、归并排序 12、归并排序非递归 13、排序类算法总结 14、计数排序 15、其他排序 15.1、基数排序 15.2、桶排序 8、快速排…

第16届蓝桥杯c++省赛c组个人题解

偷偷吐槽: c组没人写题解吗,找不到题解啊 P12162 [蓝桥杯 2025 省 C/研究生组] 数位倍数 题目背景 本站蓝桥杯 2025 省赛测试数据均为洛谷自造,与官方数据可能存在差异,仅供学习参考。 题目描述 请问在 1 至 202504&#xff…

记一次InternVL3- 2B 8B的部署测验日志

1、模型下载魔搭社区 2、运行环境: 1、硬件 RTX 3090*1 云主机[普通性能] 8核15G 200G 免费 32 Mbps付费68Mbps ubuntu22.04 cuda12.4 2、软件: flash_attn(好像不用装 忘记了) numpy Pillow10.3.0 Requests2.31.0 transfo…

使用SSH解决在IDEA中Push出现403的问题

错误截图: 控制台日志: 12:15:34.649: [xxx] git -c core.quotepathfalse -c log.showSignaturefalse push --progress --porcelain master refs/heads/master:master fatal: unable to access https://github.com/xxx.git/: The requested URL return…

Tauri 2.3.1+Leptos 0.7.8开发桌面应用--Sqlite数据库的写入、展示和选择删除

在前期工作的基础上(Tauri2Leptos开发桌面应用--Sqlite数据库操作_tauri sqlite-CSDN博客),尝试制作产品化学成分录入界面,并展示数据库内容,删除选中的数据。具体效果如下: 一、前端Leptos程序 前端程序主…

《车辆人机工程-》实验报告

汽车驾驶操纵实验 汽车操纵装置有哪几种,各有什么特点 汽车操纵装置是驾驶员直接控制车辆行驶状态的关键部件,主要包括以下几种,其特点如下: 一、方向盘(转向操纵装置) 作用:控制车辆行驶方向…

使用多进程和 Socket 接收解析数据并推送到 Kafka 的高性能架构

使用多进程和 Socket 接收解析数据并推送到 Kafka 的高性能架构 在现代应用程序中,实时数据处理和高并发性能是至关重要的。本文将介绍如何使用 Python 的多进程和 Socket 技术来接收和解析数据,并将处理后的数据推送到 Kafka,从而实现高效的…

Redis 哨兵模式 搭建

1 . 哨兵模式拓扑 与 简介 本文介绍如何搭建 单主双从 多哨兵模式的搭建 哨兵有12个作用 。通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。 当哨兵监测到master宕机,会自动将slave切换成master,然后通过…

【网络安全 | 项目开发】Web 安全响应头扫描器(提升网站安全性)

原创项目,未经许可,不得转载。 文章目录 项目简介工作流程示例输出技术栈项目代码使用说明项目简介 安全响应头是防止常见 Web 攻击(如点击劫持、跨站脚本攻击等)的有效防线,因此合理的配置这些头部信息对任何网站的安全至关重要。 Web 安全响应头扫描器(Security Head…

基于PySide6与pycatia的CATIA绘图比例智能调节工具开发全解析

引言:工程图纸自动化处理的技术革新 在机械设计领域,CATIA图纸的比例调整是高频且重复性极强的操作。传统手动调整方式效率低下且易出错。本文基于PySide6+pycatia技术栈,提出一种支持智能比例匹配、实时视图控制、异常自处理的图纸批处理方案,其核心突破体现在: ​操作效…

STM32硬件IIC+DMA驱动OLED显示——释放CPU资源,提升实时性

目录 前言 一、软件IIC与硬件IIC 1、软件IIC 2、硬件IIC 二、STM32CubeMX配置KEIL配置 三、OLED驱动示例 1、0.96寸OLED 2、OLED驱动程序 3、运用示例 4、效果展示 总结 前言 0.96寸OLED屏是一个很常见的显示模块,其驱动方式在用采IIC通讯时,常用软件IIC…

泛型的二三事

泛型(Generics)是Java语言的一个重要特性,它允许在定义类、接口和方法时使用类型参数(Type Parameters),从而实现类型安全的代码重用。泛型在Java 5中被引入,极大地增强了代码的灵活性和安全性。…

编程思想——FP、OOP、FRP、AOP、IOC、DI、MVC、DTO、DAO

个人简介 👀个人主页: 前端杂货铺 🙋‍♂️学习方向: 主攻前端方向,正逐渐往全干发展 📃个人状态: 研发工程师,现效力于中国工业软件事业 🚀人生格言: 积跬步…

【区块链安全 | 第三十九篇】合约审计之delegatecall(一)

文章目录 外部调用函数calldelegatecallcall 与 delegatecall 的区别示例部署后初始状态调用B.testCall()函数调用B.testDelegatecall()函数区别总结漏洞代码代码审计攻击代码攻击原理解析攻击流程修复建议审计思路外部调用函数 在 Solidity 中,常见的两种底层外部函数调用方…

linux多线(进)程编程——(6)共享内存

前言 话说进程君的儿子经过父亲点播后就开始闭关,它想要开发出一种全新的传音神通。他想,如果两个人的大脑生长到了一起,那不是就可以直接知道对方在想什么了吗,这样不是可以避免通过语言传递照成的浪费吗? 下面就是它…

信息安全管理与评估2021年国赛正式卷答案截图以及十套国赛卷

2021年全国职业院校技能大赛高职组 “信息安全管理与评估”赛项 任务书1 赛项时间 共计X小时。 赛项信息 赛项内容 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 第一阶段 平台搭建与安全设备配置防护 任务1 网络平台搭建 任务2 网络安全设备配置与防护 第二…