Python Asyncio 调用 CPU 多核工作

news2025/1/6 20:00:32

前言

Python 的 Asyncio 适合异步处理 IO 密集型的事务, 比如 HTTP 请求, 磁盘读写, 数据库操作等场景. 如果使用传统的顺序执行代码, 需要等待每次 IO 事务进行完成后才可以继续后面的代码. 通过在定义函数时添加修饰词 async 可以将其设置为异步函数, 后续配合 Asyncio 可以实现多线程并行提高代码的运行效率.
然而在实际应用过程中发现存在一个很明显的性能问题, Asyncio 并行运行过程中观察系统任务管理器可以看到并不是所有的 CPU 内核在工作, 一多半内核都是闲置状态. (资料中描述的是其实只有一个内核会工作)
在这里插入图片描述
本文将参考大佬 PENG QIAN 的文章 Harnessing Multi-Core Power with Asyncio in Python 进行学习和讨论.

传统方式

import asyncio
import random
import time

# 模拟 IO 阻塞的函数
async def fake_io_func():
    io_delay = round(random.uniform(0.2, 1.0), 2)
    await asyncio.sleep(io_delay)

    result = 0
    for i in range(random.randint(100_000, 500_000)):
        result += i

    return result

# 异步任务主函数
async def main():
    start = time.monotonic()
    tasks = [asyncio.create_task(fake_io_func()) for i in range(10000)]

    await asyncio.gather(*tasks)
    print(f'All tasks completed. And last {time.monotonic() - start:.2f} seconds.')


if __name__ == '__main__':
    asyncio.run(main())

执行结果

All tasks completed. And last 188.95 seconds.

期间观察任务管理器
在这里插入图片描述

多核模式

import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor

# 模拟 IO 阻塞的函数
async def fake_io_func():
    io_delay = round(random.uniform(0.2, 1.0), 2)
    await asyncio.sleep(io_delay)

    result = 0
    for i in range(random.randint(100_000, 500_000)):
        result += i

    return result

# 并行运行特定范围编号的任务
async def query_concurrently(begin_idx: int, end_idx: int):
    """ Start concurrent tasks by start and end sequence number """
    tasks = []
    for _ in range(begin_idx, end_idx, 1):
        tasks.append(asyncio.create_task(fake_io_func()))
    results = await asyncio.gather(*tasks)
    return results

# 在子进程中执行批量任务
def run_batch_tasks(batch_idx: int, step: int):
    """ Execute batch tasks in sub processes """
    begin = batch_idx * step + 1
    end = begin + step

    results = [result for result in asyncio.run(query_concurrently(begin, end))]
    return results

# 修改后的主函数, 用来分配批量任务在子进程中并行执行
async def main():
    """ Distribute tasks in batches to be executed in sub-processes """
    start = time.monotonic()

    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 2000)
                 for batch_idx in range(5)]

    results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]

    print(f"We get {len(results)} results. All last {time.monotonic() - start:.2f} second(s)")

if __name__ == '__main__':
    asyncio.run(main())

执行结果

We get 10000 results. All last 46.47 second(s)

期间观察任务管理器, 效果挺明显, 虽然还有俩核在摸鱼
在这里插入图片描述

Deep dive

下面开始柯南时间 😉

根据文章 Async IO in Python: A Complete Walkthrough 中的描述:

In fact, async IO is a single-threaded, single-process design: it uses cooperative multitasking, a term that you’ll flesh out by the end of this tutorial. It has been said in other words that async IO gives a feeling of concurrency despite using a single thread in a single process. Coroutines (a central feature of async IO) can be scheduled concurrently, but they are not inherently concurrent.

Asyncio 应该就是单线程, 单进程的设计. 其中对于 Asyncio 最重要的一个核心概念 event loop 可以参考下图理解:
How does the event loop work. Image by PENG QIAN
Python 官方文档对 event loop 的解释:

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.
Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.

event loop 说白了就是将 IO 密集的事务(函数)抽象成一个任务, 调用后并不会立刻返回结果, 而是扔到后台去执行, 啥时候执行完了啥时候返回结果. 所以如果并发执行多个任务的时候, 可以不用等待所有任务都执行完了再继续其余代码. 官方建议的是直接用高级封装后的 asyncio.run() 调用异步函数, 对于普通场景来说也就足够了, 但是在任务数量特别大的时候, 由于单核的限制, 会造成大量的计算资源的浪费, 并且效率极低.

