同步、异步无障碍:Python异步装饰器指南

news2024/10/7 10:22:02

一、引言

Python异步开发已经非常流行了,一些主流的组件像MySQL、Redis、RabbitMQ等都提供了异步的客户端,再处理耗时的时候不会堵塞住主线程,不但可以提高并发能力,也能减少多线程带来的cpu上下文切换以及内存资源消耗。但在业务开发的时候一些第三方库没有异步的处理方式,例如OSS、CV、其他第三方提供的SDK以及自己封装的函数有耗时等,此时还是需要借助线程来加速,再异步中就不会堵塞主线程,因此封装一个异步装饰器可以更好的处理异步,让代码更简洁。

二、功能分析

  1. 支持同步函数使用线程加速

  2. 异、同步函数需支持 await 语法等待返回结果

  3. 异、同步函数需支持后台任务,无需等待

同步函数使用线程加速

同步函数使用线程,这还是挺简单的使用,内置库的 threading.Thread 就可以实现

import time
import threading

def task1(name):
    print(f"Hello {name}")
    time.sleep(1)
    print(f"Completed {name}")


t1 = threading.Thread(target=task1, args=("hui",))
t2 = threading.Thread(target=task1, args=("wang",))

t1.start()
t2.start()

t1.join()
t2.join()

>>> out
Hello hui
Hello wang
Completed hui
Completed wang

  • start()方法用于启动线程执行函数。
  • join()方法用于等待线程执行结束。

但这样直接开线程的方式比较暴力,也不太好管理,因此可以想到线程池,进行线程复用与管理。Python内置的 concurrent.futures 模块提供了线程池和进程池的实现与封装。

import time
from concurrent.futures import ThreadPoolExecutor

def task2(name):
    print(f"Hello {name}")
    time.sleep(1)
    return f"Completed {name}"

with ThreadPoolExecutor(max_workers=2) as executor:
    future1 = executor.submit(task2, "hui")
    future2 = executor.submit(task2, "zack")

print("ret1", future1.result())
print("ret2", future2.result())

>>> out
Hello hui
Hello zack
ret1 Completed hui
ret2 Completed zack

异、同步函数需支持 await 语法

异、同步函数需支持 await 语法等待返回结果,异步函数本身就支持 await语法,这里主要是实现同步函数支持

await 语法,在python中可以await语法的对象有如下几大类:

  1. 协程对象(coroutine):定义了__await__方法的对象,异步框架中的协程函数都是此类型。
  2. 任务对象(Task):封装了协程的对象, 如 asyncio 中的 Task, trio中的Task。
  3. Future对象:表示异步操作结果的对象, 如 concurrent.futures.Future。
  4. 协程装饰器封装的对象:一些装饰器可以将普通函数或对象包装成可await的对象,如@asyncio.coroutine。

综上,凡是实现了__await__魔术方法的对象或者封装了协程/任务的对象,都可以被await,await会自动把对象交给事件循环运行,等待其完成。

常见的可await对象包括协程函数、任务对象、Future、被@coroutine装饰的函数等,这可以使异步代码更简洁。await对象可以暂停当前协程,等待异步操作完成后再继续执行。

import asyncio

async def coro_demo():
    print("await coroutine demo")

async def task_demo():
    print("await task demo")

    async def coro():
        print("in coro task")

    # 创建 Task 对象
    task = asyncio.create_task(coro())
    await task

async def future_demo():
    print("await future demo")
    future = asyncio.Future()
    await future

# 这个装饰器已经过时
@asyncio.coroutine
def coro_decorated_demo():
    print("await decorated function demo")

async def main():
    await coro_demo()

    await task_demo()

    await future_demo()

    await coro_decorated_demo()

if __name__ == '__main__':
    asyncio.run(main())
    
    
>>> out 
DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
  def coro_decorated_demo():
  
await coroutine demo
await task demo
in coro task
await future demo

这个 @asyncio.coroutine 协程装饰器已经过时了,都是使用 async、await 语法替代。

下面是实现 __await__ 方法的demo

import asyncio

class AsyncDownloader:

    def __init__(self, url):
        self.url = url
        self.download_ret = None

    def __await__(self):
        print(f'Starting download of {self.url}')
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, self.download)
        yield from future.__await__()
        return self

    def download(self):
        print(f'Downloading {self.url}...')
        # 模拟下载过程
        import time
        time.sleep(2)
        self.download_ret = f'{self.url} downloaded ok'

async def main():
    print('Creating downloader...')
    downloader = AsyncDownloader('https://www.ithui.top/file.zip')
    print('Waiting for download...')
    downloader_obj = await downloader
    print(f'Download result: {downloader_obj.download_ret}')

