【python高级用法】进程

news2025/1/18 20:24:23

一个简单的进程

# -*- coding: utf-8 -*-

import multiprocessing

def foo(i):
    print ('called function in process: %s' %i)
    return

if __name__ == '__main__':
    Process_jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i,))
        Process_jobs.append(p)
        p.start()
        p.join()

按照本节前面提到的步骤,创建进程对象首先需要引入multiprocessing模块:

import multiprocessing

然后,我们在主程序中创建进程对象:

p = multiprocessing.Process(target=foo, args=(i,))

最后,我们调用 start() 方法启动:

p.start()

进程对象的时候需要分配一个函数,作为进程的执行任务,本例中,这个函数是 foo() 。我们可以用元组的形式给函数传递一些参数。最后,使用进程对象调用 join() 方法。

如果没有 join() ,主进程退出之后子进程会留在idle中,你必须手动杀死它们。

 进程命名

# 命名一个进程
import multiprocessing
import time

def foo():
    name = multiprocessing.current_process().name
    print("Starting %s \n" % name)
    time.sleep(3)
    print("Exiting %s \n" % name)

if __name__ == '__main__':
    process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    process_with_name.daemon = True  # 注意原代码有这一行,但是译者发现删掉这一行才能得到正确输出
    process_with_default_name = multiprocessing.Process(target=foo)
    process_with_name.start()
    process_with_default_name.start()

这个过程和命名线程很像。命名进程需要为进程对象提供 name 参数:

process_with_name = multiprocessing.Process(name='foo_process', target=foo)

在本例子中,进程的名字就是 foo_function 。如果子进程需要知道父进程的名字,可以使用以下声明:

name = multiprocessing.current_process().name

然后就能看见父进程的名字。

 后台使用进程

 如果需要处理比较巨大的任务,又不需要人为干预,将其作为后台进程执行是个非常常用的编程模型。此进程又可以和其他进程并发执行。通过Python的multiprocessing模块的后台进程选项,我们可以让进程在后台运行。

import multiprocessing
import time

def foo():
    name = multiprocessing.current_process().name
    print("Starting %s " % name)
    time.sleep(3)
    print("Exiting %s " % name)

if __name__ == '__main__':
    background_process = multiprocessing.Process(name='background_process', target=foo)
    background_process.daemon = True
    NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
    NO_background_process.daemon = False
    background_process.start()
    NO_background_process.start()

 

为了在后台运行进程,我们设置 daemon 参数为 True

background_process.daemon = True

在非后台运行的进程会看到一个输出,后台运行的没有输出,后台运行进程在主进程结束之后会自动结束。

 进程杀死

我们可以使用 terminate() 方法立即杀死一个进程。另外,我们可以使用 is_alive() 方法来判断一个进程是否还存活。

我们创建了一个线程,然后用 is_alive() 方法监控它的声明周期。然后通过调用 terminate() 方法结束进程。

最后,我们通过读进程的 ExitCode 状态码(status code)验证进程已经结束, ExitCode 可能的值如下:

  • == 0: 没有错误正常退出
  • > 0: 进程有错误,并以此状态码退出
  • < 0: 进程被 -1 * 的信号杀死并以此作为 ExitCode 退出

在我们的例子中,输出的 ExitCode 是 -15 。负数表示子进程被数字为15的信号杀死。

自定义一个进程子类 

 

实现一个自定义的进程子类,需要以下三步:

  • 定义 Process 的子类
  • 覆盖 __init__(self [,args]) 方法来添加额外的参数
  • 覆盖 run(self, [.args]) 方法来实现 Process 启动的时候执行的任务

创建 Porcess 子类之后,你可以创建它的实例并通过 start() 方法启动它,启动之后会运行 run() 方法。

# -*- coding: utf-8 -*-
# 自定义子类进程
import multiprocessing

class MyProcess(multiprocessing.Process):
        def run(self):
                print ('called run method in process: %s' % self.name)
                return

if __name__ == '__main__':
        jobs = []
        for i in range(5):
                p = MyProcess()
                jobs.append(p)
                p.start()
                p.join()

 

每一个继承了 Process 并重写了 run() 方法的子类都代表一个进程。此方法是进程的入口:

class MyProcess(multiprocessing.Process):
    def run(self):
        print ('called run method in process: %s' % self.name)
        return

在主程序中,我们创建了一些 MyProcess() 的子类。当 start() 方法被调用的时候进程开始执行:

p = MyProcess()
p.start()

join() 命令可以让主进程等待其他进程结束最后退出。

进程间交换对象

并行应用常常需要在进程之间交换数据。Multiprocessing库有两个Communication Channel可以交换对象:队列(queue)和管道(pipe)。

我们可以通过队列数据结构来共享对象。

Queue 返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable 模块序列化对象)都可以通过它进行交换。

 

import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Process Producer : item %d appended to queue %s" % (item, self.name))
            time.sleep(1)
            print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("the queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
                time.sleep(1)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

我们使用 multiprocessing 类在主程序中创建了 Queue 的实例:

if __name__ == '__main__':
    queue = multiprocessing.Queue()

然后我们创建了两个进程,生产者和消费者, Queue 对象作为一个属性。

process_producer = Producer(queue)
process_consumer = Consumer(queue)

生产者类负责使用 put() 方法放入10个item:

for i in range(10):
    item = random.randint(0, 256)
    self.queue.put(item)

消费者进程负责使用 get() 方法从队列中移除item,并且确认队列是否为空,如果为空,就执行 break 跳出 while 循环:

def run(self):
    while True:
        if self.queue.empty():
            print("the queue is empty")
            break
        else:
            time.sleep(2)
            item = self.queue.get()
            print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
            time.sleep(1)

 

队列还有一个 JoinableQueue 子类,它有以下两个额外的方法:

  • task_done(): 此方法意味着之前入队的一个任务已经完成,比如, get() 方法从队列取回item之后调用。所以此方法只能被队列的消费者调用。
  • join(): 此方法将进程阻塞,直到队列中的item全部被取出并执行。

( Microndgt 注:因为使用队列进行通信是一个单向的,不确定的过程,所以你不知道什么时候队列的元素被取出来了,所以使用task_done来表示队列里的一个任务已经完成。

这个方法一般和join一起使用,当队列的所有任务都处理之后,也就是说put到队列的每个任务都调用了task_done方法后,join才会完成阻塞。)

 进程间同步

 

多个进程可以协同工作来完成一项任务。通常需要共享数据。所以在多进程之间保持数据的一致性就很重要了。需要共享数据协同的进程必须以适当的策略来读写数据。相关的同步原语和线程的库很类似。

进程的同步原语如下:

  • Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法, acquire() 和 release() ,来控制共享数据的读写权限。
  • Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。 Event 对象有两个方法, set() 和 clear() ,来管理自己内部的变量。
  • Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法: wait() 用来等待进程, notify_all() 用来通知所有等待此条件的进程。
  • Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
  • Rlock: 递归锁对象。其用途和方法同 Threading 模块一样。
  • Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。

下面的代码展示了如何使用 barrier() 函数来同步两个进程。我们有4个进程,进程1和进程2由barrier语句管理,进程3和进程4没有同步策略。

 

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

if __name__ == '__main__':
    synchronizer = Barrier(2)
    serializer = Lock()
    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

 下面这幅图表示了barrier如何同步两个进程:

 进程池

 

多进程库提供了 Pool 类来实现简单的多进程任务。 Pool 类有以下方法:

  • apply(): 直到得到结果之前一直阻塞。
  • apply_async(): 这是 apply() 方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。
  • map(): 这是内置的 map() 函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。
  • map_async(): 这是 map() 方法的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有result的进程将被阻塞。

 下面的例子展示了如果通过进程池来执行一个并行应用。我们创建了有4个进程的进程池,然后使用 map() 方法进行一个简单的计算。

 

import multiprocessing

def function_square(data):
    result = data*data
    return result

if __name__ == '__main__':
    inputs = list(range(100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)
    pool.close()
    pool.join()
    print ('Pool    :', pool_outputs)

进程和线程比较

  • 地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
  • 通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
  • 调度和切换:线程上下文切换比进程上下文切换要快得多。
  • 在多线程OS中,进程不是一个可执行的实体。

还可以类比为火车和车厢:

  • 线程在进程下行进(单纯的车厢无法运行)
  • 一个进程可以包含多个线程(一辆火车可以有多个车厢)
  • 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
  • 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
  • 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
  • 进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到该趟火车的所有车厢)
  • 进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
  • 进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-”互斥锁(mutex)”
  • 进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量(semaphore)”

参考链接:

第三章 基于进程的并行 — python-parallel-programming-cookbook-cn 1.0 文档

Python多线程、多进程最全整理 - 知乎 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1357854.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

展望2024: 中国AI算力能否引爆高性能计算和大模型训练的新革命?

★算力&#xff1b;算法&#xff1b;人工智能&#xff1b;高性能计算&#xff1b;高性能&#xff1b;高互联&#xff1b;生成式人工智能&#xff1b;StableDiffusion&#xff1b;ChatGPT&#xff1b;CoPilot&#xff1b;文本创建&#xff1b;图像生成&#xff1b;代码编写&…

Transformer从菜鸟到新手(三)

引言 这是Transformer的第三篇文章&#xff0c;上篇文章中我们了解了多头注意力和位置编码&#xff0c;本文我们继续了解Transformer中剩下的其他组件。 层归一化 层归一化想要解决一个问题&#xff0c;这个问题在Batch Normalization的论文中有详细的描述&#xff0c;即深层…

Windows找不到文件‘chrome‘,请确定文件名是否正确后,再试一次。

本文主要记录遇到vscode运行HTML文件提示&#xff1a; Windows找不到文件‘chrome‘&#xff0c;请确定文件名是否正确后&#xff0c;再试一次。问题的解决办法。 目录 一、打开设置 二 、搜索Live Server Config &#xff08;1&#xff09;安装Live Server插件 &#xff0…

弧垂观测手段再升级!输电线路导线弧垂检测装置的应用_深圳鼎信

输电线路导线弧垂是指在输电线路中导线的水平位置与塔杆之间的垂直距离。导线的弧垂是确定导线张力、塔杆高度等参数的重要依据。通过测量弧垂及时调整弧垂大小对保证输电线路的安全运行具有重要作用。鼎信将介绍两种测量弧垂的方法&#xff0c;一起来学习一下吧&#xff01; …

TikTok明星经济:短视频背后的网络名人

在当今数字时代&#xff0c;社交媒体平台的崛起为许多人带来了独特的机遇&#xff0c;其中TikTok作为一款短视频应用&#xff0c;正成为网络名人崛起的孵化器。随着TikTok用户量的不断增加&#xff0c;平台上涌现出越来越多的明星&#xff0c;这些明星不仅仅在虚拟世界中受到欢…

微信小程序分销商城制作源码系统:全部搭建好直接使用 带完整的安装代码包以及搭建教程

随着移动设备的普及和互联网技术的发展&#xff0c;电子商务逐渐成为人们购物的主要方式之一。而微信作为中国最大的社交平台之一&#xff0c;拥有庞大的用户基数和成熟的生态系统。因此&#xff0c;基于微信小程序开发分销商城具有巨大的市场潜力和商业价值。 以下是部分代码…

超级详细的 https 中间人攻击流程。

客户端发送 https 请求中间人截获 https 请求&#xff0c;然后在转发给服务端 中间人可以是抓包工具中间人可以通过伪造证书的方式截获请求服务端接收到请求【看起来是客户端发的&#xff0c;实际上已经经过中间人转发了】服务端以为是一个安全的请求&#xff0c;向客户端发送数…

计算机研究生论文检索方法汇总

计算机研究生论文检索方法汇总 作为一名优质(冤种)计算机在读研究生&#xff0c;检索论文是一项不可或缺的技能之一。 一、paperwithcode paperswithcode是一个免费开放的资源平台&#xff0c;提供了机器学习领域的论文、代码、数据集、方法和评估表。在这里我们可以检索不同…

对图片进行数据增强(基于pytorch)

背景 在进行机器学习的任务中&#xff0c;我们的训练数据往往是有限的&#xff0c;在有限的数据集上获得较好的模型训练结果&#xff0c;我们不仅要在模型结构上下功夫&#xff0c;另一方面也需要对数据集进行数据增强 图片数据增强 图像数据增强是一种在训练机器学习和深度学…

设计模式② :交给子类

文章目录 一、前言二、Template Method 模式1. 介绍2. 应用3. 总结 三、Factory Method 模式1. 介绍2. 应用3. 总结 参考内容 一、前言 有时候不想动脑子&#xff0c;就懒得看源码又不像浪费时间所以会看看书&#xff0c;但是又记不住&#xff0c;所以决定开始写"抄书&qu…

脑电范式学习(一):Psychopy安装

脑电范式学习&#xff08;一&#xff09;&#xff1a;Psychopy安装 1 引言2 Psychopy软件3 安装教程4 花活儿5 总结 1 引言 可能有人会疑惑&#xff1a;为什么要去学Psychopy&#xff1f;Psychopy有什么好的&#xff1f; 首先&#xff0c;要告诉大家这么一个情况&#xff1a;现…

使用 Swagger 导入 Postman: 最佳实践与步骤解析

Swagger和 Postman 都是常用的 API 测试工具&#xff0c;都有各自的优势。为了结合两者的优点&#xff0c;我们可以考虑将 Swagger 中的 API 定义导入到 Postman 中去&#xff0c;这样就可以利用 Postman 更强大的测试功能来测试 Swagger 定义的接口。 下面将以 Swagger Petst…

Spark调优解析-spark调优基本原则1(七)

1调优基本原则 1.1基本概念和原则 首先&#xff0c;要搞清楚Spark的几个基本概念和原则&#xff0c;否则系统的性能调优无从谈起&#xff1a; 每一台host上面可以并行N个worker&#xff0c;每一个worker下面可以并行M个executor&#xff0c;task们会被分配到executor上面去执…

yolov8实战第五天——yolov8+ffmpg实时视频流检测并进行实时推流——(推流,保姆教学)

yolov8实战第一天——yolov8部署并训练自己的数据集&#xff08;保姆式教程&#xff09;_yolov8训练自己的数据集-CSDN博客 yolov8实战第三天——yolov8TensorRT部署&#xff08;python推理&#xff09;&#xff08;保姆教学&#xff09;-CSDN博客 今天&#xff0c;我们继续y…

大数据开发个人简历范本(2024最新版-附模板)

大数据开发工程师个人简历范本> 男 22 本科 张三 计算机科学与技术 1234567890 个人概述 具备深入的Hadoop大数据运维工程师背景&#xff0c;熟悉相关技术和工具 具备良好的团队合作能力&#xff0c;善于沟通和协作 具有快速学习新知识和解决问题的能力 对于数据科学…

模型 回弹效应

系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。行动反弹&#xff0c;效果加倍。 1 回弹效应的应用 1.1 纽约市的经济复苏-经济发展中的回弹效应 在20世纪70年代和80年代&#xff0c;纽约市面临了经济衰退、高犯罪率和城市衰败等问题。这导…

Redis 之父锐评 LLM 编程:全知全能 Stupid|一周IT资讯

阿里通义千问上线“科目三”&#xff0c;刘皇叔、奥特曼、马斯克通通没逃过 在刚到的2024年&#xff0c;阿里通义千问 APP 上线图片生成舞蹈功能&#xff0c;用户只需输入一张图片&#xff0c;就能生成爆款舞蹈图片。 不管是“科目三”&#xff0c;还是鬼步舞、兔子舞&#x…

晶振噪声来源及有效降低其影响的方法

低噪声晶振主要减少振荡器内部噪声对输出信号的影响&#xff0c;以获得短期频率稳定性的晶体振荡器。噪声会引起输出信号频率的随机起伏&#xff1a;起伏小&#xff0c;稳定度越高。 晶振噪声的由来 晶振的短期频率稳定度由噪声引起导致的频率不稳定。其中&#xff0c;电噪声…

使用Apache POI将数据写入Excel文件

首先导入依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.16</version> </dependency> <dependency><groupId>org.apache.poi</groupId><artifactId>po…

基于Java SSM框架实现新闻推送系统项目【项目源码】计算机毕业设计

基于java的SSM框架实现新闻推送系统演示 SSM框架 当今流行的“SSM组合框架”是Spring SpringMVC MyBatis的缩写&#xff0c;受到很多的追捧&#xff0c;“组合SSM框架”是强强联手、各司其职、协调互补的团队精神。web项目的框架&#xff0c;通常更简单的数据源。Spring属于…