PENG QIAN 的思路是通过将任务切片, 交给多个子进程(Sub process), 每个子进程负责一部分任务并行执行. 由于子进程是可以在多个 CPU 内核上同时运行的, 这样就可以解决 event loop 先天不支持多 CPU 内核的性能问题.

下图中绿色部分表示子进程, 黄色表示 event loop 任务.
How the code is executed
下面再来逐个理解改进后的函数.

query_concurrently 异步函数用于并行直接特定范围, 即切片后的任务:

async def query_concurrently(begin_idx: int, end_idx: int):
    """ Start concurrent tasks by start and end sequence number """
    tasks = []
    for _ in range(begin_idx, end_idx, 1):
        tasks.append(asyncio.create_task(fake_io_func()))
    results = await asyncio.gather(*tasks)
    return results

run_batch_tasks 注意这是一个普通非异步的函数, 用来批量调用 asyncio.run() 获取异步结果:

def run_batch_tasks(batch_idx: int, step: int):
    """ Execute batch tasks in sub processes """
    begin = batch_idx * step + 1
    end = begin + step

    results = [result for result in asyncio.run(query_concurrently(begin, end))]
    return results

最后的异步主函数 main 使用偏底层的 event loop, 通过调用 loop.run_in_executorrun_batch_tasks 函数指定在 ProcessPoolExecutor 资源池中执行, loop.run_in_executor() 会返回 asyncio.Future 类型的对象,

async def main():
    """ Distribute tasks in batches to be executed in sub-processes """
    start = time.monotonic()

    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 2000)
                 for batch_idx in range(5)]

    results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]

    print(f"We get {len(results)} results. All last {time.monotonic() - start:.2f} second(s)")

下面这行代码用了单行嵌套循环的 Python 语法 (Nested loop), 看上去就挺不好读的:

results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]

拆开写等价于这样:

results = []
for sublist in await asyncio.gather(*tasks):
    for result in sublist:
        results.append(result)

# 或者这样
results = [result for result in [sub_list for sub_list in await asyncio.gather(*tasks)]]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/993885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】MySQL对于千万级的数据库或者大表怎么处理?

大致的思路 第一优化你的sql和索引; 第二加缓存,memcached,redis; 第三以上都做了后,还是慢,就做主从复制或主主复制,读写分离,可以在应用层做,效率高,也可以用三方工…

前端面试题JS篇(3)

["1", "2", "3"].map(parseInt) 答案是多少? 答案:[1, NaN, NaN] 原因:[1, NaN, NaN]因为 parseInt 需要两个参数(val, radix),其中 radix 表示解时用的基数。 map 传了 3 个(element, index, array) &…

【好书推荐】《速学Linux:系统应用从入门到精通》

目录 前言一、为什么学习Linux系统二、Linux系统的应用领域1.Linux在服务器的应用2.嵌入式Linux的应用3.桌面Linux的应用 三、Linux的版本选择1、经验人士使用的Debian2、以桌面应用为主的Ubuntu3、以经典桌面配置为主的Mint4、社区企业操作系…

CTFhub_SSRF靶场教程

CTFhub SSRF 题目 1. Bypass 1.1 URL Bypass 请求的URL中必须包含http://notfound.ctfhub.com,来尝试利用URL的一些特殊地方绕过这个限制吧 1.利用?绕过限制urlhttps://www.baidu.com?www.xxxx.me 2.利用绕过限制urlhttps://www.baidu.comwww.xxxx.me 3.利用斜…

ctfshow-web-【nl】难了

0x00 前言 CTF 加解密合集CTF Web合集网络安全知识库 文中工具皆可关注 皓月当空w 公众号 发送关键字 工具 获取 0x01 题目 0x02 Write Up 这里首先看到只要当1小于4的时候才可以执行命令。这里看到题目【nl】,先查一下nl的含义,这个命令是将文件里的…

Eureka 笔记

服务端&#xff1a; 创建springBoot 项目 1.步骤 导入在pom.xml中 导入 eureka-server的 jar包 2.步骤 在主方法加注解 EnableEurekaServer 3. 步骤 在配置config 1.步骤pox.xml: <?xml version"1.0" encoding"UTF-8"?&g…

瑞吉外卖第二天

问题分析 前面我们已经完成了后台系统的员工登录功能开发&#xff0c;但是目前还存在一个问题&#xff0c;接下来我们来说明一个这个问题&#xff0c; 以及如何处理。 1). 目前现状 用户如果不登录&#xff0c;直接访问系统首页面&#xff0c;照样可以正常访问。 2). 理想…

Java 【异常】

