30. 多进程编程

news2024/12/22 22:56:33

一、什么是进程

  进程(process)则是一个执行中的程序。每个进程都拥有自己的地址空间、内存、数据栈以及其它用于跟踪执行的辅助数据。操作系统管理其上所有进程的执行,并为这些进程合理分配时间。进程也可以通过派生新的进程来执行其它任务,不过因为每个新进程也都拥有自己的内存和数据栈等,所以只能采用进程间通信的方式共享数据;

二、进程的生命周期

  一个完整进程的生命周期中通常要经过如下的五种状态:

  • 创建:当一个 Process 类或及其子类的对象被声明并创建时,新生的进程就处于创建状态;
  • 就绪:处于新建的进程被 start() 后,将进入进程队列等待 CPU 时间片,此时它已具备了运行的条件,只是没分配到 CPU 资源;
  • 运行:当就绪的进程被调度并获得 CPU 资源时,便进入运行状态,run() 方法定义了进程的操作和功能;
  • 阻塞:在某种特殊情况下,被人为挂起或执行输入输出操作时,让出 CPU 并临时中止自己的执行,进入阻塞状态;
  • 退出:进程完成了它的全部或进程被提前强制性中止或出现异常导致结束;
    在这里插入图片描述

三、进程的创建

【1】、使用 multiprocessing 模块

  在 Python 中,我们可以使用 multiprocessing模块中的 Process 类创建一个对象。这个对象表示一个进程,但它不会真正创建出来一个进程。而当我们调用 start() 方法时,才会真正创建一个新的子进程,并开始执行的。

  至于这个进程去执行哪里的代码,要看在用 Process 创建对象的时候给 target 传递的是哪个函数的引用,即将来进程就会执行 target 参数指向的那个函数。target 指向的那个函数代码执行完之后,意味着这个子进程结束;

创建 Thread 对象时,target 参数指明进程将来去哪里执行代码,而 args 参数执行进程去执行代码时所携带的数据,并且 args 参数是一个元组。如果我们想给指定的参数传递数据,我们可以给 kwargs 参数传递一个字典。

import time

from multiprocessing import Process

def task(name):
    print(f"{name}开始执行")
    time.sleep(3)
    print(f"{name}执行结束")

"""
Window操作系统下,创建系统一定要在main内创建
因为Window系统下创建进程类似于模块的导入的方式,会从上往下依次执行代码

Linux中则是直接将代码完成拷贝一份
"""
if __name__ == "__main__":
    # 1、实例化对象
    p1 = Process(target=task, args=("进程1",))
    p2 = Process(target=task, kwargs={"name": "进程2"})

    # 2、开启进程
    p1.start()                # 告诉操作系统帮你创建一个进程
    p2.start()
  
    print("主进程执行")

【2】、自定义类继承 Process

我们可以自定义一个类继承 Process,然后一定要实现它的 run() 方法,即定义一个 run() 方法,并且在方法中实现要执行的代码。当我们调用自己编写的类创建出来的对象的 start() 方法时,会创建新的进程,并且进程会自动调用 run() 方法开始执行。

如果除了 run() 方法之外还定义了很多其它的方法,那么这些方法需要在 run() 方法中自己去第调用,进程它不会自动调用。

import time

from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name}开始执行")
        time.sleep(1)
        print(f"{self.name}执行结束")

if __name__ == "__main__":
    p= MyProcess("进程1")
    p.start()
    print("主进程执行")

创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去;一个进程对应在内存中就是一块独立的内存空间,多个进程对应在内存中就是多块独立的内存空间。默认情况下,进程与进程之间时无法直接交互的。如果想交互,可以借助第三方模块。

四、进程的常用属性和方法

