Python3多线程/多进程解决方案(持续更新ing...)

news2025/1/19 16:59:21

诸神缄默不语-个人CSDN博文目录

文章目录

  • 1. 多线程
  • 2. 多进程
    • 示例1:multiprocessing.Pool直接实现对一个列表中的每个元素的函数操作
    • 示例2:使用苏神写的工具函数实现对一个迭代器中每个元素的函数操作

1. 多线程

2. 多进程

示例1:multiprocessing.Pool直接实现对一个列表中的每个元素的函数操作

from tqdm import tqdm
from multiprocessing import Pool

original_data  #给定列表

#对每个元素的函数操作
def process_factor(i):
    try:
    	#输入元素索引,返回对该元素实现操作后的结果
        return result
    except Exception as e:
        print(f"Error processing factor {i}: {e}")
        return None

if __name__ == "__main__":
	#全局变量在每个进程中是不共享的,所以如果在process_factor()中直接修改original_data的话,将不会反映到主进程中
	#在使用多进程(multiprocessing)时,每个新进程都会重新导入当前模块,如果这些代码不在 if __name__ == "__main__": 块内,代码会在每个新进程中重新运行。这不仅可能导致性能问题,还可能产生意外的副作用。对于Windows和某些Unix系统,这是必要的,否则代码可能完全无法运行。
    with Pool() as p:
        results = list(tqdm(p.imap_unordered(process_factor,range(len(original_data))),total=len(original_data)))

    # 更新 original_data
    for i, result in enumerate(results):
        if result is not None:
            original_data[i] = result

关于选择imap()还是imap_unordered()
在这里插入图片描述

示例2:使用苏神写的工具函数实现对一个迭代器中每个元素的函数操作

