Python 中整洁的并行输出

news2025/1/10 3:07:52

原文:https://bernsteinbear.com/blog/python-parallel-output/
代码:https://gist.github.com/tekknolagi/4bee494a6e4483e4d849559ba53d067b

Python 并行输出

使用进程和锁并行输出多个任务的状态。
在这里插入图片描述

注:以下代码在linux下可用,windows下可能要进行修改。

假设你有一个程序,它对列表进行一些处理:

def log(repo_name, *args):
    print(f"{repo_name}:", *args)

def randsleep():
    import random
    import time
    time.sleep(random.randint(1, 5))

def func(repo_name):
    log(repo_name, "Starting")
    randsleep()  # Can be substituted for actual work
    log(repo_name, "Installing")
    randsleep()
    log(repo_name, "Building")
    randsleep()
    log(repo_name, "Instrumenting")
    randsleep()
    log(repo_name, "Running tests")
    randsleep()
    log(repo_name, f"Result in {repo_name}.json")

repos = ["repoA", "repoB", "repoC", "repoD"]
for repo in repos:
    func(repo)

这很好。它有效。有点吵,但有效。但随后你发现了一件好事:你的程序是数据并行。也就是说,您可以并行处理:
在这里插入图片描述

import multiprocessing

# ...

with multiprocessing.Pool() as pool:
    pool.map(func, repos, chunksize=1)

不幸的是,输出有点笨拙。虽然每行仍然很好输出一个 repo,但它正在左右喷出行,并且这些行是混合的。
在这里插入图片描述

幸运的是,StackOverflow 用户 Leedehai是终端专业用户,知道如何在控制台中一次重写多行。我们可以根据自己的需要调整这个答案:

def fill_output():
    to_fill = num_lines - len(last_output_per_process)
    for _ in range(to_fill):
        print()

def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()

def func(repo_name):
    # ...
    with terminal_lock:
        del last_output_per_process[repo_name]

# ...

repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)
with multiprocessing.Manager() as manager:
    last_output_per_process = manager.dict()
    terminal_lock = manager.Lock()
    fill_output()
    with multiprocessing.Pool() as pool:
        pool.map(func, repos, chunksize=1)
    clean_up()

在这里插入图片描述

这会将每个项目的状态(一次一行)打印到终端。它将按项目添加到的 last_output_per_process 顺序打印,但您可以通过(例如)按字母数字排序来更改它: sorted(last_output_per_process.items())

请注意,我们必须锁定数据结构和终端输出,以避免事情被破坏;它们在过程之间共享(pickled,via Manager )。

如果日志输出有多行长,或者其他人正在用 stdout / stderr (也许是流浪的 print )搞砸,我不确定这会做什么。如果您发现或有整洁的解决方案,请写信。

这种技术对于任何具有线程和锁的编程语言来说可能是相当可移植的。关键的区别在于这些实现应该使用线程而不是进程;我做进程是因为它是 Python。

最终版

import multiprocessing
import random
import time


class Logger:
    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        to_fill = self.num_lines - len(self.last_output_per_process)
        for _ in range(to_fill):
            print()

    def clean_up(self):
        for _ in range(self.num_lines):
            print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

    def log(self, repo_name, *args):
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
            self.clean_up()
            sorted_lines = self.last_output_per_process.items()
            for repo_name, last_line in sorted_lines:
                print(f"{repo_name}: {last_line}")
            self.fill_output()

    def done(self, repo_name):
        with self.terminal_lock:
            del self.last_output_per_process[repo_name]


class MultiprocessingLogger(Logger):
    def __init__(self, num_lines, manager):
        super().__init__(num_lines, manager.dict(), manager.Lock())


class FakeLock:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class SingleProcessLogger(Logger):
    def __init__(self, num_lines):
        super().__init__(num_lines, {}, FakeLock())


def randsleep():
    time.sleep(random.randint(1, 2) / random.randint(1, 5))


