Python: 结合多进程和 Asyncio 以提高性能

news2025/1/11 10:18:56

动动发财的小手,点个赞吧!

简介

多亏了 GIL,使用多个线程来执行 CPU 密集型任务从来都不是一种选择。随着多核 CPU 的普及,Python 提供了一种多处理解决方案来执行 CPU 密集型任务。但是直到现在,直接使用多进程相关的API还是存在一些问题。

本文[1]开始之前,我们还有一小段代码来帮助演示:

import time
from multiprocessing import Process


def sum_to_num(final_num: int) -> int:
    start = time.monotonic()

    result = 0
    for i in range(0, final_num+11):
        result += i

    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
    return result

该方法接受一个参数并从 0 开始累加到该参数。打印方法执行时间并返回结果。

多进程存在的问题

def main():
    # We initialize the two processes with two parameters, from largest to smallest
    process_a = Process(target=sum_to_num, args=(200_000_000,))
    process_b = Process(target=sum_to_num, args=(50_000_000,))

    # And then let them start executing
    process_a.start()
    process_b.start()

    # Note that the join method is blocking and gets results sequentially
    start_a = time.monotonic()
    process_a.join()
    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")

    # Because when we wait process_a for join. The process_b has joined already.
    # so the time counter is 0 seconds.
    start_b = time.monotonic()
    process_b.join()
    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

如代码所示,我们直接创建并启动多个进程,调用每个进程的start和join方法。但是,这里存在一些问题:

  1. join 方法不能返回任务执行的结果。
  2. join 方法阻塞主进程并按顺序执行它。

即使后面的任务比前面的任务执行得更快,如下图所示:

alt
alt

使用池的问题

如果我们使用multiprocessing.Pool,也会存在一些问题:

