python多进程程序设计 之三

news2024/9/20 19:28:33

python多进程程序设计 之三

  • 进程间同步
    • Condition
      • 构造器
      • acquire
      • release
      • wait/wait_for
      • notify/notify_all
      • 实列代码
    • Event
      • 构造器
      • set
      • clear
      • wait
      • 应用实例
    • Lock
      • 构造器
      • acquire
      • release
      • 实列代码

进程间同步

Condition

条件变量总是与某种类型的锁相关联;可以通过构造器传输传递一个锁,也可以默认创建一个。当多个条件变量必须共享同一锁时,构造器传输传递一个锁很有用。

条件变量遵循上下文管理协议:使用 with 语句在封闭块的持续时间内获取关联的锁。 acquire() 和release() 方法也会调用关联锁的相应方法。

必须在持有关联锁的情况下,调用其他方法。

  • wait()方法释放锁,然后阻塞,直到另一个线程通过调用notify()或notify_all()唤醒它。一旦被唤醒,wait()重新获取锁并返回。这个函数可以指定超时。
  • notify() 方法会唤醒等待条件变量的线程之一(如果有)。 notify_all() 方法唤醒所有等待条件变量的线程。

notify()和notify_all()方法不会释放锁;这意味着被唤醒的一个或多个线程,不会立即从其 wait() 调用中返回,而是仅在调用 notify() 或 notify_all() 的线程最终放弃锁的所有权时,返回。

构造器

multiprocessing.Condition(lock=None)
Condition类实现条件变量对象。条件变量允许一个或多个线程等待,直到收到另一线程的通知。

如果给出了锁参数,而不是 None,则它必须是 Lock 或 RLock 对象,并且它被用作底层锁。否则,将创建一个新的 RLock 对象,并将其用作基础锁。

acquire

词法:acquire(*args)
该函数获取Condition锁。该函数调用Condition锁的对应的函数;返回值是,Condition锁返回的任何值。

release

词法:release()
释放Condition锁。该函数调用Condition锁对应的函数;没有返回值。

wait/wait_for

词法:
wait(timeout=None)
wait_for(predicate, timeout=None)

函数wait引起调用进程等待,直至收到通知,或发生超时。如果调用此函数时,调用线程尚未获取锁,则会产生RuntimeError异常。

函数wait释放Condition锁,然后阻塞,直到

  1. 由另一个进程,对同一条件变量,调用notify() 或notify_all() ,将其唤醒,
  2. 直到可选超时timeout发生。

一旦被唤醒,或发生超时,函数wait会重新获取锁,并返回。

当超时参数timeout 存在,且非None时,它​​应该是一个浮点数,timeout的单位是秒。

若超时,函数wait返回值为True;否则,返回值为 False。

wait_for引起调用进程等待,直到可调用函数predicate计算为真。predicate的计算结果将被解释为布尔值。可以提供超时,给出最长的等待时间。

函数wait_for支持参数timeout。

  • 如果predicate计算结果是True,则wait_for返回True;
  • 如果调用函数被阻塞时间超过timeout,则wait_for返回False。

notify/notify_all

词法:
notify(n=1)
notify_all()

notify在默认情况下,唤醒一个等待该条件的进程。如果调用此函数时,调用进程尚未获取锁,则会产生RuntimeError。

该方法最多唤醒n个等待条件变量的进程;如果没有进程在等待,则是空操作。

notify_all唤醒所有等待此条件变量的进程。如果调用此函数时,调用进程未获取锁,则会产生RuntimeError异常。

实列代码

from multiprocessing import *
import numpy as np
from multiprocessing import shared_memory
import random

dim = 50

def an_item_is_available(np_array):
    for i in range (5):
        if np_array[i] == 0:
            return False
    return True;

def make_an_item_available(num, np_array):
    np_array[num] = random.randrange(50, 200)
  
def proc_0(cv, shm):
    print("Process: {0}".format(current_process().name))

    shm = shared_memory.SharedMemory(name=shm);
    np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)

    with cv:
        while not an_item_is_available(np_array):
            cv.wait()
    for i in range(5):
        print("{0} ".format(np_array[i]), end="")

def proc_1(num, cv, shm):
    print("Process: {1} num: {0}".format(num, current_process().name))

    shm = shared_memory.SharedMemory(name=shm);
    np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)

    with cv:
        make_an_item_available(num, np_array)
        cv.notify()
    
def main():
    global dim
    
    lock = Lock()
    cv = Condition(lock)
    a1 = np.ones(shape=(dim, ), dtype=np.int32)
    shm = shared_memory.SharedMemory(create=True, size=a1.nbytes);
    
    kwkeys = {"cv":cv, "shm":shm.name}
    p1 = Process(target=proc_0, kwargs=kwkeys)
    p_list = []
    for i in range(5):
        p2 = Process(target=proc_1, args=(i,), kwargs=kwkeys)
        p_list.append(p2)
    p1.start();
    for p in p_list:
        p.start()
    p1.join()
    for p in p_list:
        p.join()
    