multiprocessing.process.name                # 当前进程实例别名,默认为Process-N,N从1开始递增的整数
multiprocessing.process.pid                 # 当前进程实例的PID值
multiprocessing.process.start()             # 启动进程实例
multiprocessing.process.run()               # 如果没有给定target参数,对这个对象调用start()方法时,就会执行对象中的run()方法
multiprocessing.process.is_alive()          # 判断进程实例是否还在执行
multiprocessing.process.join([timeout])     # 是否等待进程实际执行结束,或等得多少秒
multiprocessing.process.terminate()         # 不管任务是否完成,立即终止
import time

from multiprocessing import Process, current_process

money = 100

def task(n):
    print(f"{current_process().name }- {current_process().pid}")
    print(f"{current_process().name}开始执行")
    global money
    money *= n
    time.sleep(n)
    print(f"{current_process().name}的money: {money}")
    print(f"{current_process().name}执行结束")

"""
Window操作系统下,创建系统一定要在main内创建
因为Window系统下创建进程类似于模块的导入的方式,会从上往下依次执行代码

Linux中则是直接将代码完成拷贝一份
"""
if __name__ == "__main__":
    ## 1、实例化对象
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))

    start_time = time.time()

    # 2、开启进程
    p1.start()                      # 告诉操作系统帮你创建一个进程
    p2.start()
    p3.start()

    p2.terminate()                  # 告诉操作系统,终止进程,但是需要一定的时间
    print(p2.is_alive())            # 获取进程状态

    # 主进程等待子进程运行结束之后在继续往后执行
    p3.join()

    print(f"{current_process().name} {time.time() - start_time}")
    print(f"{current_process().name} money: {money}")

五、僵尸进程与孤儿进程

  当你开设子进程之后,该进程死后不会立即释放占用的进程号。这是因为要让父进程能够查看它开设的子进程的一些基本信息,例如:占用的 pid 号、运行时间等;这种的进程称为 僵尸进程。所有的进程都会步入僵尸进程。

  孤儿进程 是指子进程存活,父进程意外死亡的进程。操作系统会开设一个特殊的空间专门管理孤儿进程回收相关资源。

六、守护进程

  被守护的进程结束之后,守护进程也会立即跟着结束。如果我们想把一个进程设置为守护进程,那么需要在调用 start() 方法前把 daemon 属性设置为 True。

import time

from multiprocessing import Process

def task(name, n):
    print(f"{name}开始执行")
    time.sleep(n)
    print(f"{name}执行结束")

"""
Window操作系统下,创建系统一定要在main内创建
因为Window系统下创建进程类似于模块的导入的方式,会从上往下依次执行代码

Linux中则是直接将代码完成拷贝一份
"""
if __name__ == "__main__":
    p1 = Process(target=task, args=("守护进程1", 3), daemon=True)
    p2 = Process(target=task, args=("守护进程2", 3))                 # 1、实例化对象
    p2.daemon = True                                                # 2、将进程设置为守护进程

    p1.start()                                                      # 3、开启进程,告诉操作系统帮你创建一个进程
    p2.start()

    time.sleep(1)

    print("主进程执行")

七、进程互斥锁

  多个进程操作同一份数据时,可能会出现数据错乱的问题。针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但保证了数据的安全。

import time
import json

from multiprocessing import Process,Lock

def buy(name, mutex):
    # 加锁处理
    mutex.acquire()             # 抢锁

    # 先查剩余的票数
    with open("data.txt","r",encoding="utf-8") as f:
        ticket_dict = json.load(f)

    # 模拟网络延迟
    time.sleep(1)

    # 判断当前是否有票
    if ticket_dict.get("ticket_num") > 0:
        # 修改数据买票
        ticket_dict["ticket_num"] -= 1
        # 写入数据
        with open("data.txt","w",encoding="utf-8") as f:
            json.dump(ticket_dict,f)

        print(f"用户{name}买票成功")
    else:
        print(f"用户{name}买票失败")

    mutex.release()             # 释放锁 

