在 Python 中将 Tqdm 与 Asyncio 结合使用

news2024/11/16 21:58:38

动动发财的小手,点个赞吧!

简介

困扰

在 Python 中使用并发编程来提高效率对于数据科学家来说并不罕见。在后台观察各种子进程或并发线程以保持我的计算或 IO 绑定任务的顺序总是令人满意的。

但是还有一点困扰我的是,当我在后台并发处理成百上千个文件或者执行成百上千个进程时,我总是担心会不会有几个任务偷偷挂了,整个代码永远跑不完。我也很难知道代码现在在哪里执行。

最糟糕的是,当我看着一个空白屏幕时,很难说出我的代码需要多长时间才能执行或 ETA 是多少。这对我安排工作日程的能力非常不利。

因此,我想要一种方法让我知道代码执行到了哪里。

已有方法

比较传统的做法是任务之间共享一块内存区域,在这块内存区域放一个计数器,当一个任务结束的时候让这个计数器+1,然后用一个线程不停的打印这个计数器的值。

这从来都不是一个好的解决方案:一方面,我需要在你现有的业务逻辑中添加一段用于计数的代码,这违反了“低耦合,高内聚”的原则。另一方面,由于线程安全问题,我必须非常小心锁定机制,这会导致不必要的性能问题。

tqdm

alt

有一天,我发现了 tqdm 库,它使用进度条来可视化我的代码进度。我可以使用进度条来可视化我的 asyncio 任务的完成和预计到达时间吗?

那么本文[1]我把这个方法分享给大家,让每个程序员都有机会监控自己并发任务的进度。

异步

在我们开始之前,我希望您了解一些 Python asyncio 的背景知识。我的文章描述了asyncio[2]的一些常用API的用法,这将有助于我们更好地理解tqdm的设计:

alt

tqdm 概述

如官方网站所述,tqdm 是一个显示循环进度条的工具。它使用简单、高度可定制并且占用资源少。

一个典型的用法是将一个可迭代对象传递给 tqdm 构造函数,然后你会得到一个如下所示的进度条:

from time import sleep
from tqdm import tqdm


def main():
    for _ in tqdm(range(100)):
        # do something in the loop
        sleep(0.1)


if __name__ == "__main__":
    main()

或者您可以在读取文件时手动浏览并更新进度条的进度:

import os
from tqdm import tqdm


