基本说明
多进程是指在同一台计算机中同时运行多个独立的进程。每个进程都有自己的地址空间,可用于执行一些特定的任务。这些进程可以同时执行,从而提高了程序的性能和效率。多进程可以在多核计算机上实现真正的并行计算,可以同时运行多个程序,从而大大提高了计算机的利用率。
在多进程编程中,进程之间是独立的,它们各自运行自己的程序,有自己的地址空间和系统资源,不会相互干扰。每个进程都有自己的PID(进程标识符),它是一个唯一的标识符,用于在系统中识别该进程。多进程可以通过IPC(进程间通信)来实现数据共享和通信。
Python的多进程模块multiprocessing
提供了一种方便的方法来创建和管理多个进程。它可以在单个Python解释器中创建多个进程,从而实现并行计算。multiprocessing
模块还提供了一些方便的功能,如进程池、进程通信和共享内存等。
多进程编程可以在很多场景中提高程序的运行效率,如CPU密集型任务、I/O密集型任务、并行计算、大规模数据处理和机器学习等。但需要注意多进程之间的数据同步和通信,以避免数据竞争和死锁等问题。
import multiprocessing
from PIL import Image, ImageFilter
def process_image(filename, size, q):
"""使用高斯模糊对图像进行处理"""
img = Image.open(filename)
img = img.resize(size)
img = img.filter(ImageFilter.GaussianBlur(radius=5))
q.put(img)
if __name__ == '__main__':
filenames = ["image1.jpg", "image2.jpg", "image3.jpg"]
size = (800, 800)
q = multiprocessing.Queue()
processes = []
for filename in filenames:
p = multiprocessing.Process(target=process_image, args=(filename, size, q))
processes.append(p)
p.start()
for p in processes:
p.join()
images = []
while not q.empty():
images.append(q.get())
for i, img in enumerate(images):
img.save(f"processed_image_{i}.jpg")
这个程序使用多进程对多个图像进行处理。它将每个图像分配给一个进程来进行处理,并使用队列将处理后的图像收集起来。在这个例子中,我们对每个图像进行了大小调整和高斯模糊,然后将它们保存在磁盘上。
import multiprocessing
import requests
from bs4 import BeautifulSoup
def scrape_page(url, q):
"""从网页中提取信息"""
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
title = soup.title.string
q.put((url, title))
if __name__ == '__main__':
urls = ["http://www.example.com", "http://www.example.org", "http://www.example.net"]
q = multiprocessing.Queue()
processes = []
for url in urls:
p = multiprocessing.Process(target=scrape_page, args=(url, q))
processes.append(p)
p.start()
for p in processes:
p.join()
results = []
while not q.empty():
results.append(q.get())
for url, title in results:
print(f"{url}: {title}")
这个程序使用多进程来爬取多个网页的标题。它将每个URL分配给一个进程来处理,并使用队列将提取的标题收集起来。在这个例子中,我们使用requests
和BeautifulSoup
库来获取和解析网页。
进程间通信
Python的多进程模块multiprocessing
提供了多种进程间通信的方式,包括:
-
队列(Queue):队列是最常用的进程间通信方式之一,它是线程安全的,可以在多个进程之间共享数据。可以使用
multiprocessing.Queue
类来创建一个队列,进程之间可以使用put()
和get()
方法向队列中添加和获取数据。import multiprocessing def producer(q): for i in range(5): q.put(i) q.put(None) def consumer(q): while True: item = q.get() if item is None: break print(f"Consumed {item}") if __name__ == '__main__': q = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(q,)) p2 = multiprocessing.Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join()
需要注意的是,在队列中添加一个特殊的值(如
None
)可以用来表示队列的结束。在这个示例中,生产者进程在添加完所有数据后,向队列中添加了一个None
,表示队列已经结束。消费者进程在获取到None
时,就知道队列已经结束,可以退出循环。 -
管道(Pipe):管道是另一种进程间通信方式,它可以在两个进程之间传递数据。可以使用
multiprocessing.Pipe()
函数创建一个管道,它返回两个连接对象,每个连接对象可以用于向另一个进程发送数据。import multiprocessing def send_data(conn): data = [1, 2, 3, 4, 5] conn.send(data) conn.close() def receive_data(conn): data = conn.recv() print(f"Received data: {data}") conn.close() if __name__ == '__main__': parent_conn, child_conn = multiprocessing.Pipe() p1 = multiprocessing.Process(target=send_data, args=(child_conn,)) p2 = multiprocessing.Process(target=receive_data, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join()
在这个示例中,我们创建了两个进程,一个发送进程和一个接收进程。发送进程向管道中发送一些数据,接收进程从管道中接收数据并进行处理。
在这个示例中,我们使用
multiprocessing.Pipe()
函数创建了一个管道,并将其分别传递给发送进程和接收进程。发送进程使用send()
方法将数据发送到管道中,接收进程使用recv()
方法从管道中接收数据。需要注意的是,管道是双向的,每个管道有两个连接对象,分别表示管道的两端。在这个示例中,我们使用
multiprocessing.Pipe()
函数创建了一个管道,并将其分别传递给发送进程和接收进程。管道的两个连接对象都可以用于发送和接收数据。在实际应用中,管道通常被用来进行进程之间的数据传递和协调,例如,我们可以将一个大型任务拆分成多个小任务,并将这些小任务分配给多个进程来处理。在处理完成后,这些进程可以使用管道来将结果传递给主进程,主进程可以将这些结果合并并进行汇总。
需要注意的是,管道通常比队列更快,但它只适用于两个进程之间的通信。如果需要进行多个进程之间的通信,应该使用队列或其他进程间通信方式。
-
共享内存(Value和Array):共享内存是一种特殊的进程间通信方式,它可以在多个进程之间共享数据。
multiprocessing
模块提供了Value
和Array
类来实现共享内存,Value
用于存储单个值,Array
用于存储多个值。import multiprocessing def modify_value(val, lock): with lock: val.value += 1 def modify_array(arr, lock): with lock: for i in range(len(arr)): arr[i] *= 2 if __name__ == '__main__': val = multiprocessing.Value('i', 0) arr = multiprocessing.Array('i', [1, 2, 3, 4, 5]) lock = multiprocessing.Lock() p1 = multiprocessing.Process(target=modify_value, args=(val, lock)) p2 = multiprocessing.Process(target=modify_array, args=(arr, lock)) p1.start() p2.start() p1.join() p2.join() print(f"Value: {val.value}") print(f"Array: {arr[:]}")
在这个示例中,我们创建了一个共享整数对象、一个共享数组对象和一个锁对象。在进程修改共享内存对象的值时,我们使用了
with lock:
语句来获取锁对象,以保证对共享内存对象的访问的同步和互斥。在修改完成后,锁会自动释放,以便其他进程可以继续访问共享内存对象。需要注意的是,在使用锁时,要尽可能地减小锁的粒度,避免出现死锁或性能问题。在实际应用中,共享内存通常被用来存储一些常用的数据,例如,程序配置信息、缓存数据等。共享内存通常比其他进程间通信方式更快,但由于数据共享的原因,也更容易出现问题。
-
锁(Lock):在多个进程之间共享数据时,需要使用锁来防止数据竞争。可以使用
multiprocessing.Lock
类来创建一个锁对象,进程可以使用acquire()
和release()
方法来获取和释放锁。import multiprocessing def increment_counter(counter, lock): for i in range(1000): with lock: counter.value += 1 if __name__ == '__main__': counter = multiprocessing.Value('i', 0) lock = multiprocessing.Lock() processes = [] for i in range(5): p = multiprocessing.Process(target=increment_counter, args=(counter, lock)) processes.append(p) p.start() for p in processes: p.join() print(f"Counter value: {counter.value}")
在这个示例中,我们创建了一个共享整数对象
counter
和一个锁对象lock
。进程可以使用with lock:
语句来获取锁对象,以保证对共享内存对象的访问的同步和互斥。在修改完成后,锁会自动释放,以便其他进程可以继续访问共享内存对象。在这个示例中,我们创建了5个进程,并将它们添加到进程列表中。每个进程都会执行
increment_counter()
函数,该函数使用with lock:
语句获取锁对象,并将共享整数对象counter
加1。在执行完成后,锁会自动释放。需要注意的是,在使用锁时,要尽可能地减小锁的粒度,避免出现死锁或性能问题。在实际应用中,锁通常被用来保护共享数据的读写操作,以避免数据竞争和死锁等问题。