if __name__=="__main__":
    freeze_support()
    main();

显示器输出

Process: Process-1
Process: Process-3 num: 1
Process: Process-2 num: 0
Process: Process-5 num: 3
Process: Process-6 num: 4
Process: Process-4 num: 2
124 143 138 141 159

Event

这是进程间通信最简单的机制之一:一个进程发出事件信号,其他进程等待该事件。

事件对象管理一个内部标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法将其重置为 false。 wait() 方法会阻塞,直到标志为 true。

构造器

词法:threading.Event()
实现事件对象的类。事件管理一个标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法,将其重置为 false。 wait() 方法会阻塞,直到标志为 true。该标志最初是false。

set

词法:set()
将内部标志设置为 true。所有等待它变为 true 的线程都会被唤醒。一旦标志为 true,调用 wait() 的线程将根本不会阻塞。

clear

词法:clear()

将内部标志重置为 false。随后,调用 wait() 的线程将阻塞,直到调用 set() 再次将内部标志设置为 true。

wait

词法:wait(timeout=None)

只要内部标志为假

  • timeout是None,或者未指定,调用进程被阻塞。
  • 并且timeout正值,但阻塞时间尚未达到timeout,调用进程被阻塞;但当阻塞时间达到timeout,超时发生,则该函数返回,结束阻塞。

返回值代表这个阻塞函数返回的原因;

  • 如果由于内部标志设置为 true,而返回,则返回 True。
  • 如果该有timeout参数,且非None,内部标志在给定等待时间内,未变为 true,则返回 False。

当timeout参数存在,且非 None 时,它​​应该是一个浮点数,指定操作的超时时间,时间单位是秒。

应用实例

这个例子模拟两个初始化进程,但进程init_1必须等待init_0完成一些初始化工作之后,才能开始。

from time import sleep
from random import random

from multiprocessing import *
 
def init_0(event):
    print('Process: {0}(init_0) start...'.format(current_process().name))
    value = random()
    sleep(value)
    print('Process: {0} got {1}'.format(current_process().name, value))
    event.set()
    print('Process: {0} init_1 can start ...'.format(current_process().name))    
    value1 = random()
    sleep(value1)    

def init_1(event):
    print('Process: {0}(init_1) start...'.format(current_process().name))
    event.wait()
    print('Process: {0} init_0 done'.format(current_process().name))
    value = random()
    sleep(value)
 
def main():
    event = Event()
    ctx = get_context('spawn')

    kwargs = {"event" : event}
    p1 = ctx.Process(target=init_0, kwargs=kwargs)
    p2 = ctx.Process(target=init_1, kwargs=kwargs)

    p1.start();
    p2.start();

    p1.join();
    p2.join();

if __name__=="__main__":
    freeze_support()
    main();

显示器输出

Process: SpawnProcess-1(init_0) start...
Process: SpawnProcess-2(init_1) start...
Process: SpawnProcess-1 got 0.903477834169278
Process: SpawnProcess-1 init_1 can start ...
Process: SpawnProcess-2 init_0 done

Lock

这个类实现原语Lock对象。

Lock处于两种状态之一:“锁定”或“解锁”。它有两个基本方法:acquire() 和release()。

  • 当解锁状态时,acquire()将状态更改为锁定,并立即返回。
  • 当锁定状态时,acquire()阻塞调用进程,直到另一个进程调用release(),将其更改为解锁状态,然后,acquire()调用将其重置为锁定状态,并返回。
  • release()方法只能在锁定状态下调用;它将状态更改为解锁,并立即返回。如果尝试释放未锁定的锁,则会产生运行时错误。

当多个线程阻塞在 acquire() 中等待状态转为解锁状态时,当调用 release() 将状态重置为解锁时,只有一个线程继续执行;哪一个等待线程继续进行是未定义的,并且可能因实现而异。

构造器

词法:multiprocessing.Lock()

实现原始锁对象的类。一旦线程获取了锁,后续获取它的尝试就会被阻塞,直到它被释放;任何线程都可以释放它。

请注意,Lock 实际上是一个工厂函数,它返回平台支持的具体 Lock 类的最有效版本的实例。

acquire

词法:acquire(blocking=True, timeout=-1)
获取锁,阻塞或非阻塞。

当将阻塞参数设置为 True(默认值)进行调用时,将阻塞直到锁解锁,然后将其设置为锁定并返回 True。

