Python多进程编程

news2024/9/24 9:20:17

一 多进程编程

Python实现多进程的方式有两种:一种方法是os模块中的fork方法,另一种是使用multiprocessing模块。

前者仅适用于LINUX/UNIX操作系统,对Windows不支持,后者则是跨平台的实现方式。

第一种方式:使用os模块中的fork方式实现多进程

import os
if __name__ == '__main__':
    print 'current Process (%s) start ...'%(os.getpid())
    pid = os.fork()
    if pid < 0:
        print 'error in fork'
    elif pid == 0:
        print 'I am child process(%s) and my parent process is (%s)',(os.getpid(),os.getppid())
    else:
        print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)

第二种方式:multiprocessing

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。join()方法实现进程间的同步。

#__author: greg
#date: 2017/9/19 23:52
from multiprocessing import Process
import time

def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin',))
        p_list.append(p)
        p.start()
    for i in p_list:
        i.join()
    print('end')#一个主进程,三个子进程


# output:
# hello alvin Fri Nov 24 19:10:08 2017
# hello alvin Fri Nov 24 19:10:08 2017
# hello alvin Fri Nov 24 19:10:08 2017
# end

类式调用:

#__author: greg
#date: 2017/9/21 20:02
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name
    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print('end')
#output:
# hello MyProcess-1 Fri Nov 24 19:12:17 2017
# hello MyProcess-2 Fri Nov 24 19:12:17 2017
# hello MyProcess-3 Fri Nov 24 19:12:17 2017
# end

显示进程ID号:

#__author: greg
#date: 2017/9/21 20:16
from multiprocessing import Process
import os
import time
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())#父进程号
    print('process id:', os.getpid())#进程号

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    time.sleep(10)
    p = Process(target=info, args=('bob',))
    p.start()
    p.join()

#output:
# main process line
# module name: __main__
# parent process: 1548 pycharm的进程号
# process id: 8416  Python进程号
# bob
# module name: __mp_main__
# parent process: 8416  Python进程号
# process id: 5556  info进程号

二 Process类

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  authkey

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

三 进程间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues 用来在多个进程间通信

1. 阻塞模式

import queue
import time

q = queue.Queue(10) #创建一个队列
start=time.time()
for i in range(10):
q.put('A')
time.sleep(0.5)
end=time.time()
print(end-start)

这是一段极其简单的代码(另有两个线程也在操作队列q),我期望每隔0.5秒写一个'A'到队列中,但总是不能如愿:
间隔时间有时会远远超过0.5秒。
原来,Queue.put()默认有 block = True 和 timeout两个参数。
源码:def put(self, item, block=True, timeout=None):
当 block = True 时,写入是阻塞式的,阻塞时间由 timeout确定。
当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。
Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。
但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常。

#__author: greg
#date: 2017/9/21 22:27
from multiprocessing import Process, Queue

def f(q,n):
    q.put([42, n, 'hello'])
    print('subprocess id',id(q))

if __name__ == '__main__':
    q = Queue()
    p_list=[]
    print('process id',id(q))
    for i in range(3):
        p = Process(target=f, args=(q,i))
        p_list.append(p)
        p.start()
    print(q.get())
    print(q.get())
    print(q.get())
    for i in p_list:
        i.join()

# output
# process id 2284856854176
# subprocess id 2607348001872
# [42, 0, 'hello']
# subprocess id 1712786975824
# [42, 2, 'hello']
# subprocess id 2254764977120
# [42, 1, 'hello']

Pipe常用来两个进程间进行通信,两个进程分别位于管道的两端

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 

#__author: greg
#date: 2017/9/21 22:57
import multiprocessing
import random
import time,os

def proc_send(pipe,urls):
    for url in urls:
        print("Process(%s) send: %s" %(os.getpid(),url))
        pipe.send(url)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print("Process(%s) rev:%s" %(os.getpid(),pipe.recv()))
        time.sleep(random.random())