def func(repo_name):
    logger.log(repo_name, "Starting")
    randsleep()
    logger.log(repo_name, "Installing")
    randsleep()
    logger.log(repo_name, "Building")
    randsleep()
    logger.log(repo_name, "Instrumenting")
    randsleep()
    logger.log(repo_name, "Running tests")
    randsleep()
    logger.log(repo_name, f"Result in {repo_name}.json")
    randsleep()
    logger.done(repo_name)


def multi_process_demo():
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        global logger
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        with multiprocessing.Pool(num_procs) as pool:
            pool.map(func, repos, chunksize=1)
        logger.clean_up()


def single_process_demo():
    repo = "repoA"
    num_lines = 1
    global logger
    logger = SingleProcessLogger(num_lines)
    logger.fill_output()
    func(repo)
    logger.clean_up()

if __name__ == "__main__":
    multi_process_demo()
    # single_process_demo()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1610906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LLM 论文】Self-Consistency — 一种在 LLM 中提升 CoT 表现的解码策略

论文:Self-Consistency Improves Chain of Thought Reasoning in Language Models ⭐⭐⭐⭐⭐ ICLR 2023, Google Research 文章目录 论文速读 论文速读 本工作提出了一种解码策略:self-consistency,并可以用于 CoT prompting 中。 该策略提…

VScode配置launch+tasks[自己备用]

VScode配置launchtasks[自己备用],配置文件详解 launch.json 字段 name :启动配置的名称,也就是显示在调试配置下拉菜单中的名字,如果添加了多个配置可以用此作为区分 字段 program :可执行文件完整路径。 ① 由于 C…

3分钟看懂Microchip 32位MCU CAN模块的配置

文章目录 CAN模块系统框图Microchip MCC Harmony下CAN模块配置选项CAN模块工作模式CAN模块中断模式CAN工作速率Bit Timing Calculation配置CAN 接收的配置CAN 发送的配置CAN 过滤器工作流程说明CAN 过滤器的配置 CAN模块系统框图 CAN的英文全称:Control Area Networ…

ubuntu在xshell中使用快捷方式操作命令,减少命令行的数入量

第一步 第二步 然后无脑确定 第三步 在xshell的显示方式 方式一 这样就会在每个窗格中进行显示 方式二 效果显示–> 这种窗格的显示是全局的 然后你双击这个process就会自动把命令打在命令行上,减少你的输入量

Ansible初识以及安装

1. Ansible应用简述: Ansible基于python语言实现,由Paramiko和PyYAML两个关键模块构建。具有独特的设计理念: 1)安装部署简单 2)管理主机便捷,支持多主机并行管理 3)避免在被管理主机上安装客户…

小红书情感博主暴力玩法,流量巨大,客单300+

这个项目的盈利核心在于提供情感咨询服务,每笔交易的利润通常在200到300元之间,这种方式比撰写大量情感内容来吸引流量要简单得多,可以说是一种快速超车的策略。 项 目 地 址 : laoa1.c n 我们以男性的视角提供感情建议&a…

Vitis AI 环境搭建 KV260 PYNQ 安装 要点总结