当在阻塞参数设置为 False 的情况下调用时,不阻塞。如果将阻塞设置为 True 的调用会阻塞,则立即返回 False;否则,将锁设置为锁定并返回 True。

当在浮点超时参数设置为正值的情况下调用时,只要无法获取锁,就会阻塞最多超时指定的秒数。超时参数 -1 指定无限等待。当blocking为False时,禁止指定超时时间。

如果成功获取锁,则返回值为 True,否则返回值为 False(例如,如果超时已到)。

release

词法:release()
释放锁。这可以从任何线程调用,而不仅仅是已获取锁的线程。

当锁被锁定时,将其重置为解锁,然后返回。如果任何其他线程在等待锁解锁时被阻塞,则只允许其中一个线程继续进行。

当在未锁定的锁上调用时,会引发 RuntimeError。

没有返回值。

实列代码

下列实列代码在两个进程之间共享存储器,使用Lock,实现存储器共享

from multiprocessing import *
from multiprocessing import shared_memory

dim = 5
           
def proc_0(lock, shmem_name, val):
    global dim
    print('Process: {0}(init_0) start...'.format(current_process().name))

    existing_shm = shared_memory.SharedMemory(name=shmem_name)
    np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)
    
    lock.acquire()
    np_array[:] = np_array[0] + val   
    for x in np_array:
        print(x)
    lock.release()

def proc_1(lock, shmem_name, val):
    global dim
    print('Process: {0}(init_1) start...'.format(current_process().name))

    existing_shm = shared_memory.SharedMemory(name=shmem_name)
    np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)

    lock.acquire()
    np_array[:] = np_array[0] + val
    for x in np_array:
        print(x)
    lock.release()
 
def main():
    global dim
    
    lock = Lock()
    a = np.ones(shape=(dim, dim,), dtype=np.int32)
    shm_a = shared_memory.SharedMemory(create=True, size=a.nbytes)
    np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=shm_a.buf)
    for x in np_array:
        print(x)

    ctx = get_context('spawn')
    p1 = ctx.Process(target=proc_0, args=(lock, shm_a.name, 10))   
    p2 = ctx.Process(target=proc_1, args=(lock, shm_a.name, 5))

    p1.start();
    p2.start();
    p1.join();
    p2.join();
    
if __name__=="__main__":
    freeze_support()
    main();

显示屏输出

