当Python遇上异步编程:实现高效、快速的程序运行!

news2024/9/9 1:28:27

前言

同步/异步的概念:

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行
异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

asyncio是python3.4版本引入到标准库
python3.5又加入了async/await特性。

背景

因为业务需求需要写一个接口,然后返回数据。但是这个接口需要执行程序,需要1分钟左右。
1、当接口需要执行长时间的程序时,浏览器必须等待程序运行结束并返回结果才能响应请求。
2、如果程序执行时间过长,浏览器会一直等待,这将影响用户的体验,因为用户需要等待很长时间才能得到响应。
3、改造成异步执行可以避免这种情况,因为异步执行可以使得程序不需要等待长时间的 IO 操作完成,而是让程序在进行这些操作时可以进行其他的计算任务,从而提高程序的效率和响应速度,从而提高用户体验。

原理

当我们使用 Python 进行异步编程时,使用异步调用可以提高程序的效率。异步调用是指程序在执行过程中可以在某些操作等待的时候切换到其他任务,从而提高程序的并发性能,简单来说就是通过利用 I/O 等待时间提高程序的执行效率。在 Python 中,通常我们使用 asyncio 库来实现异步调用。

介绍在 Python 中使用 asyncio 实现异步调用的一些常见操作和技巧

1. 使用 async/await

使用 async/await 是使用 asyncio 库的首选方法。使用这种方法可以把异步代码看作是顺序执行的代码,让代码更加易于编写和阅读。以下是使用 async/await 进行异步调用的一个简单示例:

import asyncio

async def my_coroutine():
    print('开始执行协程')
    await asyncio.sleep(1)
    print('协程执行完毕')

async def main():
    print('开始执行主程序')
    await asyncio.gather(
        my_coroutine(),
        my_coroutine(),
        my_coroutine()
    )
    print('主程序执行完毕')

asyncio.run(main())
在上面的示例中,我们定义了一个

名为 my_coroutine 的协程,其中使用了 await asyncio.sleep(1) 来模拟等待 1 秒钟的操作。在主程序中,我们使用 asyncio.gather() 函数并行执行了 3 个 my_coroutine 协程,并等待它们全部执行完后输出了主程序执行完毕的提示。注意,在 Python 中使用 asyncio 进行异步编程,程序的入口点必须是一个协程,这个协程通常被称为主程序。

2. 使用 asyncio.ensure_future() 等价于 await

asyncio.ensure_future() 函数可以将协程或者一个 Future 对象封装成另一个 Future 对象,并触发其执行。相比于 Awaitable 协议,Future 有更广泛的用途,Future 可以由标准库中的其他库或者第三方库引入,可以在未来的时间点以其他方式进行操作。

以下是使用 asyncio.ensure_future() 函数并发执行多个协程的示例:

import asyncio

async def my_coroutine(i):
    print(f'协程 {i} 开始执行')
    await asyncio.sleep(1)
    print(f'协程 {i} 执行完毕')

async def main():
    tasks = []
    for i in range(3):
        task = asyncio.ensure_future(my_coroutine(i))
        tasks.append(task)
    await asyncio.gather(*tasks)
    print('主程序执行完毕')

asyncio.run(main())

·同步代码:

import time

def hello():
    time.sleep(1)

def run():
    for i in range(5):
        hello()
        print('Hello World:%s' % time.time())  # 任何伟大的代码都是从Hello World 开始的!
if __name__ == '__main__':
    run()

·异步代码

import time
import asyncio

# 定义异步函数
async def hello():
    await asyncio.sleep(1)
    print('Hello World:%s' % time.time())

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    tasks = [hello() for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))

async def 用来定义异步函数,await 表示当前协程任务等待睡眠时间,允许其他任务运行。然后获得一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。

3. 使用 asyncio.Queue

asyncio.Queue 类是一个基于协程的生产者-消费者队列,它可以让生产者和消费者通过异步编程方式交换数据。以下是使用 asyncio.Queue 实现生产者-消费者模式的示例:

import asyncio
import random

async def producer(queue):
    while True:
        print('生产者正在生产数据...')
        data = random.randint(1, 100)
        await queue.put(data)
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        print('消费者正在等待数据...')
        data = await queue.get()
        print(f'消费者消费了数据: {data}')

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.ensure_future(producer(queue))
    consumer_task = asyncio.ensure_future(consumer(queue))
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

在上面的示例中,我们定义了一个生产者协程和一个消费者协程,生产者负责往队列中生产数据,消费者则负责从队列中消费数据。在主程序中,我们使用 asyncio.Queue() 创建了一个队列,然后使用 asyncio.ensure_future() 来创建了生产者和消费者 task,最后使用 asyncio.gather() 同时执行了这两个 task。