这是苏神的代码(复制自https://github.com/bojone/bert4keras/blob/master/bert4keras/snippets.py):

def parallel_apply_generator(
    func, iterable, workers, max_queue_size, dummy=False, random_seeds=True
):
    """多进程或多线程地将func应用到iterable的每个元素中。
    注意这个apply是异步且无序的,也就是说依次输入a,b,c,但是
    输出可能是func(c), func(a), func(b)。结果将作为一个
    generator返回,其中每个item是输入的序号以及该输入对应的
    处理结果。
    参数:
        dummy: False是多进程/线性,True则是多线程/线性;
        random_seeds: 每个进程的随机种子。
    """
    if dummy:
        from multiprocessing.dummy import Pool, Queue
    else:
        from multiprocessing import Pool, Queue

    in_queue, out_queue, seed_queue = Queue(max_queue_size), Queue(), Queue()
    if random_seeds is True:
        random_seeds = [None] * workers
    elif random_seeds is None or random_seeds is False:
        random_seeds = []
    for seed in random_seeds:
        seed_queue.put(seed)

    def worker_step(in_queue, out_queue):
        """单步函数包装成循环执行
        """
        if not seed_queue.empty():
            np.random.seed(seed_queue.get())
        while True:
            i, d = in_queue.get()
            r = func(d)
            out_queue.put((i, r))

    # 启动多进程/线程
    pool = Pool(workers, worker_step, (in_queue, out_queue))

    # 存入数据,取出结果
    in_count, out_count = 0, 0
    for i, d in enumerate(iterable):
        in_count += 1
        while True:
            try:
                in_queue.put((i, d), block=False)
                break
            except six.moves.queue.Full:
                while out_queue.qsize() > max_queue_size:
                    yield out_queue.get()
                    out_count += 1
        if out_queue.qsize() > 0:
            yield out_queue.get()
            out_count += 1

    while out_count != in_count:
        yield out_queue.get()
        out_count += 1

    pool.terminate()



def parallel_apply(
    func,
    iterable,
    workers,
    max_queue_size,
    callback=None,
    dummy=False,
    random_seeds=True,
    unordered=True
):
    """多进程或多线程地将func应用到iterable的每个元素中。
    注意这个apply是异步且无序的,也就是说依次输入a,b,c,但是
    输出可能是func(c), func(a), func(b)。
    参数:
        callback: 处理单个输出的回调函数;
        dummy: False是多进程/线性,True则是多线程/线性;
        random_seeds: 每个进程的随机种子;
        unordered: 若为False,则按照输入顺序返回,仅当callback为None时生效。
    """
    generator = parallel_apply_generator(
        func, iterable, workers, max_queue_size, dummy, random_seeds
    )

    if callback is None:
        if unordered:
            return [d for i, d in generator]
        else:
            results = sorted(generator, key=lambda d: d[0])
            return [d for i, d in results]
    else:
        for i, d in generator:
            callback(d)

使用示例(这个项目比较大,前后省略的函数我就不写了,总之意会就行):
(代码复制自https://github.com/bojone/SPACES/blob/main/extract_convert.py)

#对每个元素的函数操作
def extract_flow(inputs):
    """单个样本的构建流(给parallel_apply用)
    """
    text, summary = inputs
    texts = text_split(text, True)  # 取后maxlen句
    summaries = text_split(summary, False)
    mapping = extract_matching(texts, summaries)
    labels = sorted(set([i[1] for i in mapping]))
    pred_summary = ''.join([texts[i] for i in labels])
    metric = compute_main_metric(pred_summary, summary)
    return texts, labels, summary, metric

#对整个迭代器实现批量操作
def convert(data):
    """分句,并转换为抽取式摘要
    """
    D = parallel_apply(
        func=extract_flow,
        iterable=tqdm(data, desc=u'转换数据'),
        workers=100,
        max_queue_size=200
    )
    total_metric = sum([d[3] for d in D])
    D = [d[:3] for d in D]
    print(u'抽取结果的平均指标: %s' % (total_metric / len(D)))
    return D

if __name__ == '__main__':
    data = convert(data)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/943669.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

强化自主可控,润开鸿发布基于RISC-V架构的开源鸿蒙终端新品

2023 RISC-V中国峰会于8月23日至25日在北京召开,峰会以“RISC-V生态共建”为主题,结合当下全球新形势,把握全球新时机,呈现RISC-V全球新观点、新趋势。本次大会邀请了RISC-V国际基金会、业界专家、企业代表及社区伙伴等共同探讨RISC-V发展趋势与机遇,吸引超过百余家业界企业、高…

易基因:5mC DNA甲基化介导茶树组织功能分化和重要风味物质合成调控|植物研究

大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 在植物中,5mC DNA甲基化修饰(简称5mC甲基化)是一个重要而保守的表观基因标记,参与基因组稳定性、基因转录调控、发育调控、非生物胁迫响应…

重生c++系列之类与对象(中篇)

好的继上期,我们今天带来c类与对象系列的继续学习。 类的6个默认成员函数 如果一个类中什么成员都没有,简称为空类。 空类中真的什么都没有吗?并不是,任何类在什么都不写时,编译器会自动生成以下6个默认成员 函数。 …

【Latex】使用技能站:(一)Visio导出矢量图并导入Latex模板

Visio导出矢量图并导入Latex模板 引言1 安装Inkscape工具1.1 官网下载并安装1.2 添加环境变量 2 Visio导出svg文件3 Inkscape 转换为PDF或EPS格式4 Latex导入.pdf或者.eps矢量图 引言 矢量图格式有:svg,eps,pdfVisio能导出的矢量图格式有&am…

信息技术如何监测UPS设备?看了这篇我明白了

UPS设备能够在电力中断或波动的情况下提供临时的电力支持,以保障办公设备的正常运行和数据的安全性。 然而,仅仅拥有UPS设备是不够的,实时监控和管理这些设备同样至关重要。通过对办公室UPS进行监控,我们可以及时发现电力问题、设…

哪个牌子的运动耳机比较好、运动耳机品牌推荐

跑步和健身的朋友通常喜欢戴着运动耳机,这样可以在运动时享受音乐的伴奏。这不仅提高了运动效果,还能减轻疲劳感。因此市场上涌现了许多高品质的运动耳机品牌。现在让我们来看看一些受到健身达人们推荐的耳机款式吧! 1、南卡骨传导耳机 近年…

windows使用远程桌面连接

第一步:winR进入命令控制 第二步:输入mstsc进入远程连接界面 第三步:输入需要远程控制的加算器ip 注意: 控制计算器与被控制计算器都需要满足以下条件 可以使用远程桌面连接到 Windows 10 专业版和企业版、Windows 8.1 和 8 企…

GitLab启动失败:fail: alertmanager: runsv not running

问题描述 sudo gitlab-ctl restart ,报错如下 : summergaoubuntu:/etc/gitlab$ sudo gitlab-ctl start fail: alertmanager: runsv not running fail: gitaly: runsv not running fail: gitlab-exporter: runsv not running fail: gitlab-workhorse: …

docker desktop安装es 并连接elasticsearch-head:5

首先要保证docker安装成功,打开cmd,输入docker -v,出现如下界面说明安装成功了 下面开始安装es 第一步:拉取es镜像 docker pull elasticsearch:7.6.2第二步:运行容器 docker run -d --namees7 --restartalways -p 9…

MySQL日期格式及日期函数实践

目录 日期格式 日期函数 CURDATE()和CURRENT_DATE()CURTIME()和CURRENT_TIME()NOW()和CURRENT_TIMESTAMP()DATE_FORMAT()DATE_ADD()和DATE_SUB()DATEDIFF()DATE()DAYNAME()和MONTHNAME() 1. 日期格式 在MySQL中,日期可以使用多种格式进行存储和表示。常见的日期格式…

【大数据之Kafka】五、Kafka生产者之生产经验

1 生产者如何提高吞吐量 由于linger.ms默认为0,即缓冲区队列中一有数据就sender线程就将其拉出到Kafka集群,效率比较低,提高生产者吞吐量有四种方式: (1)扩大批次的大小batch.size,默认为16k&a…

LeetCode-160. 相交链表

这是一道真的非常巧妙的题,题解思路如下: 如果让他们尾端队齐,那么从后面遍历就会很快找到第一个相交的点。但是逆序很麻烦。 于是有一个巧妙的思路诞生了,如果让短的先走完自己的再走长的,长的走完走短的,…

达梦数据库屏蔽关键字

一、配制位置 配置设备:需要连接数据库的程序运行的设备; 配置位置: 1.32 位的 DM 安装在 Win32 操作平台下,此文件位于%SystemRoot%\system32 目录; 2.64 位的 DM 安装在 Win64 操作平台下,此文件位于…

在线制作作息时间表

时光荏苒,岁月如梭,人们描述时光易逝的句子,多如星河。 一寸光阴一寸金,寸金难买寸光阴。 人生就是一段时间而已,所以我明白了一个道理 人生之中最大的浪费就是时间的浪费 因此我想我们教给我们孩子重要的一课应该也是…

PMO的秘密武器:项目评估与审计机制的深度解析

引言:PMO的核心价值与挑战 项目管理办公室(PMO)在组织中的角色日益重要。它是组织中的一个关键部门,负责确保项目的顺利进行,并确保项目与组织的整体战略保持一致。但与此同时,PMO也面临着许多挑战。其中最…

1.6 编写双管道ShellCode

本文将介绍如何将CMD绑定到双向管道上,这是一种常用的黑客反弹技巧,可以让用户在命令行界面下与其他程序进行交互,我们将从创建管道、启动进程、传输数据等方面对这个功能进行详细讲解。此外,本文还将通过使用汇编语言一步步来实现…

【WINAPI】文件读写操作问题

问题描述 在利用WINAPI中的WriteFile和ReadFile函数进行文件读写操作时,出现无法正常读写文件报错。 分析问题 查阅WINAPI源码,查看参数列表各个参数的数据类型。 发现其中第二个参数,也就是需要写进文件的真实数据,其数据类型…

Google Play商店优化排名因素之应用的评分评论

下载次数是应用程序受欢迎程度的指标,Google在对我们的应用程序进行排名时也会将其考虑在内。评级和评论会影响应用程序的转化率,因为许多用户在做出决定之前会根据平均评级或最近的评论来评估我们的应用程序。 1、评级的重要性。 如果我们的应用程序有…

安卓版yolo-fastest

安卓版本yolofastest效果测试 安卓配置OPENCV4ANDROID,见我的博客一篇文章opencv4dandroid配置 这个不需要使用JNI,十分简单的配置 说真的,其实只调用OPENCV的函数,自己写的代码不多,使用OPENCV4ANDROID和JNI的时间差…