[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
Process: SpawnProcess-1(init_0) start...
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
Process: SpawnProcess-2(init_1) start...
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2149833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++ | 二叉搜索树

前言 本篇博客讲解c中的继承 💓 个人主页:普通young man-CSDN博客 ⏩ 文章专栏:C_普通young man的博客-CSDN博客 ⏩ 本人giee: 普通小青年 (pu-tong-young-man) - Gitee.com 若有问题 评论区见📝 🎉欢迎大家点赞&…

【大模型】初识大模型(非常详细)零基础入门到精通,收藏这一篇就够了_大模型入门

大模型的定义 大模型是指具有数千万甚至数亿参数的深度学习模型。近年来,随着计算机技术和大数据的快速发展,深度学习在各个领域取得了显著的成果,如自然语言处理,图片生成,工业数字化等。为了提高模型的性能&#xf…

MeterSphere的一次越权审计

1 MeterSphere简介 MeterSphere是一个一站式开源持续测试平台,它提供了测试跟踪、接口测试、UI测试和性能测试等功能。它全面兼容JMeter、Selenium等主流开源标准,助力开发和测试团队实现自动化测试,加速软件的高质量交付。MeterSphere 的特点…

Java 微服务框架 HP-SOA v1.1.4

HP-SOA 功能完备,简单易用,高度可扩展的Java微服务框架。 项目主页 : https://www.oschina.net/p/hp-soa下载地址 : https://github.com/ldcsaa/hp-soa开发文档 : https://gitee.com/ldcsaa/hp-soa/blob/master/README.mdQQ Group: 44636872, 66390394…

解决selenium爬虫被浏览器检测问题

文章目录 专栏导读1.问题解析2.代码解析(Edge/Chrome通用)2.1 设置Edge浏览器选项:2.2 尝试启用后台模式2.3 排除启用自动化模式的标志2.4 禁用自动化扩展2.5 设置用户代理2.6 实例化浏览器驱动对象并应用配置2.7 在页面加载时执行JavaScript代码 3.完整代码(可直接…

[ IDE ] SEGGER Embedded Studio for RISC-V

一、FILE 二、Edit 三、View 四、Search 五、Navigate 六、Project 七、Build 7.1 编译 先选择一个目标类型,再选择编译。 八、Debug 九、Target 十、Tools 10.1 自定义快捷键 点击菜单项,通过Tools –> Options –> Keyboard,实现自…

初识Linux · 环境变量

目录 前言: 命令行参数 环境变量 直接看现象 更多的环境变量 尝试理解环境变量 前言: 今天介绍的是一个较为陌生的名词,环境变量,在学习环境变量之前,我们需要一定的预备知识,这个预备知识是命令行参…

HarmonyOS学习(十三)——数据管理(二) 关系型数据库

文章目录 1、基本概念2、运行机制3、默认配置与限制4、接口说明5、实战:开发“账本”5.1、创建RdbStore5.2、创建数据库5.3、增加数据5.4、删除数据5.5、修改数据5.6、查询数据5.7、备份数据库5.8、恢复数据库5.9、删除数据库 官方文档地址: 通过关系型…

堆的向下调整算法和TOPK问题

目录 1.什么是堆? 1.1 向下调整建堆的时间复杂度计算 1.2 堆的结构体设计 2.堆的功能实现: 2.1 堆的插入: 2.2 堆的删除: 2.3 堆排序: 2.4 向下调整建堆: 2.5 TOPK问题: 2.6 向上调整算…

对接金蝶云星空调用即时库存信息查询API(附JAVA实现)

文章目录 前言准备工作获取第三方授权权限与授权配置信息集成金蝶云SDK调用实现备注前言 对于有自己商品信息管理后台并且使用金蝶ERP系统管理物料的商家来说,将金蝶上物料的库存信息同步到管理后台就可以不用去金蝶上确认库存了,可以大大简化管理后台的库存变更工作,这篇文…

Call OpenAI API with Python requests is missing a model parameter

题意:使用 Python requests 调用 OpenAI API 时缺少 model 参数。 问题背景: Im trying to call OpenAI API from Python. I know they have their own openai package, but I want to use a generic solution. I chose the requests package for its f…

通义千问重磅开源Qwen2.5,性能超越Llama

Qwen2.5 新闻 9月19日云栖大会,阿里云CTO周靖人发布通义千问新一代开源模型Qwen2.5,旗舰模型Qwen2.5-72B性能超越Llama 405B,再登全球开源大模型王座。Qwen2.5全系列涵盖多个尺寸的大语言模型、多模态模型、数学模型和代码模型,每…

TransUNet: 通过Transformer的视角重新思考U-Net架构在医学图像分割中的设计|文献速递-Transformer架构在医学影像分析中的应用

Title 题目 TransUNet: Rethinking the U-Net architecture design for medical imagesegmentation through the lens of transformers TransUNet: 通过Transformer的视角重新思考U-Net架构在医学图像分割中的设计 01 文献速递介绍 卷积神经网络(CNNs&#xff…

计算机毕业设计之:教学平台微信小程序(

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

生信初学者教程(四):软件

文章目录 RRstudioLinux系统其他软件本书是使用R语言编写的教程,用户需要下载R和RStudio软件用于进行分析。 版权归生信学习者所有,禁止商业和盗版使用,侵权必究 R R语言是一种免费的统计计算和图形化编程语言,是一种用于数据分析和统计建模的强大工具。它具有丰富的统计…

CSP-CCF★201912-2回收站选址★

一、问题描述 二、解答 代码&#xff1a; #include<iostream> #include<map> using namespace std; struct rubbish{int x;int y; }rub[1000]; int n; void input(){cin>>n;for(int i0;i<n;i){cin>>rub[i].x>>rub[i].y;} } bool has(int p,…

【machine learning-八-可视化loss funciton】

可视化lossfunction loss funciton可视化损失函数等高图 loss funciton 上一节讲过损失函数&#xff0c;也就是代价函数&#xff0c;它是衡量模型训练好坏的指标&#xff0c;对于线性回归来说&#xff0c;模型、参数、损失函数以及目标如下&#xff1a;、 损失函数的目标当然…

什么品牌超声波清洗机质量好?四大绝佳超声波清洗机品牌推荐!

在快节奏的现代生活中&#xff0c;个人物品的清洁卫生显得至关重要。眼镜、珠宝饰品、手表乃至日常餐厨用具&#xff0c;这些频繁接触的物品极易累积污渍与细菌。拿眼镜为例&#xff0c;缺乏定期清洁会让油渍与尘埃积累&#xff0c;进而成为细菌的温床&#xff0c;靠近眼睛使用…

SCDN是服务器吗?SCDN防御服务器有什么特点?

SCDN确实具有一定的防DDoS攻击能力&#xff0c;SCDN防御服务器有什么特点&#xff1f;高防SCDN通过结合内容分发网络&#xff08;CDN&#xff09;和分布式拒绝服务&#xff08;DDoS&#xff09;防护技术&#xff0c;提供了更全面的网络保护措施。在充满网络攻击的互联网时代&am…

dev c++输出中文乱码解决 printf乱码解决

把编码换成utf8就行 打开eiditor options