使用 asyncio.Queue 不仅可以实现生产者-消费者模式,还可以实现多任务并发处理场景,例如使用多个生产者往队列中添加数据,使用多个消费者从队列中取数据进行处理等。

综上所述,Python 中的异步调用是通过 asyncio 库来实现的,通过 async/await 语法或者 asyncio.ensure_future() 函数可以创建协程对象,并使用 asyncio.gather() 函数并行执行这些协程,从而实现异步调用的目的。此外,asyncio.Queue 类可以用于实现生产者-消费者模式等多种异步编程场景。

异步数据返回

from flask import Flask, jsonify, request
import time
import threading

app = Flask(__name__)

# Dictionary to store task status and results
tasks = {}


def perform_task(task_id):
    # Simulating a time-consuming task
    time.sleep(5)
    # Update task status and result
    tasks[task_id]['status'] = 'completed'
    tasks[task_id]['result'] = 'Task completed successfully'


@app.route('/task', methods=['GET'])
def create_task():
    # Generate a unique task ID
    task_id = str(len(tasks) + 1)
    # Create a new task with initial status as 'processing'
    tasks[task_id] = {'status': 'processing', 'result': None}

    # Start a new thread to perform the task asynchronously
    thread = threading.Thread(target=perform_task, args=(task_id,))
    thread.start()

    return jsonify({'task_id': task_id, 'status': 'processing'})


@app.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    if task_id in tasks:
        task_status = tasks[task_id]['status']
        task_result = tasks[task_id]['result']

        if task_status == 'completed':
            # Task completed, return the result
            return jsonify({'status': task_status, 'result': task_result})
        else:
            # Task still processing, return the status
            return jsonify({'status': task_status})

    else:
        # Invalid task ID
        return jsonify({'error': 'Invalid task ID'}), 404


if __name__ == '__main__':
    app.run(debug=True)

在访问接口task时,会生成一个信息存入到{},然后异步执行任务。网页会自动跳转到/task/id 等待数据返回。
在这里插入图片描述
在这里插入图片描述

其实也可以使用Flask 的Celery 更加方便 简单。

参考文档:
https://juejin.cn/post/6992116138398187533
https://www.cnblogs.com/shenh/p/9090586.html
https://www.cnblogs.com/shenh/p/15401891.html

实战代码

from flask import Flask, jsonify, request, make_response

from testrunner import LabourRunner

app = Flask(__name__)
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 300
app.config["timeout"] = 60


 @app.errorhandler(400)
 def par_err(error):
     return make_response(jsonify({'code': '400', 'msg': '请求参数不合法'}), 400)


 @app.errorhandler(404)
 def page_not_found(error):
     return make_response(jsonify({'code': '404', 'msg': '请求参数不合法'}), 404)


@app.route('/actuator/health', methods=['GET', 'HEAD'])
def health():
    return jsonify({'online': True})

@app.route('/api/autotest')
def autotest():
    dir_path = request.args.get('dir_path')
    print(dir_path)

    # TODO: 在这里添加对 path 和 file 参数的处理逻辑
    # ...
    if not dir_path:
        # return make_response(jsonify({'code': '404', 'msg': '请求参数不合法'}), 404)
        return "请传入数据"

    s = LabourRunner()
    dest_name = s.runner(dir_path)
    file_url = "https://uat-xxx.xxxx.com/static/" + dest_name
    result = {'path': dir_path, 'dest_name': file_url}
    return result


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

实战代码异步优化

import threading
import time

from flask import Flask, jsonify, request, make_response, url_for, redirect

from testrunner import LabourRunner

app = Flask(__name__)
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 300
app.config["timeout"] = 60
tasks = {}


 @app.errorhandler(400)
 def par_err(error):
     return make_response(jsonify({'code': '400', 'msg': '请求参数不合法'}), 400)


 @app.errorhandler(404)
 def page_not_found(error):
     return make_response(jsonify({'code': '404', 'msg': '请求参数不合法'}), 404)


@app.route('/actuator/health', methods=['GET', 'HEAD'])
def health():
    return jsonify({'online': True})


def perform_task(task_id, dir_path):
    # Simulating a time-consuming task
    s = LabourRunner()
    dest_name = s.runner(dir_path)
    file_url = "https://uat-xxxx.xxxx.com/static/" + dest_name

    # Update task status and result
    tasks[task_id]['status'] = file_url
    tasks[task_id]['result'] = dir_path