if __name__ == '__main__':
    asyncio.run(main())
    

>>> out
Creating downloader...
Waiting for download...
Starting download of https://www.ithui.top/file.zip
Downloading https://www.ithui.top/file.zip...
Download result: https://www.ithui.top/file.zip downloaded ok    

用 yield from 来迭代 future对象(符合__await__逻辑),并在结束时return self

异、同步函数需支持后台任务

异步、后台任务的好处与场景

  1. 减少主程序的等待时间

    异步函数可以通过后台任务的方式执行一些耗时操作,如IO操作、网络请求等,而主程序无需等待这些操作完成,可以继续执行其他任务,从而减少程序总体的等待时间。

  2. 提高程序响应性能

    后台任务的异步执行,可以避免主程序被长时间阻塞,从而改善程序的整体响应性能。用户无需长时间等待才能得到响应。

  3. 解决IO密集型任务阻塞问题

    对于网络、文件IO等密集型任务,使用同步执行可能会导致长时间阻塞,而异步后台任务可以很好地解决这个问题,避免资源浪费。

  4. 良好的用户体验

    后台任务的异步处理,给用户的感觉是多个任务同时在执行,实际上CPU在切换处理,这相比线性等待任务完成,可以提供更好的用户体验。

  5. 适用于不需要实时结果的任务

    邮件发送、数据批处理、文件处理等不需要用户即时等待结果的任务非常适合通过异步方式在后台完成。

在python中同异步函数实现后台任务

  • 异步函数可以通过 asyncio.create_task 方法实现后台任务

  • 同步函数可以通过线程、线程池来实现

import asyncio
import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

async def async_bg_task():
    print('async bg task running')
    await asyncio.sleep(3)
    print('async bg task completed')

def sync_bg_task():
    print('sync bg task running')
    time.sleep(3)
    print('sync bg task completed')

async def main():
    print('Starting main program')

    # 异步函数的后台任务
    asyncio.create_task(async_bg_task())

    # 同步函数的后台任务
    # with ThreadPoolExecutor() as executor:
    #     executor.submit(sync_bg_task)

    # Thread(target=sync_bg_task).start()

    loop = asyncio.get_running_loop()
    loop.run_in_executor(executor=ThreadPoolExecutor(), func=sync_bg_task)

    print('Main program continues')
    await asyncio.sleep(5)

if __name__ == '__main__':
    asyncio.run(main())
    
   
>>> ThreadPoolExecutor out
Starting main program
sync bg task running
sync bg task completed
Main program continues
async bg task running
async bg task completed

>>> Thread out
Starting main program
sync bg task running
Main program continues
async bg task running
sync bg task completed
async bg task completed

>>> run_in_executor out
Starting main program
sync bg task running
Main program continues
async bg task running
async bg task completed
sync bg task completed

看输出结果可以发现在同步函数使用直接使用线程池 ThreadPoolExecutor 执行还是堵塞了主线程,然后 Thread 没有,通过 loop.run_in_executor 也不会阻塞。后面发现 是 with 语法导致的堵塞,with 的根本原因就是它会等待线程池内的所有线程任务完成并回收,所以主线程必须等同步函数结束后才能继续。一开始我还一以为是线程池使用了主线程的线程后面打印线程名称看了下不是,然后调试下就发现了with的问题。

import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutor

async def async_bg_task():
    print(f"async_bg_task In thread: {threading.current_thread().name}")

    print('async bg task running')
    await asyncio.sleep(3)
    print('async bg task completed')

def sync_bg_task(num):
    print(f"sync_bg_task{num} In thread: {threading.current_thread().name}")

    print(f'sync bg task{num} running')
    time.sleep(3)
    print(f'sync bg task{num} completed')

async def main():
    print('Starting main program')

    # 异步函数的后台任务
    asyncio.create_task(async_bg_task())

    # 同步函数的后台任务
    thread_pool = ThreadPoolExecutor()
    # with thread_pool as pool:
    #     for i in range(5):
    #         pool.submit(sync_bg_task, i)

    for i in range(5):
        thread_pool.submit(sync_bg_task, i)

    threading.Thread(target=sync_bg_task, args=["thread"]).start()

    loop = asyncio.get_running_loop()
    loop.run_in_executor(ThreadPoolExecutor(), sync_bg_task, "loop.run_in_executor")

    print('Main program continues')
    print(f"Main program In thread: {threading.current_thread().name}")
    await asyncio.sleep(5)

if __name__ == '__main__':
    asyncio.run(main())

三、具体封装实现

