python:并发编程(十)

news2025/1/23 15:09:15

前言

本文将和大家一起探讨python的多协程并发编程(上篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。

本文为python并发编程的第十篇,上一篇文章地址如下:

python:并发编程(九)_Lion King的博客-CSDN博客

下一篇文章地址如下:

python:并发编程(十一)_Lion King的博客-CSDN博客

一、快速开始

官方文档:asyncio --- 异步 I/O — Python 3.11.4 文档

1、事件循环

asyncio 提供了事件循环(Event Loop)作为协程的调度器和执行者。事件循环负责处理协程的调度和执行,并处理事件、回调函数等。以下是关于 asyncio 事件循环的一些重要概念和用法:

(1)获取事件循环对象:可以使用 asyncio.get_event_loop() 获取默认的事件循环对象,或者使用 asyncio.new_event_loop() 创建一个新的事件循环对象。

(2)设置默认事件循环:可以使用 asyncio.set_event_loop() 设置默认的事件循环对象。

(3)运行事件循环:可以使用 loop.run_forever() 方法以无限循环的方式运行事件循环,直到调用 loop.stop()

(4)执行协程:可以使用 loop.run_until_complete() 方法执行一个协程或任务,并等待其完成。

(5)停止事件循环:可以使用 loop.stop() 停止事件循环的运行。

(6)调度协程:可以使用 loop.create_task() 方法创建一个任务,并将其添加到事件循环中运行。

(7)定时调度:可以使用 loop.call_later()loop.call_at() 方法在指定的时间点调度回调函数的执行。

(8)异常处理:可以使用 try-except 块来捕获和处理在协程执行过程中抛出的异常。

以下是一个简单的示例代码,演示了如何使用 asyncio 的事件循环:

import asyncio

# 定义一个协程
async def my_coroutine():
    print("Coroutine is running")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("Coroutine is done")

# 创建事件循环
loop = asyncio.get_event_loop()

# 将协程添加到事件循环中
task = loop.create_task(my_coroutine())

# 运行事件循环直到任务完成
loop.run_until_complete(task)

# 关闭事件循环
loop.close()

以上代码创建了一个简单的协程 my_coroutine(),并将其添加到事件循环中运行。loop.run_until_complete(task) 运行事件循环直到任务完成,然后关闭事件循环。

通过使用 asyncio 的事件循环,可以方便地管理和调度协程的执行,实现异步操作和并发编程。

2、Futures

asyncio 中,Futures 是一种用于表示异步操作结果的对象。它充当了协程和异步函数之间的桥梁,可以用于等待和获取异步操作的结果。

Futures 提供了以下主要功能:

(1)表示异步操作的结果:Future 对象表示一个尚未完成的异步操作,可以在协程中使用它来等待操作的完成,并获取最终的结果。

(2)设置结果值:通过调用 Future 对象的 set_result() 方法,可以设置异步操作的结果值。

(3)获取结果值:可以使用 await 关键字或 yield from 表达式等待 Future 对象的完成,并获取操作的结果值。

(4)异常处理:Future 对象可以捕获和传播异步操作中抛出的异常。可以使用 try-except 块来捕获异常,并使用 set_exception() 方法将异常传播给等待的协程。

以下是一个简单的示例代码,演示了如何使用 Futures

import asyncio

# 定义一个异步函数
async def my_async_function():
    await asyncio.sleep(1)
    return "Async operation completed"

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建一个 Future 对象
future = asyncio.ensure_future(my_async_function())

# 执行事件循环,等待 Future 完成
loop.run_until_complete(future)

# 获取 Future 的结果值
result = future.result()
print(result)

# 关闭事件循环
loop.close()

在上述示例中,通过调用 asyncio.ensure_future() 方法,将异步函数 my_async_function() 转换为 Future 对象。然后,使用 loop.run_until_complete() 方法等待 Future 对象的完成,并获取结果值。

通过使用 Futures,可以方便地进行异步操作的等待和结果处理,实现并发编程和异步任务的管理。

3、传输和协议

服务端

运行以下代码将启动一个简单的服务端,监听本地的8888端口。当客户端连接并发送数据时,服务端将接收数据并发送回一条响应。请确保在运行服务器之前,没有其他应用程序在使用相同的IP地址和端口号,并且防火墙设置不会阻止客户端与服务器建立连接。

import asyncio

# 自定义协议类
class MyProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        print('Client connected')
        self.transport = transport

    def data_received(self, data):
        print('Received:', data.decode())

        # 响应客户端请求
        response = b'Hello, client!'
        self.transport.write(response)

    def connection_lost(self, exc):
        print('Client disconnected')

# 创建事件循环
loop = asyncio.get_event_loop()

# 启动服务端
async def start_server():
    # 创建服务器
    server = await loop.create_server(lambda: MyProtocol(), 'localhost', 8888)

    # 获取绑定的地址和端口
    address = server.sockets[0].getsockname()
    print('Server started at', address)

    # 等待服务器关闭
    await server.wait_closed()

# 运行事件循环,启动服务端
loop.run_until_complete(start_server())

客户端

asyncio 中,传输(Transport)和协议(Protocol)是用于实现网络通信的重要组件。

**传输(Transport)**是网络连接的抽象表示,负责发送和接收数据。它提供了发送和关闭连接的方法,以及处理连接状态的相关属性。

**协议(Protocol)**是定义网络通信规则和行为的抽象接口。它定义了在网络连接上进行数据交换的方式,包括数据的编码、解码和处理等。协议通常是从 asyncio.Protocol 类派生而来的子类。

asyncio 中,可以通过以下步骤来使用传输和协议进行网络通信:

(1)创建传输和协议对象:使用 loop.create_connection() 方法创建传输和协议对象。该方法接受一个协议类和主机地址、端口号等参数,并返回一个 (transport, protocol) 对象。

(2)连接到远程主机:调用传输对象的 transport.connect() 方法,将传输对象与远程主机建立连接。连接成功后,将调用协议对象的 protocol.connection_made() 方法。

(3)数据收发:使用传输对象的 transport.write() 方法发送数据到远程主机,以及通过协议对象的回调方法接收和处理远程主机发送的数据。

(4)关闭连接:调用传输对象的 transport.close() 方法关闭连接。关闭连接后,将调用协议对象的 protocol.connection_lost() 方法。

以下是一个简单的示例代码,演示了如何使用传输和协议进行网络通信:

import asyncio

# 自定义协议类
class MyProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        print('Connected')
        self.transport = transport

    def data_received(self, data):
        print('Received:', data.decode())

    def connection_lost(self, exc):
        print('Connection closed')

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建传输和协议对象
coro = loop.create_connection(MyProtocol, 'localhost', 8888)

# 运行事件循环,建立连接
transport, protocol = loop.run_until_complete(coro)

# 发送数据
transport.write(b'Hello, server!')

# 关闭连接
transport.close()

# 停止事件循环
loop.stop()

在上述示例中,我们自定义了一个协议类 MyProtocol,继承自 asyncio.Protocol。然后,使用 loop.create_connection() 方法创建传输和协议对象,并通过 loop.run_until_complete() 方法建立连接。最后,通过传输对象的 transport.write() 方法发送数据,并调用传输对象的 transport.close() 方法关闭连接。

通过使用传输和协议,我们可以方便地实现网络通信,并处理收发数据的逻辑。

4、策略

asyncio中,策略对象是用于控制事件循环行为的一种机制。asyncio提供了默认的策略对象,但也允许用户自定义和替换策略对象来满足特定的需求。

默认的策略对象是asyncio.DefaultEventLoopPolicy,它基于具体的事件循环实现,如asyncio.SelectorEventLoop(基于selectors模块)或asyncio.SelectorEventLoop(基于select模块)。默认策略对象通过asyncio.get_event_loop_policy()方法获取。

用户可以通过自定义策略对象来改变事件循环的行为,例如使用特定的事件循环实现或者自定义调度器。自定义策略对象必须实现asyncio.AbstractEventLoopPolicy接口的方法,例如get_event_loop()set_event_loop()new_event_loop()等。

下面是一个使用自定义策略对象的示例:

import asyncio

# 自定义策略对象
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def get_event_loop(self):
        loop = super().get_event_loop()
        print('Using custom event loop:', type(loop).__name__)
        return loop

    # 其他自定义方法...

# 设置自定义策略对象
asyncio.set_event_loop_policy(MyEventLoopPolicy())

# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_forever()

在这个示例中,我们创建了一个自定义的策略对象MyEventLoopPolicy,并重写了get_event_loop()方法来打印使用的事件循环类型。然后通过asyncio.set_event_loop_policy()方法将自定义策略对象设置为当前的策略。最后,使用asyncio.get_event_loop()获取事件循环并运行。

请注意,自定义策略对象的设置应该在创建事件循环之前进行,以确保新创建的事件循环使用自定义策略。

5、Extending

asyncio提供了扩展其功能的机制,使开发者能够根据自己的需求进行定制和扩展。以下是一些扩展asyncio功能的常见方法:

(1)自定义协议(Protocol):可以通过继承asyncio.Protocol类来创建自定义的协议。通过实现connection_madedata_receivedconnection_lost等方法,可以处理连接的建立、数据的接收和连接的关闭等事件。

(2)自定义传输(Transport):可以通过继承asyncio.BaseTransport类来创建自定义的传输。传输是协议和网络层之间的接口,负责处理数据的发送和接收。

(3)自定义事件循环(Event Loop):可以通过继承asyncio.AbstractEventLoop类来创建自定义的事件循环。事件循环是asyncio的核心组件,负责调度和执行异步任务。

(4)自定义异步任务(Coroutines):可以使用asyncio.coroutine装饰器和async def关键字来定义自己的异步任务。通过使用协程,可以编写异步的、非阻塞的代码逻辑。

(5)自定义异步函数(Async Functions):可以使用@asyncio.coroutine装饰器和async def关键字来定义自己的异步函数。异步函数可以与协程配合使用,用于编写更高级的异步逻辑。

(6)自定义事件处理器(Event Handlers):可以使用asyncio提供的事件处理器机制来定制和处理特定的事件。例如,可以注册和处理定时器事件、I/O事件、信号事件等。

通过以上扩展方法,可以根据具体需求对asyncio进行定制和扩展,以满足特定的异步编程场景和功能要求。这使得asyncio具有很高的灵活性和可扩展性,适用于各种异步编程任务和应用领域。

下面是一个简单的示例代码,展示了如何扩展asyncio的功能,通过自定义协议和传输来实现一个简单的Echo服务器:

import asyncio

# 自定义协议类
class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print("Received:", message)
        self.transport.write(data)

    def connection_lost(self, exc):
        print("Connection closed")

# 自定义传输类
class EchoTransport(asyncio.Transport):
    def __init__(self, loop):
        super().__init__(extra=None)
        self.loop = loop
        self.buffer = b''

    def write(self, data):
        print("Sending:", data.decode())
        self.buffer += data

    def close(self):
        print("Closing connection")

    def can_write_eof(self):
        return False

    def get_write_buffer_size(self):
        return len(self.buffer)

    def write_eof(self):
        pass

    def abort(self):
        pass

    def pause_reading(self):
        pass

    def resume_reading(self):
        pass

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建协议和传输对象
protocol = EchoProtocol()
transport = EchoTransport(loop)

# 将协议和传输对象关联起来
protocol.connection_made(transport)

# 模拟收到数据并处理
data = b"Hello, server!"
protocol.data_received(data)

# 关闭连接
protocol.connection_lost(None)

# 停止事件循环
loop.stop()

这个示例中,EchoProtocol类继承了asyncio.Protocol,并实现了connection_madedata_receivedconnection_lost等方法,用于处理连接的建立、数据的接收和连接的关闭。EchoTransport类继承了asyncio.Transport,并实现了一系列传输相关的方法,用于处理数据的发送和关闭连接。

通过自定义协议和传输,我们可以根据需要来扩展和定制asyncio的功能,以适应不同的应用场景和需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/652590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【程序人生-Hello‘s P2P】哈尔滨工业大学深入理解计算机系统大作业

计算机系统 大作业 题 目 程序人生-Hello’s P2P 专 业 xxxx 学 号 2021xxxx 班 级 210xxxx 学 生 xx 指 导 教 师 xxx 计算机科学与技术学院 2023年5月 摘 要 HelloWorld是每个程序员接触的第一个程序,表面上平平无奇的它背后却是由操作系统许多设计精巧的机制支撑…

直击面试!阿里技术官手码12W字面试小册在Github上爆火

Java面试 临近金三银四,想必有不少老哥已经在为面试做准备了。大家想必也知道现在面试就是看项目经验基本技术个人潜力(也就是值不值得培养)。总之就是每一次面试都是对我们能力的检验(无论是软实力还是硬实力)。软实力其实就是简历包装&…

高性能计算专业发展及就业前景

高性能计算专业是指一种针对计算领域最高端、最具挑战性且最具应用价值的领域专业。本文从专业背景、就业前景、发展趋势、技能需求和职业发展路径等5个角度进行较为详细的论证,旨在为高性能计算专业的学生提供一个全面的了解,以及为需要参考的人员提供参…

Java中常用的工具类

有时间就该多学,我还年轻,吃苦趁现在! 文章目录 ❗前言1️⃣Object类①、使用步骤equals方法hashCode方法toString方法❗equals方法和运算符的区别 ②、重要方法总结 2️⃣包装类基本类型和包装类相互转换字符串类型转换成基本类型 3️⃣Date…

【Java入门】-- Java基础详解之 [变量]

目录 1.变量 1.1 变量的概念 1.2 变量的使用基本步骤 1.3 变量使用注意事项 1.4 程序中 号的使用 1.5 数据类型 1.6 整数的类型 1.7 整型的使用细节 1.8 浮点类型 1.9 浮点型的使用细节 2.0 Java API文档 2.1 字符类型(char) 2.2 字符的使用细节 2.3 字符编码 …

JS事件冒泡与JS事件代理(事件委托)

JS事件冒泡与JS事件代理(事件委托) 1、事件冒泡1.1 概念1.2 要是不给子元素添加具体的oncilck处理方法,也能冒泡么?1.3 子元素触发的事件冒泡会触发父元素所有的事件么?还是触发对应的事件?1.4 那么我们应该…

pytorch笔记:Conv2d 和 ConvMixer

来自B站视频,API查阅,TORCH.NN nn.Conv2d 中一般 kernel_size 是小奇数(很多是3),padding 设置为 k − 1 2 \frac{k-1}{2} 2k−1​(实际上padding的是 k − 1 k-1 k−1,因为参数的意义是左右各…

苹果新专利曝光:AirTags可以快速找到Apple Pencil

近日,据外媒报道,苹果一项新专利提出,苹果手写笔可以通过“声学谐振器”来帮助用户找出手写笔的位置。根据这项专利,苹果试图在手写笔的笔盖上加入一个被动元件,以响应特定的声波频率。iPhone、iPad或Apple Watch会发出…

【OpenMMLab AI实战营二期笔记】第七天 MMDetection代码课

0. 环境检测和安装 # 安装 mmengine 和 mmcv 依赖 # 为了防止后续版本变更导致的代码无法运行,暂时锁死版本 pip install -U "openmim0.3.7" mim install "mmengine0.7.1" mim install "mmcv2.0.0"# Install mmdetection rm -rf mmd…

【SpringBoot】SpringBoot集成支付宝支付

文章目录 背景1、预期效果展示2. 开发流程2.1 沙盒调试2.1.1. 创建沙盒应用2.1.2. SpringBoot代码实现2.1.3. 前端代码实现 2.2 创建并上线APP 背景 在开始集成支付宝支付之前,我们需要准备一个支付宝商家账户,如果是个人开发者,可以通过注册…

硬件速攻-E18-D80NK红外光电传感器

介绍 E18-D80NK光电开关是一款常见的光电传感器,具有高性价比、安装方便等优点。其工作原理是利用发射管发射出的红外线照射目标物,当目标物接近开关时,被照射的红外线反射回接收器,接收器就会向微处理器发出信号,从而…

1.9C++不同数据类型转换

C不同数据类型转换 在 C中,不同类型之间的数据转换可以通过强制类型转换(类型转换运算符)来实现。 C 中强制类型转换有以下三种: 1、static_cast static_cast 可以用于基本数据类型之间的转换,也可以用于类层次结构…

从实现到原理,总结11种延迟任务的实现方式(下)

7 监听Redis过期key 在Redis中,有个发布订阅的机制 生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。图中channel理解成MQ中的topic。 并且在Redis中,有很多默认的channel,只不过向这些…

小鱼深度产品测评之:阿里云云效代码管理 Codeup,一款数十万企业正在使用,全方位保护企业代码资产的实力产品,。

云效代码管理 Codeup 0、引言1、进入页面2、创建代码库3、资源文件页面4、分支→新建保护分支规则5、分支-基本设置5.1 基本信息 模块5.2 存储空间管理 6、分支→仓库备份6.1 点击 如何启用 按钮6.2 点击 前往企业设置查看 7、合并请求8、度量报表9、动态10、流水线11、总结 0、…

基于html+css的图展示129

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

24届秋招专场 · 数组如何用双指针解题呢?

你好,我是安然无虞。 文章目录 删除有序数组中的重复项删除排序链表中的重复元素移除元素移除零 大家好,近期主要更新数组相关的解题算法咯,感兴趣的老铁可以一起看过来啦。 今天更新使用双指针解决数组部分题型,注意哦&#xff…

支付宝小程序云亮相!向小程序生态开放全面云服务

前言: 小程序是一种轻量级应用程序,不需要安装即可直接在手机上使用。相较于传统的APP来讲,其无需下载安装,轻便快捷,快速启动,易于推广的良好特性为我们所青睐。 为此,支付宝小程序云&#xff…

DataX在有赞大数据平台的实践

文章目录 一、需求二、选型三、前期设计3.1 运行形态3.2 执行器设计3.3 开发策略 四、Datax-Web五、总结 大家好,我是脚丫先生 (o^^o) 在看技术文章的时候,发现有赞平台采用过Datax。想到指北数据中台,数据汇聚采用的是Datax-web二次开发&am…

chatgpt赋能python:Python的字符串索引操作技巧

Python的字符串索引操作技巧 Python是一个强大而灵活的编程语言,被广泛用于各种领域。在Python中,字符串是一个非常重要的数据类型,它可以包含文本、数字、符号和其他任何字符。在处理字符串时,索引操作是常见的操作之一。本文将…

计算机网络之网络层:数据平面

四.网络层:数据平面 4.1 网络层概述 网络层被分解为两个相互作用的部分,即数据平面和控制平面。 数据平面决定到达路由器输入链路之一的数据报如何转发到该路由器的输出链路之一,转发方式有: 传统的IP转发:转发基于…