if __name__=="__main__":
    pipe=multiprocessing.Pipe()
    p1=multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i)
                                                      for i in range(10)]))
    p2=multiprocessing.Process(target=proc_recv,args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

Manager()返回的管理器对象控制一个服务器进程,该进程持有Python对象,并允许其他进程使用代理来操纵它们。

#__author: greg
#date: 2017/9/21 23:10
from multiprocessing import Process, Manager
def f(d, l,n):
    d[n] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(n)
    # print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,i))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

四 进程同步

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

#__author: greg
#date: 2017/9/21 23:25
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

五 进程池 Pool类

Pool可以提供指定数量的进程供用户使用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程来执行该请求

但如果池中的进程数已经达到规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。

# -*- coding: utf-8 -*-
# 2017/11/24 20:15
from multiprocessing import Pool
import os, time, random

def run_task(name):
    print('Task %s (pid = %s) is running...' % (name, os.getpid()))
    time.sleep(random.random() * 3)
    print('Task %s end.' % name)

if __name__=='__main__':
    print('Current process %s.' % os.getpid())
    p = Pool(processes=3)
    for i in range(5):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')


"""
Current process 9788.
Waiting for all subprocesses done...
Task 0 (pid = 5916) is running...
Task 1 (pid = 3740) is running...
Task 2 (pid = 6964) is running...
Task 2 end.
Task 3 (pid = 6964) is running...
Task 1 end.
Task 4 (pid = 3740) is running...
Task 0 end.
Task 3 end.
Task 4 end.
All subprocesses done.
"""
  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

每次最多运行3个进程,当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid可以看出来。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372563.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++修行之路】STL——模拟实现string类

文章目录前言类框架构造与析构c_str迭代器操作符重载[]&#xff1a;&#xff1a;> > < < !:reverse与resizereverseresizepush_back与append复用实现insert和erasec_str与流插入、流提取eraseswap(s1,s2)与s1.swap(s2)结语前言 这次我们分几个部分来实现string类…

spark第一章:环境安装

系列文章目录 spark第一章&#xff1a;环境安装 文章目录系列文章目录前言一、文件准备1.文件上传2.文件解压3.修改配置4.启动环境二、历史服务器1.修改配置2.启动历史服务器总结前言 spark在大数据环境的重要程度就不必细说了&#xff0c;直接开始吧。 一、文件准备 1.文件…

React Use Hook 尝鲜

React Use Hook 尝鲜 最近继续在找处理 React 异步调用的方式……主要是现在需求比较复杂&#xff0c;用 cache query 的方式去实现有那么一丢丢的麻烦&#xff0c;又不是很想用额外的包&#xff0c;所以就想看看有没有比较好的一些处理方式。 当然&#xff0c;可以用到生产环…

tkinter界面的TCP通信/tkinter开启线程接收TCP

前言 用简洁的语言写一个可以与TCP客户端实时通信的界面。之前做了一个项目是要与PLC进行信息交互的界面&#xff0c;在测试的时候就利用TCP客户端来实验&#xff0c;文末会附上TCP客户端。本文分为三部分&#xff0c;第一部分是在界面向TCP发送数据&#xff0c;第二部分是接收…

Linux基础命令-dd拷贝、转换文件

文章目录 dd 命令介绍 语法格式 基本参数 参考实例 1&#xff09;生成一个200M的新文件 2&#xff09;拷贝文件的100个字节 3&#xff09;将文件的字母全部转换成大写 4&#xff09;将linux自带的光盘制作成iso格式的镜像文件 5&#xff09;使用dd命令制作1G的交换分…

软考中级-操作系统

1 操作系统地位计算机系统由硬件和软件组成&#xff0c;未配置软件的称为裸机&#xff0c;但这会导致效率低下。操作系统是为弥补用户与硬件之间的鸿沟的一种系统软件&#xff0c;汇编、编译、解释、数据库管理系统等系统软件和其他应用软件都在此基础。2 进程管理又称处理机管…

Linux Ubuntu配置国内源

因为众所周知的原因&#xff0c;国外的很多网站在国内是访问不了或者访问极慢的&#xff0c;这其中就包括了Ubuntu的官方源。 所以&#xff0c;想要流畅的使用apt安装应用&#xff0c;就需要配置国内源的镜像。 市面上Ubuntu的国内镜像源非常多&#xff0c;比较有代表性的有清华…

pytorch学习日记之激活函数

常用的激活函数为S型&#xff08;sigmoid&#xff09;激活函数、双曲正切&#xff08;Tanh&#xff09;激活函数、线性修正单元&#xff08;ReLU&#xff09;激活函数等&#xff0c;对应Pytorch的函数如下所示 层对应的种类功能torch.nn.SigmoidSigmoid激活函数torch.nn.TanhT…

_vue-3

Vue3有了解过吗&#xff1f;能说说跟vue2的区别吗&#xff1f; 1. 哪些变化 从上图中&#xff0c;我们可以概览Vue3的新特性&#xff0c;如下&#xff1a; 速度更快体积减少更易维护更接近原生更易使用 1.1 速度更快 vue3相比vue2 重写了虚拟Dom实现编译模板的优化更高效的…

数据挖掘概述

目录1、数据挖掘概述2、数据挖掘常用库3、模型介绍3.1 分类3.2 聚类3.3 回归3.4 关联3.5 模型集成4、模型评估ROC 曲线5、模型应用1、数据挖掘概述 数据挖掘&#xff1a;寻找数据中隐含的知识并用于产生商业价值 数据挖掘产生原因&#xff1a;海量数据、维度众多、问题复杂 数…

直接拿项目运行npm start 会出现’react-scripts’ 不是内部或外部命令,也不是可运行的程序或批处理文件错误

目录 解决方案 原因 解决方案 npm install react-scripts或npm install安装完成后再次运行 npm start 即可 原因 create-react-app有丢包的缺陷&#xff0c;手动安装包后&#xff0c;需要重新npm install一下&#xff0c;这样node_modules/.bin/目录下才会重新出现react-s…

【论文阅读】基于LevelDB的分布式数据库研究

基于LevelDB的分布式数据库研究 基于LevelDB的分布式数据库的研究与实现 - 中国知网 (cnki.net) 实现了什么&#xff1f; 基于键值型NoSQL数据库LevelDB&#xff0c;并与数据一致性算法Raft、 数据分片和负载均衡相结合&#xff0c;设计并实现基于LevelDB的分布式数据库。 主要…

Wireshark “偷窥”浏览器与服务器三次握手

本文使用的是Wireshark 4.0.3, Java 11 编写简易服务器&#xff0c;客户端使用Chrome浏览器移动端开发或是前、后端开发又或是高大上的云计算都脱离不了网络&#xff0c;离开了网络的计算机就是一个孤岛&#xff0c;快速上手开发、背面试八股文固然有些急功近利&#xff0c;但确…

jstatd的启动方式与关闭方式

启动方式与注意事项&#xff1a; 启动方式&#xff1a; 前台启动不打印日志&#xff1a; jstatd -J-Djava.security.policyjstatd.all.policy -J-Djava.rmi.server.hostname服务器IP 前台启动并打印日志&#xff1a; ./jstatd -J-Djava.security.policyjstatd.all.policy -…

傻瓜式minio使用指南

傻瓜式minio使用指南1. docker部署minio1.1 docker拉取minio镜像1.2 创建docker容器1.3 查看docker容器是否启动正常2.登陆minio2.1 账户、密码为原先设置minioadmin2.2 创建桶2.3 设置桶属性3.Java客户端使用3.1引入依赖3.2 使用3.3 结果1. docker部署minio 1.1 docker拉取mi…

你应该知道的ChatGPT提示语

ChatGPT 自上线以来&#xff0c;凭借其优异的自然语言理解和输出能力&#xff0c;仅花 5天就成为了活跃用户过百万的现象级产品。而上一个现象级产品 instagram 花了 2 个半月。到目前为止 ChatGPT 在全球累计用户数量已经过亿&#xff0c;相信现在也有很多人在跟 ChatGPT 聊过…

Acwing 蓝桥杯 第二章 二分与前缀和

今天来补一下之前没写的总结&#xff0c;题是写完了&#xff0c;但是总结没写感觉没什么好总结的啊&#xff0c;就当打卡了789. 数的范围 - AcWing题库思路&#xff1a;一眼二分&#xff0c;典中典先排个序&#xff0c;再用lower_bound和upper_bound维护相同的数的左界和右界就…

Google Guice 4:Bindings(2)

4 Scopes (实例的作用域&#xff09; 4.1 默认规则&#xff1a;unreuse instance 到目前为止&#xff0c;通过bind().to()和Provides定义的binding&#xff0c;每次需要注入实例对象时&#xff0c;Guice都会创建一个新的实例 // 修改DatabaseTransactionLog&#xff0c;使其打…

【python学习笔记】:SQL常用脚本(二)

11、四舍五入ROUND函数 ROUND ( numeric_expression , length [ ,function ] ) function 必须为 tinyint、smallint 或 int。 如果省略 function 或其值为 0&#xff08;默认值&#xff09;&#xff0c;则将舍入 numeric_expression。 如果指定了0以外的值&#xff0c;则将截…

TypeScript笔记-进行中

学习来源&#xff1a; 本笔记由尚硅谷教学视频整理而来 文章目录学习来源&#xff1a;一.TS简介TypeScript是什么TypeScript增加了什么二环境搭建安装nvm环境搭建二.TypeScript中的基本类型类型声明类型类型示例代码三.编译配置自动编译文件自动编译整个项目四.使用webpack打包…