import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor

def run_on_executor(executor: Executor = None, background: bool = False):
    """
    异步装饰器
    - 支持同步函数使用 executor 加速
    - 异步函数和同步函数都可以使用 `await` 语法等待返回结果
    - 异步函数和同步函数都支持后台任务,无需等待
    Args:
        executor: 函数执行器, 装饰同步函数的时候使用
        background: 是否后台执行,默认False

    Returns:
    """

    def _run_on_executor(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if background:
                return asyncio.create_task(func(*args, **kwargs))
            else:
                return await func(*args, **kwargs)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            task_func = functools.partial(func, *args, **kwargs)    # 支持关键字参数
            return loop.run_in_executor(executor, task_func)

        # 异步函数判断
        wrapper_func = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        return wrapper_func

    return _run_on_executor

封装成了带参数的装饰器

  • executor: 函数执行器, 装饰同步函数的时候使用

    • 可以传递指定的线程池,默认None 根据系统cpu核心数动态创建线程的数量
  • background: 用于标识是否后台执行,默认False

    • 有点诟病同步函数的后台任务没有用到这个参数而是使用 await 语法控制,但在使用装饰器时候可以起到后台任务标识作用,别人一看有这个参数就知道是后台任务就不用细看函数业务逻辑
    • 后续再看看怎么优化,大家有没有比较好建议
  • loop.run_in_executor(executor, task_func) 方法不支持关键字参数的传递,故而采用 task_func = functools.partial(func, *args, **kwargs) ,来构造一个不带参数的函数就可以方便使用了

测试demo

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

from py_tools.decorators.base import run_on_executor
from loguru import logger

thread_executor = ThreadPoolExecutor(max_workers=3)

@run_on_executor(background=True)
async def async_func_bg_task():
    logger.debug("async_func_bg_task start")
    await asyncio.sleep(1)
    logger.debug("async_func_bg_task running")
    await asyncio.sleep(1)
    logger.debug("async_func_bg_task end")
    return "async_func_bg_task ret end"

@run_on_executor()
async def async_func():
    logger.debug("async_func start")
    await asyncio.sleep(1)
    logger.debug("async_func running")
    await asyncio.sleep(1)
    return "async_func ret end"

@run_on_executor(background=True, executor=thread_executor)
def sync_func_bg_task():
    logger.debug("sync_func_bg_task start")
    time.sleep(1)
    logger.debug("sync_func_bg_task running")
    time.sleep(1)
    logger.debug("sync_func_bg_task end")
    return "sync_func_bg_task end"

@run_on_executor()
def sync_func():
    logger.debug("sync_func start")
    time.sleep(1)
    logger.debug("sync_func running")
    time.sleep(1)
    return "sync_func ret end"

async def main():
    ret = await async_func()
    logger.debug(ret)

    async_bg_task = await async_func_bg_task()
    logger.debug(f"async bg task {async_bg_task}")
    logger.debug("async_func_bg_task 等待后台执行中")

    loop = asyncio.get_event_loop()
    for i in range(3):
        loop.create_task(async_func())

    ret = await sync_func()
    logger.debug(ret)

    sync_bg_task = sync_func_bg_task()
    logger.debug(f"sync bg task {sync_bg_task}")
    logger.debug("sync_func_bg_task 等待后台执行")

    await asyncio.sleep(10)

if __name__ == '__main__':
    asyncio.run(main())

测试详细输出

async_func start
async_func running
async_func ret end

async bg task <Task pending name='Task-2' coro=<async_func_bg_task() running at ...
sync_func start
async_func_bg_task start
async_func start
async_func start
async_func start

sync_func running
async_func_bg_task running
async_func running
async_func running
async_func running

async_func_bg_task end
sync_func ret end

sync_func_bg_task start
sync bg task <Future pending cb=[_chain_future.<locals>._call_check_cancel() at ...
sync_func_bg_task 等待后台执行
sync_func_bg_task running
sync_func_bg_task end

四、源代码

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

---------------------------END---------------------------

题外话

“不是只有程序员才要学编程?!”

认真查了一下招聘网站,发现它其实早已变成一项全民的基本技能了。

连国企都纷纷要求大家学Python!
在这里插入图片描述

世界飞速发展,互联网、大数据冲击着一切,各行各业对数据分析能力的要求越来越高,这便是工资差距的原因,学习编程顺应了时代的潮流。

在这个大数据时代,从来没有哪一种语言可以像Python一样,在自动化办公、爬虫、数据分析等领域都有众多应用。

更没有哪一种语言,语法如此简洁易读,消除了普通人对于“编程”这一行为的恐惧,从小学生到老奶奶都可以学会。

《2020年职场学习趋势报告》显示,在2020年最受欢迎的技能排行榜,Python排在第一。
在这里插入图片描述

它的角色类似于现在Office,成了进入职场的第一项必备技能。

如果你也想增强自己的竞争力,分一笔时代的红利,我的建议是,少加点班,把时间腾出来,去学一学Python。

因为,被誉为“未来十年的职场红利”的Python,赚钱、省钱、找工作、升职加薪简直无所不能!

目前,Python人才需求增速高达**174%,人才缺口高达50万,**部分领域如人工智能、大数据开发, 年薪30万都招不到人!在这里插入图片描述

感兴趣的小伙伴,赠送全套Python学习资料,包含面试题、简历资料等具体看下方。

👉CSDN大礼包🎁:全网最全《Python学习资料》免费赠送🆓!(安全链接,放心点击)

一、Python所有方向的学习路线

Python所有方向的技术点做的整理,形成各个领域的知识点汇总,它的用处就在于,你可以按照下面的知识点去找对应的学习资源,保证自己学得较为全面。

img
img

二、Python必备开发工具

工具都帮大家整理好了,安装就可直接上手!img

三、最新Python学习笔记

当我学到一定基础,有自己的理解能力的时候,会去阅读一些前辈整理的书籍或者手写的笔记资料,这些笔记详细记载了他们对一些技术点的理解,这些理解是比较独到,可以学到不一样的思路。

img

四、Python视频合集

观看全面零基础学习视频,看视频学习是最快捷也是最有效果的方式,跟着视频中老师的思路,从基础到深入,还是很容易入门的。

img

五、实战案例

纸上得来终觉浅,要学会跟着视频一起敲,要动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。img

六、面试宝典

在这里插入图片描述

在这里插入图片描述

简历模板在这里插入图片描述

如有侵权,请联系删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/912877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023前端面试题整理

前端面试大全整理 算法 n维数组旋转 90 度算法 export const rotate function (matrix: number[][]) {let n matrix.length// matrix[x][y] > matrix[y][n - 1 - x]const changeItem (num: number, x: number, y: number, rodateTime: number, isOnce?: boolean) >…

微信小程序裁剪图片成圆形

概述 分装的图片剪裁组件&#xff0c;可以把图片剪裁成圆形&#xff0c;主要思路就是使用canvas绘图&#xff0c;把剪裁的图片绘制成圆形&#xff0c;另外剪裁图片的窗口还可以移动放大缩小 详细 前言 最近在开发小程序&#xff0c;产品经理提了一个需求&#xff0c;要求微…

Java Web项目中spring.xml或springmvc.xml配置文件中出现名称空间爆红

在做项目的过程中&#xff0c;通常需要配置spring.xml或者springmvc.xml等配置文件&#xff0c;但是会出现名称空间爆红的情况&#xff0c;如下所示&#xff1a; 解决方法&#xff1a;打开Settings ——> Schemas and DTDs 然后添加爆红的语句即可

PSP - 蛋白质结构预测 AlphaFold2 的结构模版 (Template) 搜索与特征逻辑

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132427617 结构模版 (Template) 是一种已知的蛋白质结构&#xff0c;可以作为 AlphaFold2 蛋白质结构预测的参考&#xff0c;AlphaFold2 可以从多…

构建系统自动化-autoreconf

autoreconf简介 autoreconf是一个GNU Autotools工具集中的一个命令&#xff0c;用于自动重新生成构建系统的配置脚本和相关文件。 Autotools是一组用于自动化构建系统的工具&#xff0c;包括Autoconf、Automake和Libtool。它们通常用于跨平台的软件项目&#xff0c;以便在不同…

AIR001开箱测试

最近&#xff0c;合宙的动作还是挺大的&#xff0c;又出了两款AIR001和RP2040&#xff0c;而且前段时间还出了AIR32F103系列&#xff0c;记的21年要采购STM32F1103的时候&#xff0c;1片的价格从开发时的5块涨到了生产阶段的100多&#xff0c;即使最后无奈采用了别的芯片&#…

容器化微服务:用Kubernetes实现弹性部署

随着云计算的迅猛发展&#xff0c;容器化和微服务架构成为了构建现代应用的重要方式。而在这个过程中&#xff0c;Kubernetes&#xff08;常简称为K8s&#xff09;作为一个开源的容器编排平台&#xff0c;正在引领着容器化微服务的部署和管理革命。本文将深入探讨容器化微服务的…

LeetCode150道面试经典题-- 二叉树的最大深度(简单)

1.题目 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 2.示例 3.思路 深度优先遍历 一个二叉树要查询到最大深度&#xff0c;可以将问题转为从根节点出发&#xff0c;查看左右子树的最大深度&am…

php 系列题目,包含查看后端源代码

一、弱类型比较问题 原则&#xff1a; 1.字符串和数字比较&#xff0c;字符串回被转换成数字。 "admin" 0&#xff08;true) admin被转换成数字&#xff0c;由于admin是字符串&#xff0c;转换失败&#xff0c;变成0 int(admin)0,所以比较结果是ture 2.混合字符串转…

opencv 进阶17-使用K最近邻和比率检验过滤匹配(图像匹配)

K最近邻&#xff08;K-Nearest Neighbors&#xff0c;简称KNN&#xff09;和比率检验&#xff08;Ratio Test&#xff09;是在计算机视觉中用于特征匹配的常见技术。它们通常与特征描述子&#xff08;例如SIFT、SURF、ORB等&#xff09;一起使用&#xff0c;以在图像中找到相似…

【C#学习笔记】匿名函数和lambda表达式

文章目录 匿名函数匿名函数的定义匿名函数作为参数传递匿名函数的缺点 lambda表达式什么是lambda表达式闭包 匿名函数 为什么我们要使用匿名函数&#xff1f;匿名函数存在的意义是为了简化一些函数的定义&#xff0c;特别是那些定义了之后只会被调用一次的函数&#xff0c;与其…

【Unity3D赛车游戏制作】初步导入,资源很哇塞【一】

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

位运算相关总结

371. 两整数之和 给你两个整数 a 和 b &#xff0c;不使用 运算符 和 - ​​​​​​​&#xff0c;计算并返回两整数之和。 class Solution { public:int getSum(int a, int b) {while (b ! 0) {// 计算进位&#xff1a;将 a 和 b 进行位与操作&#xff0c;然后左移 1 位。u…

ExoPlayer如何使用MediaExtractor的思路

本文主要针对于&#xff0c;自己的设备解码能力&#xff08;比如底层集成ffmpeg 、qti、android 、需要付费的格式等等&#xff09;大于ExoPlayer自己封装的固有Extractor&#xff0c;基于现在Android架构通俗的来说&#xff0c;就是MediaPlayer可以播&#xff0c;但是ExoPlaye…

解码客厅:知名设计师带你探索其历史与设计风格

会客厅又称接待室&#xff0c;&#xff0c;它们是宾客和家人享受下午或晚上娱乐时光的天然聚会场所。由于会客厅反映了每个家庭的个性&#xff0c;因此在家具和设计上花费了很多心思。装饰品、复古艺术品、三角钢琴以及雕塑和花瓶等其他装饰元素在今天的会客厅中已司空见惯。 下…

UE4 地形编辑基础知识 学习笔记

之前自己写过这样的功能&#xff0c;今天看到一个UE现成的 点击地形&#xff0c;选择样条 按住CTRL键点击屏幕中某一个点会在场景内生成一个这样的图标 再点两次&#xff0c;会生成B样条的绿线条 点击号再选择一个模型&#xff0c;会生成对应的链条状的mesh 拉高最远处的一个图…

2023国赛数学建模思路 - 案例:粒子群算法

文章目录 1 什么是粒子群算法&#xff1f;2 举个例子3 还是一个例子算法流程算法实现建模资料 # 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 什么是粒子群算法&#xff1f; 粒子群算法&#xff08;Pa…

无涯教程-PHP - preg_split()函数

preg_split() - 语法 array preg_split (string pattern, string string [, int limit [, int flags]]); preg_split()函数的操作与split()完全相同&#xff0c;只不过正则表达式被接受为pattern的输入参数。 如果指定了可选的输入参数limit&#xff0c;则仅返回子字符串的限…

测试框架pytest教程(11)-pytestAPI

常量 pytest.__version__ #输出pytest版本 pytest.version_tuple #输出版本的元组形式 功能 pytest.approx pytest.approx 是一个用于进行数值近似比较的 pytest 断言工具。 在测试中&#xff0c;有时候需要对浮点数或其他具有小数部分的数值进行比较。然而&#xff0c;由于…

正中优配:散户也算股东吗?能不能参加股东大会?

股市上&#xff0c;常有散户自称韭菜&#xff0c;长一波被股市收割一波&#xff0c;不可谓不惨。但要较真的话&#xff0c;散户仍是有必定身份的&#xff0c;最少他是所持股的股东。有人会好奇&#xff0c;原来散户也算股东吗&#xff1f;那他能不能参与公司的股东大会&#xf…