@app.route('/api/autotest', methods=['GET'])
def autotest():
    task_id = str(len(tasks) + 1)
    tasks[task_id] = {'status': 'processing', 'result': None}

    dir_path = request.args.get('dir_path')

    # TODO: 在这里添加对 path 和 file 参数的处理逻辑
    # ...
    if not dir_path:
        error_msg = {'code': 400, 'msg': '请求参数不合法或者文件夹不存在'}
        return jsonify(error_msg), 400
        # return "请传入数据"

    thread = threading.Thread(target=perform_task, args=(task_id, dir_path,))
    thread.start()
    return redirect(url_for('get_task_status', task_id=task_id))


@app.route('/api/autotest/<task_id>', methods=['GET'])
def get_task_status(task_id):
    if task_id in tasks:
        task_status = tasks[task_id]['status']
        task_result = tasks[task_id]['result']

        if task_status == 'completed':
            # Task completed, return the result
            return jsonify({'status': task_status, 'result': task_result})
        else:
            # Task still processing, return the status
            return jsonify({'status': task_status})

    else:
        # Invalid task ID
        return jsonify({'error': 'Invalid task ID'}), 404


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/544410.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单片机的几种ota内存分区表介绍

前言 在做项目时&#xff0c;现在越来越多被要求单片机要支持升级功能。需求变化快&#xff0c;固件要不断支持新的功能&#xff0c;手动人工去烧固件越来越显得麻烦&#xff0c;已经操作成本高。 典型的方式是通过单片机外接的蓝牙、wifi等无线模块&#xff0c;或者通过单片…

如何创建UE5插件?

UE5 插件开发指南 前言1.0.打开插件窗口1.1.打开新建插件窗口1.2.填写新插件信息1.3.查看引擎自动生成的插件内容前言 首先,笔者默认读者已经知道如何安装UE5虚幻引擎了,并且也会编辑器的一些基本操作,那么这里省略了:如何注册Epic Games账号?如何安装UE5引擎?如何安装C++相…

基于SpringBoot的完成SSM整合项目开发

整合第三方技术 1. 整合JUnit问题导入1.1 Spring整合JUnit&#xff08;复习&#xff09;1.2 SpringBoot整合JUnit 2. 基于SpringBoot实现SSM整合问题导入2.1 Spring整合MyBatis&#xff08;复习&#xff09;2.2 SpringBoot整合MyBatis2.3 案例-SpringBoot实现ssm整合 1. 整合JU…

Maven多环境配置与使用、跳过测试的三种方法

文章目录 1 多环境开发步骤1:父工程配置多个环境,并指定默认激活环境步骤2:执行安装查看env_dep环境是否生效步骤3:切换默认环境为生产环境步骤4:执行安装并查看env_pro环境是否生效步骤5:命令行实现环境切换步骤6:执行安装并查看env_test环境是否生效 2 跳过测试方式1:IDEA工具…

机器学习之滤波入门

滤波的基本概念&#xff1a; 滤波是一种信号处理技术。在机器学习中&#xff0c;滤波通常指的是对输入信号进行加工&#xff0c;以消除噪声、平滑信号或突出特定频率范围的信号 简言之:加工输入,达到理想信号。 用生活的例子来解释: 假设你正在听一首音乐&#xff0c;但是在你的…

UML中的assembly关系

UML中的assembly关系 1.什么是Assembly关系 在UML&#xff08;统一建模语言&#xff09;中&#xff0c;"assembly"&#xff08;组装&#xff09;是一种表示组件之间关系的关联关系。组件是系统中可替换和独立的模块&#xff0c;可以通过组装来构建更大的系统。 当一…

零基础入门网络安全必看书单(附电子书籍+配套资料)

学习的方法有很多种&#xff0c;看书就是一种不错的方法&#xff0c;但为什么总有人说&#xff1a;“看书是学不会技术的”。 其实就是书籍没选对&#xff0c;看的书不好&#xff0c;你学不下去是很正常的。 一本好书其实不亚于一套好的视频教程&#xff0c;尤其是经典的好书…

中间件(一)

中间件 1. 概念1.1 为什么要使用中间件&#xff1f;1.2 中间件定义及分类 2. 主要分类2.1 事务式中间件2.2 过程式中间件2.3 面向消息的中间件2.4 面向对象中间件2.5 Web应用服务器2.6 数据库中间件2.7 其他 3. 常用的中间件 1. 概念 中间件&#xff08;Middleware&#xff09…

BigDecimal 类型的使用

目录 一、前言 二、BigDecimal构造方法 二、BigDecimal参与运算 2.1定义初始值 2.2计算 2.3比较大小 2.4BigDecimal取其中最大、最小值、绝对值、相反数&#xff1a; 2.5补充 2.6、java中 BigDecimal类型的可以转换到double类型&#xff1a; 三、BigDecimal格式化、小…

小白白也能学会的 PyQt 教程 —— 图像类及图像相关基础类介绍