目录 1. 环境 2. 工具及版本介绍 2.1 工具版本兼容性 2.2 DPU结构 2.3 DPU命名规则 3. Vitis AI 配置要点 3.1 配置安装 Docker 库 3.2 Install Docker Engine 3.3 添加 Docker 用户组并测试 3.4 克隆 Vitis AI 库 3.5 构建 Docker (直接抓取&#xff09…

【网络编程】TCP流套接字编程(TCP实现回显服务器)

一.TCP流套字节相关API. Socket(既能给客户端使用,也能给服务器使用) 构造方法 基本方法: ServerSocket(只能给服务器使用) 构造方法: 基本方法: 二.TCP实现回显服务器. 客户端代码示例: package Demo2;import java.io.IOException; import java.io.InputStream; import j…

sketchup创建3D打印机的模型

查了一下,这玩意有几个版本,其中一个sketchup free是免费的,到官网上看看 下载 SketchUp | 免费试用 3D 建模软件 | SketchUp 是个在线网页版,然后可以再这个网站上注册一个账号 弄个邮箱试试看 创建好进入后,里面就…

项目实践---贪吃蛇游戏的实现

上一章,我们已经分析了贪吃蛇的具体内容,包括它是如何实现的,怎样完成这个项目的,其中就提到了 贪吃蛇有三个代码:一个是测试代码,一个是头文件代码,还有一个是主函数代码。那么今天我们就来讲一…

tensor是pytorch的核心,那torch.tensor和torch.Tensor区别是?

本文重点 从本节课程开始我们将正式开启pytorch的学习了,在深度学习框架中有一个重要的概念叫做张量,它是pytorch的基本操作单位,要想创建tensor有很多的方式,但是有两个torch.tensor和torch.Tensor容易混淆,本节课程…

2024年适用于 Android 的最佳免费数据恢复应用程序

无论是系统崩溃、软件升级、病毒攻击还是任何其他故障,这些软件问题都可能导致手机上的数据丢失。可以使用免费的数据恢复应用程序修复数据故障并检索丢失或删除的文件。 数据恢复应用程序旨在从另一个存储设备中检索丢失或无法访问的数据。这些工具扫描 UFS 并尝试…

销售经理(多继承/虚基类)

根据下图类之间的继承关系,以及main和输出定义,定义Staff类、Saleman类、Manager类和SaleManager类。 Staff类包含的数据成员有编号(num),姓名(name),基本工资(basicSale)。Saleman类…

算法打卡day52|单调栈篇03| 84.柱状图中最大的矩形

算法题 Leetcode 84.柱状图中最大的矩形 题目链接:84.柱状图中最大的矩形 大佬视频讲解:84.柱状图中最大的矩形视频讲解 个人思路 这题和接雨水是相似的题目,原理上基本相同,也是可以用双指针和单调栈解决,只是有些细节不同。…

MT3023 歌词中找单词

1.暴力 10/12 #include <bits/stdc.h> using namespace std; int n; string a[10005]; int main() {cin >> n;for (int i 0; i < n; i)cin >> a[i];string ll;cin >> ll;for (int i 0; i < n; i){string u a[i];int num 0;int j 0;for (in…

使用Python进行自动化测试

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 如何使用Python进行自动化测试&#xff1a;测试框架的选择与应用 自动化测试是软件开发过程…

【JavaSE进阶】10-网络编程 11-Lambda表达式 12-Stream API 13-Java新特性

10 网络编程 10.1 网络编程概述 10.2 网络编程三要素 10.3 网络编程基础类 package com.powernode.javase.net;import java.net.InetAddress;/*** ClassName: InetAddressTest* Description:* java.net.IntAddress类用来封装计算机的IP地址和DNS&#xff08;没有端口信息&…

增强现实(AR)开发框架

增强现实&#xff08;AR&#xff09;开发框架为开发者提供了构建AR应用程序所需的基本工具和功能。它们通常包括3D引擎、场景图、输入系统、音频系统和网络功能。以下是一些流行的AR开发框架。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流…

椭圆曲线密码学(ECC)基本介绍和总结

背景 ECC英文全称"Elliptic Curve Cryptography"&#xff0c;其背后的密码学原理或者说安全性&#xff0c;是基于椭圆曲线离散对数问题&#xff08;Elliptic Curve Discrete Logarithm Problem&#xff0c;ECDLP&#xff09;。ECC密码学被普遍认为是RSA密码系统的接…

prometheus+grafana可视化监控

prometheus监控 一、用二进制安装 1、安装Prometheus 打开官方网址:https://prometheus.io/download/ wget https://github.com/prometheus/prometheus/releases/download/v2.45.4/prometheus-2.45.4.linux-amd64.tar.gz下载完成后解压一下安装包 tar vxf prometheus-2.45.…