Python基础之多进程

news2025/1/11 18:32:28

文章目录

  • 1 多进程
    • 1.1 简介
    • 1.2 Linux下多进程
    • 1.3 multiprocessing
    • 1.4 Pool
    • 1.5 进程间通信
    • 1.6 分布式进程

1 多进程

1.1 简介

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

1.2 Linux下多进程

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

# multiprocessing.py
import os

print ('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:
    print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
运行结果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

1.3 multiprocessing

如果打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print ('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print ('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print ('Process will start.')
    p.start()
    p.join()
    print 'Process end.'

执行结果如下:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

1.4 Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print ('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print ('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print 'All subprocesses done.'

执行结果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代码解读:

  • 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
  • 注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:p = Pool(5),就可以同时跑5个进程。
  • 由于Pool的默认大小是CPU的核数,如果拥有8核CPU,那么要提交至少9个子进程才能看到上面的等待效果。

1.5 进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Pythonmultiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以 Queue 为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print ('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print ('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessingWindows下调用失败了,要先考虑是不是pickle失败了。

1.6 分布式进程

Pythonmultiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
然后,在另一台机器上启动任务进程(本机上启动也可以):

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

现在,可以试试分布式进程的工作效果了。先启动taskmanager.py服务进程:

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager进程发送完任务后,开始等待result队列的结果。现在启动taskworker.py进程:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

taskworker进程结束,在taskmanager进程中会继续打印出结果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

这个简单的Manager/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到taskworker.py中根本没有创建Queue的代码,所以,Queue对象存储在taskmanager.py进程中:
在这里插入图片描述
Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue

authkey:是为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。

注意Queue的作用是用来传递任务接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1882655.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Unity小技巧】Unity字典序列化

字典序列化 在 Unity 中&#xff0c;标准的 C# 字典&#xff08;Dictionary<TKey, TValue>&#xff09;是不能直接序列化的&#xff0c;因为 Unity 的序列化系统不支持非 Unity 序列化的集合类型。可以通过手写字典实现 效果&#xff1a; 实现步骤&#xff1a; 继承ISe…

Python从0到100(三十三):xpath和lxml类库

1. 为什么要学习xpath和lxml lxml是一款高性能的 Python HTML/XML 解析器&#xff0c;我们可以利用XPath&#xff0c;来快速的定位特定元素以及获取节点信息 2. 什么是xpath XPath&#xff0c;全称为XML Path Language&#xff0c;是一种用于在XML文档中进行导航和数据提取的…

看不懂懂车大爆炸,你就错过了国产小车的王炸!

咦&#xff1f;咋的啦&#xff1f;咱中国自己的汽车品牌前几天在汽车工业协会公布的数据里一跃而起&#xff0c;真的是威风凛凛啊&#xff01;2023年咱们自家的乘用车品牌市场份额硬生生地占了个56%&#xff0c;这可是半壁江山啊&#xff01;特别是那些10万块钱以下的家用小车&…

K-Planes代码记录

随记 原文 K-Planes: Explicit Radiance Fields in Space, Time, and Appearance&#xff0c;又要换baseline&#xff0c;可是效果不好能怎么办呢&#xff0c;我可不可以发疯。k-planes的代码又是非常工程琐碎的&#xff0c;大佬的代码果然不顾小白死活。随便记录下整个过程。…

造一个交互式3D火山数据可视化

本文由ScriptEcho平台提供技术支持 项目地址&#xff1a;传送门 使用 Plotly.js 创建交互式 3D 火山数据可视化 应用场景 本代码用于将火山数据库中的数据可视化&#xff0c;展示火山的高度、类型和状态。可用于地质学研究、教育和数据探索。 基本功能 该代码使用 Plotly…

【面试系列】信息安全分析师高频面试题及详细解答

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题. ⭐️ AIGC时代的创新与未来&#xff1a;详细讲解AIGC的概念、核心技术、…

基于TCP/QT/C++的网盘系统测试报告

目录 一、项目介绍 1、项目描述 2、项目组成模块 3、项目技术要点 二、用户功能测试 1、查看在线用户测试 1.1、运行服务器 1.2、登录两个账号 1.3、点击显示在线用户&#xff0c;可以看到jack和lucy 2、搜索用户测试 2.1、打开服务器&#xff0c;登录两个账号jack,lucy 2.2、在…

强化学习的数学原理:贝尔曼公式

大纲 这一节课程的大纲&#xff1a; 重点 对于这次课&#xff0c;重点是两个东西&#xff1a; Motivating examples(为什么需要贝尔曼公式) 首先要明白&#xff0c;为什么 return 是重要的&#xff1f; 之前其实就说过&#xff0c;return 能够帮助我们评价一个策略是好还是坏…

哪个牌子的超声波清洗器好?精选四大超强超声波清洗机力荐

生活中戴眼镜的人群不在少数&#xff0c;然而要维持眼镜的干净却不得不每次都需要清洗&#xff0c;只是通过手洗的方式实在太慢并且容易操作不当让镜片磨损更加严重&#xff01;所以超声波清洗机就诞生了&#xff01;超声波清洗机能够轻松清洗机眼镜上面的油脂污渍&#xff0c;…

Kubernetes的发展历程:从Google内部项目到云原生计算的基石

目录 一、起源与背景 1.1 Google的内部项目 1.2 Omega的出现 二、Kubernetes的诞生 2.1 开源的决策 2.2 初期发布 三、Kubernetes的发展历程 3.1 社区的成长 3.2 生态系统的壮大 3.3 重大版本和功能 3.4 多云和混合云的支持 四、Kubernetes的核心概念 4.1 Pod 4.…

有源晶振Output Load含义与供电范围解析

在深入探讨有源晶振的Output Load与供电范围之前&#xff0c;我们首先需要了解有源晶振的基本概念。有源晶振&#xff0c;又称为实时时钟模块或晶体振荡器&#xff0c;是一种能够提供稳定、精确频率信号的电子组件。与无源晶振不同&#xff0c;有源晶振内部包含了必要的电路&am…

机器学习复习总结

目录 第一章 机器学习的概念及其应用 1.1机器学习的特点&#xff1a; **1.2机器学习的分类&#xff1a; 1.2.1监督学习&#xff1a; 1.2.2无监督学习&#xff1a; 1.2.3强化学习&#xff1a; 1.2.4三种机器学习的区别 *1.3深度学习 1.3.1深度学习的实质&#xff1a; …

生产环境部署Nginx服务器双机热备部署-keepalived(多种模式教程)

前言&#xff1a;今天演示下生产环境keepalived的部署方式&#xff0c;安装模式有很多&#xff0c;比如说主备模型和双主模型&#xff0c;主备分&#xff1a;抢占模式 和 非抢占模式。这里我会一一展开说具体怎么配置一、双节点均部署Nginx&#xff1a; 第一步&#xff1a;上传…

烟台网站建设前需要了解哪些

在进行烟台网站建设之前&#xff0c;需要了解以下几个重要的方面&#xff1a; 1. 目标和定位&#xff1a;在建设网站之前&#xff0c;需要明确网站的目标和定位。是为了展示公司业务&#xff0c;还是为了销售产品&#xff0c;或者是为了提供信息和服务等。根据不同的目标和定位…

JAVA:常用的算法指南

请关注微信公众号&#xff1a;拾荒的小海螺 博客地址&#xff1a;http://lsk-ww.cn/ 1、简述 在软件开发过程中&#xff0c;算法扮演着关键的角色。它们用于解决各种问题&#xff0c;从数据处理到搜索、排序等。本文将介绍几种常见的算法及其 Java 实现&#xff0c;包括排序算…

防爆巡检终端在石化工厂安全保障中的应用

防爆巡检终端在石化工厂安全保障中的应用是广泛而关键的&#xff0c;其设计旨在确保在易燃易爆环境中进行安全、有效的巡检工作。以下是防爆巡检终端在石化工厂安全保障中的详细应用描述&#xff1a; 1. 环境监测与预警 防爆巡检终端配备了各种传感器&#xff0c;能够实时监测…

分享一个用于深入分析【大模型LLM】工作原理的工具

背景 LLM Transparency Tool 是一个用于深入分析和理解大型语言模型&#xff08;LLM&#xff09;工作原理的工具&#xff0c;旨在增加这些复杂系统的透明度。它提供了一个交互式界面&#xff0c;用户可以通过它观察、分析模型对特定输入&#xff08;prompts&#xff09;的反应…

【MySQL基础篇】函数及约束

1、函数 函数是指一段可以直接被另一段程序程序调用的程序或代码。 函数 - 字符串函数 MySQL中内置了很多字符串函数&#xff0c;常用的几个如下&#xff1a; 函数功能CONCAT(S1,S2,...,Sn)字符串拼接&#xff0c;将S1,S2,...,Sn拼接成一个字符串LOWER(str)将字符串str全部…

DDD学习笔记四

领域模型的构建 基础领域模型的基本组成有名称、属性、关联、职责、事件和异常 发掘领域概念3种策略&#xff1a; 1&#xff09;学习已有系统&#xff0c;重用已有模型 2&#xff09;使用分类标签。分类标签来源于领域&#xff0c;需要我们研究一些资料并做一些提炼。从采用5W…

解决 prettier/prettier 和 indent 冲突问题和一点简单思考

用过 prettier 的都知道&#xff0c;经常会遇到 prettier 与 eslint 的某些规则冲突的情况。在之前的一篇文章中&#xff0c;我简单地描述过怎么搭建起应用 eslint/prettier 的基本配置&#xff0c;也提到了怎么解决 prettier 与 eslint 的一些冲突问题。 其中有这么一段话&am…