文章目录 〇、前言一、PyQt 中的图像类1、图像类简介2、图像类转换① 常用类转换&#xff08;QPixmap、QImage、QIcon&#xff09;② QBitmap、QBrush、QPen 转换为 QPixmap 或 QImage③ QByteArray 与 QPixmap、QImage 的互转④ numpy 与 QImage 互转 二、图像显示组件1、使用…

DNDC模型在土地利用变化、未来气候变化下的建模方法及温室气体时空动态模拟实践技术

DNDC模型讲解 1.1 碳循环模型简介 1.2 DNDC模型原理 1.3 DNDC下载与安装 1.4 DNDC注意事项 ​ DNDC初步操作 2.1 DNDC界面介绍 2.2 DNDC数据及格式 2.3 DNDC点尺度模拟 2.4 DNDC区域尺度模拟 2.5 DNDC结果分析 ​ DNDC气象数据制备 3.1 数据制备中的遥感和GIS技术 3…

微博官方API使用方法【从注册到实战】

第一步&#xff1a;微博开发者身份认证 访问微博开放平台&#xff0c;登录自己微博账号&#xff0c;登录之后首先需要完善开发者的基本信息。【使用个人】 填写完成之后【审核通过】如下&#xff1a; 第二步&#xff1a;创建自己的应用 【备注&#xff1a;如果只是为了测试…

Linux安装Redis6.0版本教程

前言&#xff1a;采用Redis源码压缩包解压编译的安装方式。因为centos7.x的gcc版本还是4.8.5&#xff0c;而GCC编译指定的版本是需要5.3以上。 1、下载Redis的tar.gz的安装包 百度云下载&#xff1a;Linux下载Redis6.0.8 提取码&#xff1a;dbbv 2、安装gcc新版本 #环境部署…

【IEEE CIM 2023】基于多目标进化算法的抗菌肽设计方法

Evolutionary Multi Objective Optimization in Searching for Various Antimicrobial Peptides 小生境共享&#xff08;Niche Sharing&#xff09;是生物进化算法中的一个重要概念。在传统的进化算法中&#xff0c;通常会假设每个个体都是独立且不同的&#xff0c;因此可能会导…

机器学习基础之单层感知机及线性可分

文章目录 线性可分和权重向量公式线性不可分 线性可分和权重向量 单层感知机是后续深度学习的基础模型&#xff0c;本身没什么用&#xff0c;因为只能解决线性可分问题。 如这张图&#xff0c;想识别照片是横向的还是竖向的&#xff0c;只需要在中间画一条线&#xff0c;白点…

【大数据学习篇9】各区域热门商品Top3分析

学习目标/Target 掌握各区域热门商品Top3分析实现思路 掌握如何创建Spark连接并读取数据集 掌握利用Spark获取业务数据 掌握利用Spark过滤商品的行为类型 掌握利用Spark转换数据格式 掌握利用Spark统计每个区域中的不同商品 掌握利用Spark根据区域进行分组 掌握利用Spark根据区…

Linux的tail,grep,sed命令总结,以使用上述三种命令获取日志信息为例

目录 tail命令语法说明基本参数命令举例 grep命令语法说明匹配模式选择杂项输入控制文件控制 sed命令语法格式举例 使用命令组合查询日志信息 业务需求需要对软件日志进行查询和呈现&#xff0c;查询的条件是时间区间和关键词&#xff0c;系统运行在linux环境下&#xff0c;为此…

阿里巴巴“高并发”核心笔记!《基础+实战+源码+面试+架构》

前言 作为一个普普通通的程序员&#xff0c;如何才能提升自己的能力&#xff0c;在职场上拥有一技之长&#xff0c;这也成为普通的你我&#xff0c;迫切的需求。 拥有什么样的能力才能不被淘汰&#xff1f;答案是&#xff1a;高并发&#xff0c;它几乎成为了每个程序员都想要…

ATTCK v13版本战术介绍——防御规避(六)

一、引言 在前几期文章中我们介绍了ATT&CK中侦察、资源开发、初始访问、执行、持久化、提权战术理论知识及实战研究、部分防御规避战术&#xff0c;本期我们为大家介绍ATT&CK 14项战术中防御规避战术第31-36种子技术&#xff0c;后续会介绍防御规避其他子技术&#xf…

还只是停留在听过KMP算法?保姆式分析让你吃透KMP算法

&#x1f495;成功不是将来才有的&#xff0c;而是从决定去做的那一刻起&#xff0c;持续积累而成。&#x1f495; &#x1f43c;作者&#xff1a;不能再留遗憾了&#x1f43c; &#x1f386;专栏&#xff1a;Java学习&#x1f386; &#x1f697;本文章主要内容&#xff1a;深…