def main():
    filename = "../data/large-dataset"
    with (tqdm(total=os.path.getsize(filename)) as bar,
            open(filename, "r", encoding="utf-8"as f):
        for line in f:
            bar.update(len(line))


if __name__ == "__main__":
    main()
alt

将 tqdm 与异步集成

总体而言,tqdm 非常易于使用。但是,GitHub 上需要更多关于将 tqdm 与 asyncio 集成的信息。所以我深入研究了源代码,看看 tqdm 是否支持 asyncio。

幸运的是,最新版本的 tqdm 提供了包 tqdm.asyncio,它提供了类 tqdm_asyncio。

tqdm_asyncio 类有两个相关的方法。一个是 tqdm_asyncio.as_completed。从源码可以看出,它是对asyncio.as_completed的包装:

@classmethod
    def as_completed(cls, fs, *, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.as_completed`.
        """

        if total is None:
            total = len(fs)
        kwargs = {}
        if version_info[:2] < (310):
            kwargs['loop'] = loop
        yield from cls(asyncio.as_completed(fs, timeout=timeout, **kwargs),
                       total=total, **tqdm_kwargs)

另一个是 tqdm_asyncio.gather ,从源代码可以看出,它基于模拟 asyncio.gather 功能的 tqdm_asyncio.as_completed 的实现:

@classmethod
    async def gather(cls, *fs, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.gather`.
        """

        async def wrap_awaitable(i, f):
            return i, await f

        ifs = [wrap_awaitable(i, f) for i, f in enumerate(fs)]
        res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
                                                 total=total, **tqdm_kwargs)]
        return [i for _, i in sorted(res)]

所以,接下来,我将描述这两个API的用法。在开始之前,我们还需要做一些准备工作。在这里,我写了一个简单的方法来模拟一个随机休眠时间的并发任务:

import asyncio
import random

from tqdm.asyncio import tqdm_asyncio


class AsyncException(Exception):
    def __int__(self, message):
        super.__init__(self, message)


async def some_coro(simu_exception=False):
    delay = round(random.uniform(1.05.0), 2)

    # We will simulate throwing an exception if simu_exception is True
    if delay > 4 and simu_exception:
        raise AsyncException("something wrong!")

    await asyncio.sleep(delay)

    return delay

紧接着,我们将创建 2000 个并发任务,然后使用 tqdm_asyncio.gather 而不是熟悉的 asyncio.gather 方法来查看进度条是否正常工作:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())
    await tqdm_asyncio.gather(*tasks)

    print(f"All tasks done.")


if __name__ == "__main__":
    asyncio.run(main())
alt

或者让我们用 tqdm_asyncio.as_completed 替换 tqdm_asyncio.gather 并重试:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    for done in tqdm_asyncio.as_completed(tasks):
        await done

    print(f"The tqdm_asyncio.as_completed also works fine.")


if __name__ == "__main__":
    asyncio.run(main())
alt

Reference

[1]

Source: https://towardsdatascience.com/using-tqdm-with-asyncio-in-python-5c0f6e747d55

[2]

异步: https://github.com/Jwindler/Ice_story

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/503679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构篇五:队列

文章目录 前言1.队列1.1 队列的概念及结构1.2 队列的实现 2. 各功能的解析及实现2.1 队列的创建2.2 初始化队列2.3 队尾入队列2.4 队头出队列2.5 获取队头元素2.6 获取队尾元素2.7 队列中有效元素个数2.8 检查队列是否为空2.9 销毁队列 3.代码实现3.1 Queue.h3.2 Queue.c3.3 te…

JavaWeb ( 七 ) JSTL Tag标签

2.5.JSTL标签与EL表达式 2.5.1.EL表达式 EL表达式 : Expression Language 目的&#xff1a;为了使JSP写起来更加简单 格式&#xff1a;${expression} EL 提供“.“和“[ ]“两种运算符来存取数据。${user.name}, ${user[“name”] }支持算术操作符, 关系操作符, 逻辑操作符…

Python:Python进阶:内存管理机制

Python内存管理机制 1. 堆2. 栈3. 引用4. Python中可变对象和不可变对象有个问题&#xff1a;你可以好好思考下总结 Python内存管理程序是用 C/C写的&#xff0c;这里我们以 CPython解释器为例说明。 在Python 中 所有数据类型 包括&#xff1a;int dict str都是一个对象&#…

层次分析法及找工作问题实战

学习知识要实时简单回顾&#xff0c;我把学习的层次分析法简单梳理一下&#xff0c;方便入门与复习。 AHP 层次分析法&#xff08;Analytic Hierarchy Process&#xff0c;简称 AHP&#xff09;是对一些较为复杂、较为模糊的问题作出决策的简易方法&#xff0c;它特别适用于那…

C++类和对象上

专栏&#xff1a;C/C 个人主页&#xff1a;HaiFan. 专栏简介&#xff1a;本章为大家带来C类和对象相关内容。 类和对象 前言面向过程和面向对象类的引入类的定义对于类中成员的命名建议 类的访问限定符及封装访问限定符封装 类的作用域类的实例化如何计算类对象的大小this指针t…

Web自动化测试——XAPTH高级定位

XAPTH高级定位 一、xpath 基本概念二、xpath 使用场景三、xpath 相对定位的优点四、xpath 定位的调试方法五、xpath 基础语法&#xff08;包含关系&#xff09;六、xpath 顺序关系&#xff08;索引&#xff09;七、xpath 高级用法1、[last()]: 选取最后一个2、[属性名属性值 an…

ESP32设备驱动-PCF8575IO扩展器驱动

PCF8575IO扩展器驱动 文章目录 PCF8575IO扩展器驱动1、PCF8575介绍2、硬件准备3、软件准备4、驱动实现1、PCF8575介绍 PCF8575用于两线双向总线 (I2C) 的 16 位 I/O 扩展器专为 2.5-V 至 5.5-V VCC 操作而设计。 PCF8575 器件通过 I2C 接口 [串行时钟 (SCL)、串行数据 (SDA)]…

flask教程8:模板

文章目录 一、模板与自定义过滤器1 模板2 过滤器转义过滤器讲解 3自定义过滤器 二、表单1表单2表单扩展 三、创建表单模型类与模板使用3.1 表单模型类 四 、使用表单接受并检验参数五、模板宏的使用六 、宏定义在外部的使用七 &#xff1a;模板继承与包含继承包含include 八 、…

PVE 安装 windows10

pve 安装教程大家可以参考视频&#xff1a;pve 安装 pve 安装 Windows10 视频教程&#xff1a;pve 安装Windows10 在安装好 pve 后我们就可以进行虚拟机的安装了。当然我们可以自行决定是否有必要进行 win10 的安装。 准备工作 1. 下载 win10 镜像文件&#xff1a;https://…

数据结构与算法基础(王卓)(35):交换排序之快排【第二阶段:标准答案、初步发现问题】

目录 第二阶段&#xff1a;一分为二 整个快排算法的程序运行大框架&#xff1a; 做出的改动&#xff08;和原来程序的区别&#xff09;&#xff1a; Project 1: PPT标准答案&#xff1a; Project 1小问题&#xff1a; Project 1还存在着一个巨大的问题&#xff1a; 具体问…

嵌入式软考备考_8 软件测试

软件测试 测试&#xff1a;在规定的条件下操作程序&#xff0c;以发现错误&#xff0c;对软件质量进行评估。 对象&#xff1a;程序&#xff0c;数据&#xff0c;文档。 目的&#xff1a;发现错误&#xff0c;看是否满足用户需求&#xff0c;发现错误产生的原因&#xff08;…

汇编四、51单片机汇编指令2

1、机器码 (1)MOV A,#0x60对应机器码为7460 (2)7460对应二进制 0111 0100 0110 0000 0x74对应指令&#xff0c;0x60对应立即数。 (3)immediate data翻译为立即数。 (4)可人为查表把汇编转为机器码&#xff0c;也可通过编译器把汇编转为机器码。 2、汇编常见缩写 (1)Rn: n可…

leetcode-040-组合总和2

题目及测试 package pid040; /* 40. 组合总和 II 给定一个候选人编号的集合 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。candidates 中的每个数字在每个组合中只能使用 一次 。注意&#xff1a;解集不能包含重复的组合…

Vue中使用EasyPlayer播放H265视频流

需求说明 需要在Vue2的项目中使用EasyPlayer进行H265视频流的播放。使用官方的最新版本加载H265会有问题。一直处于加载中… 实现步骤 引入easyplayer,这里最开始引入了最新版会有问题&#xff0c;因此引入的是3.3.12版本&#xff0c;可参照官方文档进行配置。 EasyPlayer示…

HBase整合Phoenix

HBase整合Phoenix 创建软件目录 mkdir -p /opt/soft cd /opt/soft下载软件 wget https://dlcdn.apache.org/phoenix/phoenix-5.1.3/phoenix-hbase-2.5-5.1.3-bin.tar.gz解压 hbase tar -zxvf phoenix-hbase-2.5-5.1.3-bin.tar.gz修改 hbase 目录名称 mv phoenix-hbase-2.5…

(初)进程概念

目录 认识冯诺依曼系统 操作系统(Operator System) 设计OS的目的&#xff1a; 定位&#xff1a; 如何理解管理&#xff1a; 总结&#xff1a; 系统调用和库函数概念&#xff1a; 进程 基本概念 &#xff1a; 描述进程PCB task_struct - PCB的一种 task_struct内容分…

编译安装最新的Linux系统内核

现在还有不少机器是CentOS8 Stream系统&#xff0c;虽然上了贼船&#xff0c;不影响用就是了。8的编译和7大同小异&#xff0c;只是踩了更多的坑在这里记录一下&#xff0c;或许会帮到看到的朋友。 安装编译环境 CentOS8安装必要的包 yum groupinstall "Development Too…

【P13】JMeter 常数吞吐量定时器(Constant Throughput Timer)

文章目录 1、基于计算吞吐量&#xff1a;只有此线程2、基于计算吞吐量&#xff1a;所有活动线程3、基于计算吞吐量&#xff1a;当前线程组中的所有活动线程4、基于计算吞吐量&#xff1a;所有活动线程&#xff08;共享&#xff09;5、基于计算吞吐量&#xff1a;当前线程组中的…

【2023/05/08】雅卡尔织布机

Hello&#xff01;大家好&#xff0c;我是霜淮子&#xff0c;2023倒计时第3天。 Share The world puts off its mask of vastness to its lover. It becomes small as one song,as one kiss of the eternal. 译文&#xff1a; 世界对着它的爱人&#xff0c;把它浩瀚的面具揭…

已做过算法题总结2

20. 有效的括号 (括号匹配是使用栈解决的经典问题&#xff0c;这道题主要是记住三种不成立的情况) 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串&#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用…