一、认识异常 Exception 在 Java 中&#xff0c;将程序执行过程中发生的不正常行为称为异常 。 异常是异常exception&#xff0c;报错是报错error 1.算数异常 0不能作为除数&#xff0c;所以算数异常 2.空指针异常 arr不指向任何对象&#xff0c;打印不出arr的长度&#xff0c;…

【动态规划刷题 12】等差数列划分 最长湍流子数组

139. 单词拆分 链接: 139. 单词拆分 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。请你判断是否可以利用字典中出现的单词拼接出 s 。 注意&#xff1a;不要求字典中出现的单词全部都使用&#xff0c;并且字典中的单词可以重复使用。 示例 1&#xff1a; 输入: …

思腾合力GPU服务器

布局智能生产基地&#xff01;经开区思腾合力超前助力“东数西算”&#xff01; 2022-04-20 18:41泰达服务 算力作为数字经济的核心生产力&#xff0c;是国民经济发展的重要基础设施。经开区企业思腾合力在“十四五”开局之年打造出集研发、生产、制造为一体的人工智能产业园…

【C++】常用拷贝和替换算法

0.前言 1.copy #include <iostream> using namespace std;// 常用拷贝算法 copy #include<vector> #include<algorithm>void myPrint(int val) {cout << val << " "; }void test01() {vector<int>v;for (int i 0; i < 10; i…

@Autowired和@Resource

文章目录 简介Autowired注解什么是Autowired注解Autowired注解的使用方式Autowired注解的优势和不足 Qualifier总结&#xff1a; Resource注解什么是Resource注解Resource注解的使用方式Resource注解的优势和不足 Autowired vs ResourceAutowired和Resource的区别为什么推荐使用…

Django+Nginx+uWSGI+Supervisor实战

大家好&#xff0c;真的是许久没有更新文章了&#xff0c;甚是想念&#xff0c;最近这段时间事情很多&#xff0c;家里的事情、工作的事情&#xff0c;真没有太多时间去码文章&#xff0c;其实已经搁置了些许文章&#xff0c;没有整理&#xff0c;趁着这段时间风平浪静&#xf…

MSOS604A是德科技keysight MSOS604A示波器

181/2461/8938Infiniium S系列示波器融合了创新技术&#xff0c;旨在提供卓越的测量。新的10位ADC和低噪声前端技术协同工作&#xff0c;提供高达8 GHz的性能和业界最佳的信号完整性。一个高级框架&#xff0c;配有可快速启动的固态硬盘、可轻松触摸的15英寸电容式显示屏和可快…

洛谷P8814:解密 ← CSP-J 2022 复赛第2题

【题目来源】https://www.luogu.com.cn/problem/P8814https://www.acwing.com/problem/content/4732/【题目描述】 给定一个正整数 k&#xff0c;有 k 次询问&#xff0c;每次给定三个正整数 ni&#xff0c;ei&#xff0c;di&#xff0c;求两个正整数 pi&#xff0c;qi&#xf…

取消合并单元格并快速填充

例如&#xff1a; 步骤如下&#xff1a; 1/ 取消合并单元格 2/ 全选表格 3/ excel导航栏 - 开始 - 查找和选择- 定位条件 快捷键&#xff1a;ctrlG 4/ 选择“空值” - 点击确定 5/ 输入公式 “a2" 注意&#xff0c;自定定位在a3单元格 输入完公式后&#xff0c;按…

file-storage-sdk项目开发中的踩坑记录

文章目录 file-storage-sdk项目开发中的踩坑记录问题1&#xff1a;项目启动报错&#xff1a;Attribute "click" appears more than once in element问题2&#xff1a;前端对话框被遮挡问题3&#xff1a;RequestBody无法接收表单数据问题4&#xff1a;文件上传失败问题…

WebServer 解析HTTP 响应报文

一、基础API部分&#xff0c;介绍stat、mmap、iovec、writev、va_list 1.1 stat​ 作用&#xff1a;获取文件信息 #include <sys/types.h> #include <sys/stat.h> #include <unistd.h>// 获取文件属性&#xff0c;存储在statbuf中 int stat(const char *…

Excel必备!6种快速插入√或x标记的方法揭秘

本教程展示了在Excel中插入勾或叉的六种不同方法。Excel中有两种复选标记——交互式复选框和勾号符号。 勾选框,也称为复选框或复选标记框,是一种特殊控件,允许你通过鼠标单击来选择或取消选择某个选项,即选中或取消选中勾选框。​ 勾号符号,也称为复选符号或复选标记,…