if __name__ == "__main__":
    # 在主进程中生成一把锁,让所有的进程程抢,谁先抢到谁先买票
    mutex = Lock()

    p1 = Process(target=buy, args=("Sakura",mutex))     # 1、实例化对象
    p1.start()                                          # 2、开启进程,告诉操作系统帮你创建一个进程
  
    p2 = Process(target=buy, args=("Mikoto",mutex))
    p2.start()

    p3 = Process(target=buy, args=("Shana",mutex))
    p3.start()

  【data.txt】文本内容如下:

{"ticket_num": 1}

锁应该只在处理数据的部分加锁保证数据安全;

八、进程间通信

  创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去;一个进程对应在内存中就是一块独立的内存空间,多个进程对应在内存中就是多块独立的内存空间。默认情况下,进程与进程之间时无法直接交互的。如果想交互,可以借助第三方模块队列类实现。

  当创建一个子进程的时候,会复制父进程的很多东西(全局变量等)。子进程和主进程是单独的两个进程,当一个进程结束的时候,不会对其它进程产生影响。

import time

from multiprocessing import Process

num = 100

def task1():
    global num
    num = 300
    print(f"task1中的num:{num}")

def task2():
    print(f"task2中的num:{num}")

"""
Window操作系统下,创建系统一定要在main内创建
因为Window系统下创建进程类似于模块的导入的方式,会从上往下依次执行代码

Linux中则是直接将代码完成拷贝一份
"""
if __name__ == "__main__":
    p1 = Process(target=task1)
    p2 = Process(target=task2)

    # 先让p1线程执行
    p1.start()
    # 让主进程延迟1s,保证p1进程执行完之后,在执行p2进程
    time.sleep(1)
    # 让p2进程开始执行,看看获取的值是否是p1进程修改后的值
    p2.start()

  如果我们想让多个进程间共享数据,可以通过队列来实现。队列 (Queue)是具有一定约束的线性表,它只能在 一端插入入队 ,AddQ)而在 另一端删除出队 ,DeleteQ)。它具有 先进先出 (FIFO)的特性。,它的常用方法如下:

multiprocessing.Queue([maxsize])                            # 生成队列,最大可以存放maxsize数据量,默认值为32767
multiprocessing.Queue.qsize()                               # 返回当前队列包含的消息数量
multiprocessing.Queue.put(item, block=True, timeout=None)   # 向队列中存取数据,默认情况下,如果队列已满,还要放数据,程序会阻塞,直到有位置让出来,不会报错
multiprocessing.Queue.put_nowait(obj)                       # 向队列中存取数据,如果队列已满,还要放数据,程序会抛出异常
multiprocessing.Queue.get(block=True, timeout=None)         # 取队列中的数据,默认情况下,如果队列中没有数据,还要取数据,程序会阻塞,直到有新的数据到来,不会报错
multiprocessing.Queue.get_nowait()                          # 取队列中的数据,如果队列中没有数据,还要取数据,程序会抛出异常
multiprocessing.Queue.empty()                               # 如果队列为空,返回True,反之返回False
multiprocessing.Queue.full()                                # 如果队列满了,返回True,反之返回False
from multiprocessing import Queue

names = ["Sakura","Mikoto","Shana","Akame","Kurome"]

q = Queue(3)

print("向队列中存储数据")
i = 0
while not q.full():
    q.put(names[i])
    i += 1

# 如果消息队列已满,如果还要向队列中存储数据,程序会阻塞或抛出异常
try:
    # 如果没有设置timeout,向已满队列存储数据会阻塞,直到有位置让出来
    # 如果设置timeout,则会等待timeout秒,如果在此期间还没有位置空出来,程序会抛出异常
    q.put(names[i],timeout=3)
except Exception:
    print("队列已满,现有消息数量:%s" % q.qsize())

try:
    # 向已满队列存储数据会抛出异常
    q.put_nowait(names[i+1])
except Exception:
    print("队列已满,现有消息数量:%s" % q.qsize())

print("从队列中读取数据")
while not q.empty():
    data = q.get()
    print(f"读取的数据为{data}")