def main():
    with Pool() as pool:
        result_a = pool.apply(sum_to_num, args=(200_000_000,))
        result_b = pool.apply(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

如代码所示,Pool 的 apply 方法是同步的,这意味着您必须等待之前的 apply 任务完成才能开始执行下一个 apply 任务。

alt

当然,我们可以使用 apply_async 方法异步创建任务。但是同样,您需要使用 get 方法来阻塞地获取结果。它让我们回到 join 方法的问题:

def main():
    with Pool() as pool:
        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")
alt

直接使用ProcessPoolExecutor的问题

那么,如果我们使用 concurrent.futures.ProcesssPoolExecutor 来执行我们的 CPU 绑定任务呢?

def main():
    with ProcessPoolExecutor() as executor:
        numbers = [200_000_000, 50_000_000]
        for result in executor.map(sum_to_num, numbers):
            print(f"sum_to_num got a result which is {result}.")

如代码所示,一切看起来都很棒,并且就像 asyncio.as_completed 一样被调用。但是看看结果;它们仍按启动顺序获取。这与 asyncio.as_completed 完全不同,后者按照执行顺序获取结果:

alt
alt

使用 asyncio 的 run_in_executor 修复

幸运的是,我们可以使用 asyncio 来处理 IO-bound 任务,它的 run_in_executor 方法可以像 asyncio 一样调用多进程任务。不仅统一了并发和并行的API,还解决了我们上面遇到的各种问题:

async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for number in [200_000_000, 50_000_000]:
            tasks.append(loop.run_in_executor(executor, sum_to_num, number))
        
        # Or we can just use the method asyncio.gather(*tasks)
        for done in asyncio.as_completed(tasks):
            result = await done
            print(f"sum_to_num got a result which is {result}")
alt

由于上一篇的示例代码都是模拟我们应该调用的并发过程的方法,所以很多读者在学习之后在实际编码中还是需要帮助理解如何使用。所以在了解了为什么我们需要在asyncio中执行CPU-bound并行任务之后,今天我们将通过一个真实世界的例子来解释如何使用asyncio同时处理IO-bound和CPU-bound任务,并领略asyncio对我们的效率代码。

Reference

[1]

Source: https://towardsdatascience.com/combining-multiprocessing-and-asyncio-in-python-for-performance-boosts-15496ffe96b

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/534256.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pytrack 函数整理

1 distance 1.1 add_edge_lengths pytrack.graph.distance.add_edge_lengths(G, precision3) 将每条边的长度加到图里面去 1.1.1 主要参数 G路网图precision每一条边长度,保持几位小数 1.2 enlarge_bbox pytrack.graph.distance.enlarge_bbox(north, south, …

僵尸进程的避免 守护进程的创建 线程的创建,阻塞,数据传递 5.15

父子进程相关知识&#xff1a; 1.子进程结束时&#xff0c;系统 会立即自动刷新行缓存 2.手动结束进程&#xff1a; exit() exit(int status)&#xff1a;结束当前调用的进程&#xff0c;自动刷新缓存 标准库函数 头文件&#xff1a;#include <stdlib.h> _exit() …

压缩技术与常见linux解压/压缩命令总结

文章目录 1 RAR1.1 参数介绍1.2 压缩/解压1.3 分卷压缩/解压 2 7-Zip2.1 常用参数2.2 使用2.3 分卷压缩/解压 3 解压/压缩命令 总结 1 RAR RAR是一种专利文件格式&#xff0c;用于数据的压缩打包。 提供了强力压缩、分卷、加密和自解压模块 官方网址&#xff1a;https://www…

公有云——阿里云ECS服务器(IaaS)

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.了解云服务器的基础概念 1.云服务器的基础概念&#xff08;云服务器选择…

用IDEA写的第一个JavaWeb项目(保姆级)

任何事情只有0次和无数次&#xff0c;项目新建了第一个就有第二个 从第一次的略显青涩到后面的轻车熟路&#xff0c;区别就是第一次 ——Lungcen 在IDEA中新建一个web项目&#xff0c;有好几种方法。本人用的方法是通过maven构建工具来构建java项目的框架。对于web服务器的选择…

日志—加索引优化select

今天工作中遇到一个小问题&#xff0c;一个搜索接口&#xff0c;要加一个2个字段用于搜索 分析&#xff1a;这两个字段要在子表中去查询&#xff0c;查看原来mapper中的接口&#xff0c;已经连了N个子表&#xff0c;sql速度在10秒左右。 加上了新的子表&#xff0c;然后去试了…

悼念浩哥(左耳朵耗子),一个纯粹的技术人

上周末听闻浩哥的事了&#xff0c;期初还不信。在网上搜索消息&#xff0c;看来是真的。他才四十多岁&#xff0c;觉得非常可惜。很早就关注过浩哥&#xff0c;他是一位正直纯粹和爱分享的技术大牛。无论是技术分享还是人生感悟&#xff0c;或者是成长相关&#xff0c;都让我学…

展会直击 | 昂视精彩亮相CIBF2023深圳国际电池展

5月16日&#xff0c;CIBF2023深圳国际电池展在深圳国际会展中心&#xff08;宝安新馆&#xff09;正式开幕&#xff0c;昂视携2D视觉产品、3D视觉产品、锂电行业智能检测方案亮相9号馆T101-2展位&#xff0c;会场氛围火热&#xff0c;昂视展位人声鼎沸。 方案演示&#xff0c;助…

K8s进阶1——搭建K8s高可用集群

文章目录 一、资源清单二、系统初始化2.1 所有服务器配置2.2 master节点配置 三、nginxkeepalived3.1 主备机器上进行3.2 配置主节点3.3 配置备节点3.4 启动服务 四、部署etcd集群4.1 资源清单4.2 生成Etcd证书4.3 部署Etcd集群 五、安装Docker/kubeadm/kubelet5.1 安装docker5…

【数据库复习】第六章 关系数据理论 1

关系模式的设计 按照一定的原则从数量众多而又相互关联的数据中&#xff0c;构造出一组既能较好地反映现实世界&#xff0c;而又有良好的操作性能的关系模式 ●冗余度高 ●修改困难 ●插入问题 ●删除问题 ★产生问题的原因 属性间约束关系&#xff08;即数据间的依赖关系…

【C++从0到王者】第五站:类和对象(中)const和取地址运算符重载

文章目录 一、const修饰this指针二、取地址运算符重载以及const取地址运算符重载 一、const修饰this指针 我们继续使用之前实现的日期类&#xff0c;当我们写出如下代码的时候&#xff0c;我们可以观察到编译器报错了 这其实因为权限的放大&#xff0c;如下图所示&#xff0c;…

ML之VAR:基于上海最高气温数据集利用时间序列模型之VAR向量自回归模型/多变量自回归模型实现回归预测案例

ML之VAR&#xff1a;基于上海最高气温数据集利用时间序列模型之VAR向量自回归模型/多变量自回归模型实现回归预测案例 目录 基于上海最高气温数据集利用时间序列模型之VAR向量自回归模型/多变量自回归模型实现回归预测案例 # 1、定义数据集 # 2、数据集预处理 # 2.1、缺失值…

【Spring全家桶系列】Spring中的事务管理(基于注解完成实现)

⭐️前面的话⭐️ 本文已经收录到《Spring框架全家桶系列》专栏&#xff0c;本文将介绍Spring中的事务管理&#xff0c;事务的概念与作用&#xff0c;以及Spring事务的属性和传播机制。 &#x1f4d2;博客主页&#xff1a;未见花闻的博客主页 &#x1f389;欢迎关注&#x1f5…

怀念浩哥(左耳朵耗子),一个纯粹的技术人

上周末听闻浩哥的事了&#xff0c;期初还不信。在网上搜索消息&#xff0c;看来是真的。他才四十多岁&#xff0c;觉得非常可惜。很早就关注过浩哥&#xff0c;他是一位正直纯粹和爱分享的技术大牛。无论是技术分享还是人生感悟&#xff0c;或者是成长相关&#xff0c;都让我学…

Portainer: 带你领略强大且易用的容器管理平台

什么是Portainer? Portainer是一个强大的容器管理平台 Portainer是一款轻量级的应用,它提供了图形化界面,用于方便地管理Docker环境,包括单机环境和集群环境。Portainer全球最受欢迎的容器管理平台,拥有超过100万用户和24,600颗GitHub星Portainer的定位及与周边生态的交互…

【Redis】聊一下持久化机制-AOF

前言 持久化其实在任何存储系统中&#xff0c;都是避不开的话题&#xff0c;比如数据库系统就有ACID进行数据、日志的持久化。将文件写入到内存、缓存、磁盘中。在比如消息队列Kafka也有消息的持久化机制&#xff0c;为防止数据的丢失也需要将数据持久化存储。目的其实就是为了…

javaweb系列- JavaScript事件

1.6 JavaScript事件 1.6.1 事件介绍 如下图所示的百度注册页面&#xff0c;当我们用户输入完内容&#xff0c;百度可以自动的提示我们用户名已经存在还是可以使用。那么百度是怎么知道我们用户名输入完了呢&#xff1f;这就需要用到JavaScript中的事件了。 什么是事件呢&…

【SpringBoot】整合第三方技术Junit. MybatisPlus druid

【SpringBoot】整合第三方技术 整合junit整合MyBatis整合Mybatis-plus使用阿里云创建工程 SpringBoot整合druid 整合junit 自己定义一个功能&#xff0c;测试功能接口 测试步骤注入你要测试的对象 提前声明为bean资源执行你要测试的方法 package com.ustc.sp7;import com.us…

MySQL—MVCC

文章目录 数据库并发的场景有三种MVCC概念读-写3个记录隐藏列字段undo log模拟MVCC Read ViewRC与RR的本质区别RCRR 数据库并发的场景有三种 读-读: 不存在任何问题&#xff0c;也不需要并发控制 读-写∶有线程安全问题&#xff0c;可能会造成事务隔离性问题&#xff0c;可能遇…

阿里云GPU服务器租用费用包年包月、一个小时和学生价格

阿里云GPU服务器租用价格表包括包年包月、一个小时收费以及学生GPU服务器租用费用&#xff0c;阿里云GPU计算卡包括NVIDIA V100计算卡、T4计算卡、A10计算卡和A100计算卡&#xff0c;GPU云服务器gn6i可享受3折&#xff0c;阿里云百科分享阿里云GPU服务器租用价格表、GPU一个小时…