# 如果消息队列已空,如果还要从队列中读取数据,程序会阻塞或抛出异常
try:
    # 如果没有设置timeout,向已满队列存储数据会阻塞,直到有位置让出来
    # 如果设置timeout,则会等待timeout秒,如果在此期间还没有位置空出来,程序会抛出异常
    q.get(timeout=3)
except Exception:
    print("队列已空,现有消息数量:%s" % q.qsize())

try:
    # 向已满队列存储数据会抛出异常
    q.get_nowait()
except Exception:
    print("队列已空,现有消息数量:%s" % q.qsize())

full()、empty()、get_nowait() 方法在多进程的情况下是不精确的;

【1】、主进程与子进程进行通信

from multiprocessing import Process,Queue

def task(q):
    print("子进程开始执行了!")
    q.put("hello world!")
    print("子进程执行结束了!")

if __name__ == "__main__":
    q = Queue(3)
    p = Process(target=task,args=(q,))
    p.start()
    print(q.get())

【2】、子进程与子进程进行通信

from multiprocessing import Process,Queue

def produces(q):
    q.put("hello world!")

def consumer(q):
    print(q.get())

if __name__ == "__main__":
    q = Queue(3)

    p1 = Process(target=produces,args=(q,))
    p1.start()
  
    p2 = Process(target=consumer,args=(q,))
    p2.start()

九、进程池

9.1、进程池的使用

  池是用来保证计算机硬件安全的情况下最大限度的利用计算机,它降低了程序的运行效率,但是保证了计算机硬件的安全,从而让你写的程序能够正常运行。

  初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

import time
import os

from multiprocessing import Pool

def task(num):
    print(f"pid: {os.getpid()}, num: {num}")
    time.sleep(1)
    return num * 100


if __name__ == "__main__":
    # 括号内可以传数字指定进程数,不传的话,默认会开设当前计算机CPU个数的进程
    # 池子造出来后,会存在一定数量的进程,这些进程不会出现重复创建和销毁的过程
    pool = Pool()
    p_list= []


    # 池子的使用非常简单,只需要将需要做的任务往池子中提交即可
    for i in range(20):
        res = pool.apply_async(task, args=(i,))  # 朝池子中提交任务,异步提交
        p_list.append(res)

    # 等待进程池中所有的任务执行完毕之后再继续往下执行
    pool.close()                                 # 关闭进程池,等待进程中所有任务运行完毕
    pool.join()                                  # 主进程等待子进程全部执行完

    for p in p_list:   
        print(f"result: {p.get()}")              # 拿到异步提交的返回结果
    print("主线程执行了")
import time
import os

from concurrent.futures import ProcessPoolExecutor

# 括号内可以传数字指定进程数,不传的话,默认会开设当前计算机CPU个数的进程
# 池子造出来后,会存在一定数量的进程,这些进程不会出现重复创建和销毁的过程
pool = ProcessPoolExecutor(5)

def task(num):
    print(f"pid: {os.getpid()}, num: {num}")
    time.sleep(1)
    return num * 100


if __name__ == "__main__":
    p_list= []

    # 池子的使用非常简单,只需要将需要做的任务往池子中提交即可
    for i in range(20):
        res = pool.submit(task,i)           # 朝池子中提交任务,异步提交
        p_list.append(res)

    # 等待进程池中所有的任务执行完毕之后再继续往下执行
    pool.shutdown()                         # 关闭进程池,等待进程中所有任务运行完毕

    for p in p_list:   
        print(f"result: {p.result()}")      # 拿到异步提交的返回结果
    print("主线程执行了")

9.2、进程池间通信

  进程池间通信要使用 Manage 创建的 Queue 队列,不能直接使用普通的 Queue。

import time
import os

from multiprocessing import Pool, Manager

def reader(q):
    print(f"reader启动({os.getpid()}),父进程({os.getppid()})")
    for i in range(q.qsize()):
        print(f"reader从Queue获取消息:{q.get()}")

def write(q):
    print(f"write启动({os.getpid()}),父进程({os.getppid()})")
    for i in "Sakura":
        q.put(i)


if __name__ == "__main__":
    # 括号内可以传数字指定进程数,不传的话,默认会开设当前计算机CPU个数的进程
    # 池子造出来后,会存在一定数量的进程,这些进程不会出现重复创建和销毁的过程
    pool = Pool()
    p_list= []
    q = Manager().Queue()

    print(f"main ({os.getpid()}) start!")

    # 池子的使用非常简单,只需要将需要做的任务往池子中提交即可
    pool.apply_async(write, args=(q,))      # 朝池子中提交任务,异步提交

    # 先让上面的任务向Queue存入数据,然后在让下面的任务从中读取数据
    time.sleep(1)

    pool.apply_async(reader, args=(q,))     # 朝池子中提交任务,异步提交

    # 等待进程池中所有的任务执行完毕之后再继续往下执行
    pool.close()                            # 关闭进程池,等待进程中所有任务运行完毕
    pool.join()                             # 主进程等待子进程全部执行完

    print(f"main ({os.getpid()}) end!")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263936.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity Post请求发送fromdata数据content-type

wwwfrom 的 headers["Content-Type"]修改 错误代码: WWWForm form new WWWForm(); if (form.headers.ContainsKey("Content-Type")) {string boundary string.Format("--{0}", DateTime.Now.Ticks.ToString("x"));form…

aosp15 - Activity生命周期切换

本文探查的是,从App冷启动后到MainActivity生命周期切换的系统实现。 调试步骤 在com.android.server.wm.RootWindowContainer#attachApplication 方法下断点,为了attach目标进程在com.android.server.wm.ActivityTaskSupervisor#realStartActivityLock…

SAP PP ECN CSAP_MAT_BOM_MAINTAIN

刚开始的时候ECN总是加不上, 参考kimi给出的案例 点击链接查看和 Kimi 智能助手的对话 https://kimi.moonshot.cn/share/cth1ipmqvl7f04qkggdg 效果 加上了 FUNCTION ZPBOM_PLM2SAP. *"------------------------------------------------------------------…

GitLab的安装和使用

1.GitLab 环境说明 系统版本 CentOS 7.2 x86_64 软件版本 gitlab-ce-10.8.4 GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的web服务。可通过Web界面进行访问公开的或者私人项目。它拥有与Github类似的功能…

开放词汇目标检测(Open-Vocabulary Object Detection, OVOD)综述

定义 开放词汇目标检测(Open-Vocabulary Object Detection, OVOD)是一种目标检测任务,旨在检测和识别那些未在训练集中明确标注的物体类别。传统的目标检测模型通常只能识别有限数量的预定义类别,而OVOD模型则具有识别“开放词汇…

JaxaFx学习(三)

目录: (1)JavaFx MVVM架构实现 (2)javaFX知识点 (3)JavaFx的MVC架构 (4)JavaFx事件处理机制 (5)多窗体编程 (6)数据…

【C++】小乐乐求和问题的高效求解与算法对比分析

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯问题描述与数学模型1.1 题目概述1.2 输入输出要求1.3 数学建模 💯方法一:朴素循环求和法2.1 实现原理2.2 分析与问题2.3 改进方案2.4 性能瓶颈与结论…

基于Spring Boot的找律师系统

一、系统背景与意义 在现代社会,法律服务的需求日益增长,但传统寻找律师的方式往往存在信息不透明、选择困难等问题。基于Spring Boot的找律师系统旨在解决这些问题,通过线上平台,用户可以轻松搜索、比较和选择合适的律师&#x…

【Spring】方法注解@Bean,配置类扫描路径

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 引入 一:Bean方法注解 1:方法注解要搭配类注解使用 2:执行结果 …

深度学习0-前置知识

一、背景 AI最大,它的目的是通过让机器模仿人类进而超越人类; ML次之,它是AI的一个分支,是让机器模仿人类的一种方法。开发人员用大量数据和算法“训练”机器,让机器自行学会如何执行任务,它的成功取决于…

前端面试题整理-前端异步编程

1. 进程、线程、协程的区别 在并发编程领域,进程、线程和协程是三个核心概念,它们在资源管理、调度和执行上有着本质的不同。 首先,进程是操作系统进行资源分配和调度的独立单位(资源分配基本单位),每个进…

ARM学习(38)多进程多线程之间的通信方式

ARM学习(38)ARM学习(38)多进程多线程之间的通信方式 一、问题背景 笔者在调试模拟器的时候,碰到进程间通信的问题,一个进程在等另外一个进程ready的时候,迟迟等不到,然后通过调试发现,另外一个进程变量已经变化了,但是当前进程变量没变化,需要了解进程间通信的方式…

群晖利用acme.sh自动申请证书并且自动重载证书的问题解决

前言 21年的时候写了一个在群晖(黑群晖)下利用acme.sh自动申请Let‘s Encrypt的脚本工具 群晖使用acme自动申请Let‘s Encrypt证书脚本,自动申请虽然解决了,但是自动重载一直是一个问题,本人也懒,一想到去…

level2逐笔委托查询接口

沪深逐笔委托队列查询 前置步骤 分配数据库服务器 查询模板 以下是沪深委托队列查询的请求模板&#xff1a; http://<数据库服务器>/sql?modeorder_book&code<股票代码>&offset<offset>&token<token>查询参数说明 参数名类型说明mo…

delve调试环境搭建—golang

原文地址&#xff1a;delve调试环境搭建—golang – 无敌牛 欢迎参观我的个人博客&#xff1a;无敌牛 – 技术/著作/典籍/分享等 由于平时不用 IDE 开发环境&#xff0c;习惯在 linux终端vim 环境下开发&#xff0c;所以找了golang的调试工具&#xff0c;delve类似gdb的调试界…

PC寄存器(Program Counter Register) jvm

在JVM&#xff08;Java虚拟机&#xff09;中&#xff0c;PC寄存器&#xff08;Program Counter Register&#xff09;扮演着至关重要的角色&#xff0c;它是JVM执行引擎的核心组成部分之一。以下是PC寄存器在JVM中的具体角色和职责&#xff1a; 指令执行指针&#xff1a; PC寄存…

线性分类器(KNN,SVM损失,交叉熵损失,softmax)

KNN 工作机制 k-近邻算法的工作机制可以分为两个主要阶段&#xff1a;训练阶段和预测阶段。 训练阶段 在训练阶段&#xff0c;k-近邻算法并不进行显式的模型训练&#xff0c;而是简单地存储训练数据集。每个样本由特征向量和对应的标签组成。此阶段的主要任务是准备好数据&…

重拾设计模式--适配器模式

文章目录 适配器模式&#xff08;Adapter Pattern&#xff09;概述适配器模式UML图适配器模式的结构目标接口&#xff08;Target&#xff09;&#xff1a;适配器&#xff08;Adapter&#xff09;&#xff1a;被适配者&#xff08;Adaptee&#xff09;&#xff1a; 作用&#xf…

StarRocks:存算一体模式部署

目录 一、StarRocks 简介 二、StarRocks 架构 2.1 存算一体 2.2 存算分离 三、前期准备 3.1前提条件 3.2 集群规划 3.3 配置环境 3.4 准备部署文件 四、手动部署 4.1 部署FE节点 4.2 部署BE节点 4.3 部署CN节点&#xff08;可选&#xff09; 4.4 FE高可用…

找数字:JAVA

题目描述 试计算在区间1 到n 的所有整数中&#xff0c;数字x&#xff08;0 ≤ x ≤ 9&#xff09;共出现了多少次&#xff1f; 例如&#xff0c;在1到11 中&#xff0c;即在1、2、3、4、5、6、7、8、9、10、11 中&#xff0c;数字1 出现了4 次。 输入描述: 输入共1行&#xf…