第十三章 进程与线程
程序与进程的概念
程序:
英文单词为Program,是指一系列有序指令的集合,使用编程语言所编写,用于实现一定的功能。
进程:
进程则是指启动后的程序,系统会为进程分配内存空间。
函数式创建子进程
在Python中创建进程有两种方式:
第一种使用os模块中的fork函数,但该函数只适用于Unix、Linux和MacOS。
Windows只能使用第二种方式:使用 multiprocessing模块 中的 Process类 创建进程。
第二种方式创建进程的语法结构:
Process(group = None,target,name,args,kwargs)
参数说明:
- group:表示分组,实际上不使用,值默认为None即可
- target:表示子进程要执行的任务,支持函数名
- name:表示子进程的名称
- args:表示调用函数的位置参数,以元组的形式进行传递
- kwargs:表示调用函数的关键字参数,以字典的形式进行传递
【注】:
target支持函数名,即函数可以作为这个参数传入(函数作为参数时不带小括号,函数调用时才带小括号)。如果target传入的函数有位置参数,就需要用args进行传入;如果target传入的函数有关键字参数,就需要用kwargs进行传入。
因此group,name可以不写;target必须写;args、kwargs有就写,没有就不写。
# 使用内置模块multiprocessing中的Process类创建进程
from multiprocessing import Process
import os, time
# 函数中的代码是子进程要执行的代码
def test():
print(f'我是子进程,我的PID是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
# main是主进程
print('主进程开始执行')
lst = []
# 创建五个子进程
for i in range(5):
# 创建子进程
p = Process(target=test) # 函数作为参数时不加小括号,函数调用时才加小括号
# 启动子进程
p.start()
# 启动中的进程添加到列表中
lst.append(p)
print('主进程执行结束')
这里会发现子进程还没执行结束时,主进程就已经执行结束了。
原因见下方图示:
那么怎么让所有子进程执行结束之后主进程再执行结束呢?——需要用到 join() 进行阻塞。
# 使用内置模块multiprocessing中的Process类创建进程
from multiprocessing import Process
import os, time
# 函数中的代码是子进程要执行的代码
def test():
print(f'我是子进程,我的PID是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
# main是主进程
print('主进程开始执行')
lst = []
# 创建五个子进程
for i in range(5):
# 创建子进程
p = Process(target=test) # 函数作为参数时不加小括号,函数调用时才加小括号
# 启动子进程
p.start()
# 启动中的进程添加到列表中
lst.append(p)
# 当所有子进程执行结束后主进程再执行结束--->使用join()阻塞主进程
# 遍历lst,列表中五个子进程
for item in lst: # item的数据类型是Process类型
# join()-->主进程和子进程在一起执行时,看到join()后主进程便不再执行了,先让子进程执行,子进程执行结束之后再去执行主进程
item.join() # join()是Process对象的方法,作用是阻塞主进程
# 有了join()后主进程要等到所有子进程执行完毕之后,主进程才会执行结束
print('主进程执行结束')
Process类常用的属性和方法
Process类中常用的属性和方法:
方法/属性名称 | 功能描述 |
---|---|
name | 当前进程实例别名,默认为Process-N |
pid | 当前进程对象的PID值 |
is_alive() | 进程是否执行完,没执行完结果为True,否则为False |
join(timeout) | 等待结束或等待timeout秒 |
start() | 启动进程 |
run() | 如果没有指定target参数,则启动进程后,会调用父类(Process)中的run()方法 |
terminate() | 强制终止进程 |
from multiprocessing import Process
import os, time
# 函数中的代码是子进程要执行的代码
# 函数式方式创建子进程
def sub_process(name):
print(f'子进程PID:{os.getpid()},父进程PID是:{os.getppid()}----{name}')
time.sleep(1)
def sub_process2(name):
print(f'子进程PID:{os.getpid()},父进程PID是:{os.getppid()}----{name}')
time.sleep(1)
if __name__ == '__main__':
# main是主进程
print('主进程开始执行')
for i in range(3):
# 创建第一种子进程
# sub_process()有一个位置参数,因此需要用args传入该函数的位置参数,以元组方式
p1 = Process(target=sub_process, args=('lxl',))
# 创建第二种子进程
# sub_process2()有一个位置参数,因此需要用args传入该函数的位置参数,以元组方式
p2 = Process(target=sub_process2, args=(18,))
# 调用start()启动子进程
p1.start()
p2.start()
print(p1.name, '是否处于活跃状态:', p1.is_alive())
print(p2.name, '是否处于活跃状态:', p2.is_alive())
print(p1.name, 'PID是:', p1.pid)
print(p2.name, 'PID是:', p2.pid)
# 阻塞主进程
p1.join() # 主进程要等待p1执行结束
p2.join() # 主进程要等待p2执行结束
print(p1.name, '是否处于活跃状态:', p1.is_alive())
print(p2.name, '是否处于活跃状态:', p2.is_alive())
print('-' * 50)
print('主进程执行结束')
如果将以下两句注释掉:
# p1.join() # 主进程要等待p1执行结束
# p2.join() # 主进程要等待p2执行结束
则会出现:
具体原因同上面一样。
from multiprocessing import Process
import os, time
# 函数中的代码是子进程要执行的代码
# 函数式方式创建子进程
def sub_process(name):
print(f'子进程PID:{os.getpid()},父进程PID是:{os.getppid()}----{name}')
time.sleep(1)
def sub_process2(name):
print(f'子进程PID:{os.getpid()},父进程PID是:{os.getppid()}----{name}')
time.sleep(1)
if __name__ == '__main__':
# main是主进程
print('主进程开始执行')
for i in range(3):
# 创建第一种子进程
# 若没有给定target参数,则不会执行自己编写的函数中的代码,会调用执行Process类中run方法
p1 = Process(target=sub_process, args=('lxl',))
# 创建第二种子进程
p2 = Process(target=sub_process2, args=(18,))
# 调用start()启动子进程
p1.start() # 如果Process类创建对象时没有指定target参数,则会调用Process类中的run方法去执行
p2.start() # 如果Process类创建对象时指定了target参数,start()调用target指定的函数去执行
# 强制终止进程
p1.terminate() # target指定的函数还没有执行,进程就会被终止
p2.terminate()
print('主进程执行结束')
继承式创建子进程
除了使用函数式创建子进程,亦可以使用Python面向对象中的继承创建子进程。
使用继承创建进程的语法结构:
class 子进程(Process):
pass
即编写一个类,这个类继承Process类,然后重写Process类中的run()方法。
from multiprocessing import Process
import os, time
# 自定义一个类
class SubProcess(Process):
# 编写初始化方法
def __init__(self, name):
# 调用父类中的初始化方法
super().__init__()
self.name = name
# 重写父类中的run()方法
def run(self):
print(f'子进程的名称:{self.name},PID:{os.getpid()},父进程的PID:{os.getppid()}')
if __name__ == '__main__':
print('主进程开始执行')
lst = []
for i in range(1, 6):
p1 = SubProcess(f'进程:{i}')
# 启动进程
p1.start() # SubProcess类中没有start()方法,实际上会去调用父类中的start()方法
lst.append(p1)
# 阻塞主进程
for item in lst:
item.join()
print('主进程执行结束')
进程池的使用
当进程数量比较少时,可以使用上面的两种方法Process类或Process的子类创建进程。但如果要创建的进程数量很多,再使用上述的方法创建进程和销毁进程就会耗费大量时间,这时可以使用multiprocessing模块中的 Pool类 创建进程池。
进程池的原理:
创建一个进程池,并设置进程池中最大的进程数量。假设进程池中最大的进程数为3,现在有10个任务需要执行,那么进程池一次可以执行3个任务,4次即可完成全部任务的执行。
创建进程池的语法结构:
进程池对象 = Pool(N)
进程池对象的方法:
方法名 | 功能描述 |
---|---|
apply_async(func,args,kwargs) | 使用非阻塞方式调用函数func |
apply(func,args,kwargs) | 使用阻塞方式调用函数func |
close() | 关闭进程池,不再接收新任务 |
terminate() | 不管任务是否完成,立即终止 |
join() | 阻塞主进程,必须在terminate()或close()之后使用 |
使用进程池:非阻塞和阻塞方式调用函数func有什么区别:
先来看非阻塞方式:
from multiprocessing import Pool
import os, time
# 编写函数(即要执行的任务)
def task(name):
print(f'子进程的PID:{os.getppid()},执行的任务:{name}')
time.sleep(1)
# 使用进程池:非阻塞和阻塞方式调用函数func有什么区别
if __name__ == '__main__':
# 主进程
start = time.time()
print('主进程开始执行')
# 创建进程池
p = Pool(3)
# 创建任务
for i in range(1, 11):
# 以非阻塞方式
# task()有一个位置参数,因此需要用args传入该函数的位置参数,以元组方式
p.apply_async(func=task, args=(i,)) # 函数作为参数不带小括号,调用才带小括号
p.close() # 关闭进程池,不再接收新任务
p.join() # 阻塞父进程,等待所有的子进程执行完毕之后,才会执行父进程中的代码
print('所有的子进程执行完毕,父进程执行结束')
print(time.time() - start)
再来看阻塞方式(只需将 apply_async 方法改成 apply 方法即可):
from multiprocessing import Pool
import os, time
# 编写函数(即要执行的任务)
def task(name):
print(f'子进程的PID:{os.getppid()},执行的任务:{name}')
time.sleep(1)
# 使用进程池:非阻塞和阻塞方式调用函数func有什么区别
if __name__ == '__main__':
# 主进程
start = time.time()
print('主进程开始执行')
# 创建进程池
p = Pool(3)
# 创建任务
for i in range(1, 11):
# 以阻塞方式
# task()有一个位置参数,因此需要用args传入该函数的位置参数,以元组方式
p.apply(func=task, args=(i,)) # 函数作为参数不带小括号,调用才带小括号
p.close() # 关闭进程池,不再接收新任务
p.join() # 阻塞父进程,等待所有的子进程执行完毕之后,才会执行父进程中的代码
print('所有的子进程执行完毕,父进程执行结束')
print(time.time() - start)
并发和并行的概念
并发:
是指两个或多个事件同一时间间隔发生,多个任务被交替轮换着执行。比如A事件是吃苹果,在吃苹果的过程中有快递员敲门让你收下快递,收快递就是B事件,那么收完快递继续吃没吃完的苹果。这就是并发。
并行:
指两个或多个事件在同一时刻发生,多个任务在同一时刻在多个处理器上同时执行。比如A事件是泡脚,B事件是打电话,C事件是记录电话内容,这三件事则可以在同一时刻发生,这就是并行。
进程之间数据是否共享
结论:
进程之间数据是不共享的。
from multiprocessing import Process
a = 100
def add():
print('子进程1开始执行')
global a
a += 30
print('a=', a)
print('子进程1执行完毕')
def sub():
print('子进程2开始执行')
global a
a -= 50
print('a=', a)
print('子进程2执行完毕')
if __name__ == '__main__':
# 主进程
print('主进程开始执行')
print('a=', a)
# 创建 加法 的子进程
p1 = Process(target=add)
# 创建 减法 的子进程
p2 = Process(target=sub)
# 启动子进程
p1.start()
p2.start()
# 阻塞主进程
p1.join()
p2.join()
print('主进程执行结束')
print('a=', a)
图示分析:
队列的基本使用
前面说到进程之间的数据是不共享的,但是可以通过队列来实现进程之间的数据共享。
进程之间可以通过队列(Queue)进程通信,队列是一种先进先出(First In First Out)的数据结构。
创建队列的语法结构:
队列对象 = Queue(N)
在multiprocessing模块中有一个Queue类。
Queue类中的常用方法:
方法名称 | 功能描述 |
---|---|
qsize() | 获取当前队列包含的消息数量 |
empty() | 判断队列是否为空,为空结果为True,否则为False |
full() | 判断队列是否满了,满结果为True,否则为False |
get(block=True) | 获取队列中的一条消息,然后从队列中移除(出队),block默认值为True |
get_nowait() | 相当于get(block=False),消息队列为空时,抛出异常 |
put(item,block=True) | 将item消息放入队列,block默认为True |
put_nowait(item) | 相当于put(item,block=False) |
注意:
在上面的方法中参数 block
表示 阻塞
的意思。使用get(block=True)【表示进行等待】,当队列为空并不会报错,而且会一直等待,等到队列中有消息时再取出一条消息。使用get_nowait()【相当于get(block=False)表示不进行等待】,当队列为空会报错。使用put(item,block=True)【表示进行等待】,当队列满时并不会报错,而且会一直等待,等到队列中有空位置时再入队。使用put_nowait(item)【put(item,block=False)表示不等待】,当队列满时会报错。
from multiprocessing import Queue
if __name__ == '__main__':
# 创建一个队列
q = Queue(3) # 最多可以接收3条信息
print('队列是否为空:', q.empty()) # True
print('队列是否为满:', q.full()) # False
# 向队列中添加消息
q.put('hello')
q.put('world')
print('队列是否为空:', q.empty()) # False
print('队列是否为满:', q.full()) # False
q.put('Python')
print('队列是否为空:', q.empty()) # False
print('队列是否为满:', q.full()) # True
print('队列当中信息的个数:', q.qsize())
# 出队
print(q.get())
print('队列当中信息的个数:', q.qsize())
# 入队
q.put_nowait('html')
# q.put_nowait('sql') # 报错,queue.Full
# q.put('sql') # 不报错,会一直等待,等到队列中有空位置在入队
# 遍历
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait()) # nowait() 不等待
print('队列是否为空:', q.empty()) # True
print('队列是否为满:', q.full()) # False
print('队列当中信息的个数:', q.qsize())
使用队列实现进程之间的通信
前面说过使用put(item,block=True)【表示进行等待】,当队列满时并不会报错,而且会一直等待,等到队列中有空位置时再入队。
而事实上还有一个参数put(item,block=True,timeout=n),timeout表示等待n秒后,若队列还没有空位置就抛异常
。
from multiprocessing import Queue
if __name__ == '__main__':
# 创建一个队列
q = Queue(3) # 最多可以接收3条信息
# 向队列中添加元素(入队)
q.put('hello')
q.put('world')
q.put('Python')
q.put('html', block=True, timeout=2) # 等待2秒之后,队列还没有空位置就抛异常
进程之间可以使用队列进行通信,实际上就是入队和出队操作,两个进程,一个进程负责入队,另一个进程负责出队即可。
下面例子,使用队列实现进程之间通信,去操作全局变量的值:
# 使用队列实现进程之间通信。注意这个队列应该是共享的,写和读用的是同一个队列
from multiprocessing import Queue, Process
import time
a = 100
def writ_msg(q): # q队列
global a
if not q.full():
for i in range(6): # 写6条消息
a -= 10
q.put(a) # 入队
print('a入队时的值:', a)
# 出队
def read_msg(q):
time.sleep(1)
while not q.empty():
print('出队时a的值:', q.get())
if __name__ == '__main__':
print('父进程开始执行')
q = Queue() # 由父进程创建队列,没有指定参数,表示可接收的消息个数没有上限
# 创建两个子进程
p1 = Process(target=writ_msg, args=(q,))
p2 = Process(target=read_msg, args=(q,))
# 启动两个子进程
p1.start()
p2.start()
# 等待写的进程执行结束,再去执行主进程
p1.join()
p2.join()
print('父进程执行结束')
函数式创建线程
一个应用程序内多任务的方式采用的是多进程,一个进程内的多任务方式采用多线程。
线程:
线程是CPU可调度的最小单位,被包含在进程中,是进程中实际的运作单位。一个进程中可以拥有N多个线程并发执行,而每个线程并行执行不同的任务。
threading模块中有Thread类。
函数式创建线程的语法结构(类似进程):
t = Thread(group,target,name,args,kwargs)
参数说明:
- group:创建线程对象的进程组
- target:创建的线程对象所要执行的目标函数
- name:创建线程对象的名称,默认为“Thread-n”
- args:用元组以位置参数的形式传入target对应函数的参数
- kwargs:用字典以关键字参数的形式传入target对应函数的参数
import threading
from threading import Thread
import time
# 编写函数
def test():
for i in range(3):
time.sleep(1)
print(f'线程:{threading.current_thread().name}正在执行{i}')
if __name__ == '__main__':
start = time.time()
print('主线程开始执行')
# 创建两个线程,使用列表生成式
lst = [Thread(target=test) for i in range(2)]
for item in lst: # item的数据类型是Thread类型
# 启动线程
item.start()
# 阻塞主线程
for item in lst:
item.join()
print(f'一共耗时{time.time() - start}')
'''
多个线程并发执行,每个线程并行执行不同的任务。
该程序中共有一个进程(main),三个线程(main,Thread-1,Thread-2)。
三个线程并行执行的任务是:
主线程负责执行main中的代码
Thread-1线程执行test()三次循环
Thread-2线程执行test()三次循环
三个线程又是并发执行,Thread-1和Thread-2谁先执行不一定。
'''
继承式创建线程
使用Thread子类创建线程的操作步骤(类似进程):
- 自定义类继承threading模块下的Thread类
- 重写run()方法
import threading
from threading import Thread
import time
class SubThread(Thread):
# 重写run()方法
def run(self):
for i in range(3):
time.sleep(1)
print(f'线程:{threading.current_thread().name}正在执行{i}')
if __name__ == '__main__':
print('主线程开始执行')
# 继承式创建两个线程,使用列表生成式
lst = [SubThread() for i in range(2)]
for item in lst: # item的数据类型是Thread类型
# 启动线程
item.start()
# 阻塞主线程
for item in lst:
item.join()
print('主线程执行结束')
线程之间数据共享
进程之间的数据不共享,线程之间的数据共享。这是因为线程之间会共享该进程中的资源。
from threading import Thread
a = 100 # 全局变量
def add():
print('加线程开始执行')
global a
a += 30
print(f'a的值为:{a}')
print('加的线程执行结束')
def sub():
print('减线程开始执行')
global a
a -= 50
print(f'a的值为:{a}')
print('减的线程执行结束')
if __name__ == '__main__':
print('主线程开始执行')
print(f'全局变量a的值:{a}')
# 线程
add = Thread(target=add)
sub = Thread(target=sub)
# 启动
add.start()
sub.start()
# 阻塞主线程
add.join() # 当加线程执行完毕主线程才能继续执行
sub.join() # 当减线程执行完毕主线程才能继续执行
print(f'主线程执行结束,a的值:{a}')
图示分析:
多个线程共享数据带来的问题以及Lock锁的使用
在上一小节中知道线程共享该进程的资源,所以全局变量a,被这个进程中的所有线程共享。
又因为多个线程是并发执行的,执行顺序无法确定(由CPU调度决定),所以这就可能造成数据错乱。
下面看一个多线程操作共享数据带来的安全性问题:
# 多线程操作共享数据的安全性问题
import threading
from threading import Thread
import time
ticket = 50 # 全局变量。代表50张票
def sale_ticket():
global ticket
# 每个排队窗口(线程)假设有100人
for i in range(100): # 每个线程要执行100次循环
if ticket > 0:
print(f'{threading.current_thread().name}正在出售第{ticket}张票')
ticket -= 1
time.sleep(1)
if __name__ == '__main__':
for i in range(3): # 创建三个线程,代表3个排队窗口
t = Thread(target=sale_ticket)
t.start() # 启动线程
解决这个问题可以采用Lock锁。
当线程A去操作共享数据时,对共享数据加一把锁,这时如果线程B想要操作共享数据就只能等待而不可以操作。只有当锁解开时才允许其他线程进行操作。
Lock对象的acquire()、release()方法:
使用Lock锁机制可以解决共享数据带来的安全性问题,那么如何锁,把锁上在什么地方呢?
在使用Lock锁时要把尽量少的、不耗时的代码放到锁中。
threading模块中的Lock类。
# 使用Lock锁解决多线程操作共享数据的安全性问题
import threading
from threading import Thread, Lock
import time
ticket = 50 # 全局变量。代表50张票
lock_obj = Lock() # 创建锁对象
def sale_ticket():
global ticket
# 每个排队窗口(线程)假设有100人
for i in range(100): # 每个线程要执行100次循环
lock_obj.acquire() # 上锁
if ticket > 0:
print(f'{threading.current_thread().name}正在出售第{ticket}张票')
ticket -= 1
time.sleep(1)
lock_obj.release() # 释放锁
if __name__ == '__main__':
for i in range(3): # 创建三个线程,代表3个排队窗口
t = Thread(target=sale_ticket)
t.start() # 启动线程
使用Lock锁降低了并发度,但可以解决多线程操作共享数据的安全性问题。
生产者与消费者问题
生产者与消费者模式:
是线程模型中的经典问题,与编程语言无关。当程序中出现了明确的两类任务,一个任务负责生产数据,一个任务负责处理生产的数据时就可以使用该模式。
生产者与消费者问题会出现等待与唤醒问题,如果中间仓库满,生产者等待;如果中间仓库空,消费者等待。
使用Python实现生产者与消费者模式需要用到Python内置模块queue中的Queue类。【注意在进程通信中提到的队列是multiprocessing模块中的Queue类】
Python内置模块queue中的Queue类中常用的方法:
方法名称 | 功能描述 |
---|---|
put(item) | 向队列中放置数据,如果队列为满,则阻塞 |
get() | 从队列中取走数据,如果队列为空,则阻塞 |
join() | 如果队列不为空,则等待队列变为空 |
task_done() | 消费者从队列中取走一项数据,当队列变为空时,唤醒调用join()的线程 |
# 生产者与消费者模式
from queue import Queue
from threading import Thread
import time
# 创建一个生产者类
class Producer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.queue = queue
def run(self):
for i in range(1, 6):
print(f'{self.name}将产品{i}放入队列')
self.queue.put(i)
time.sleep(1)
print('生产者完成了所有数据的存放')
# 创建一个消费者类
class Consumer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.queue = queue
def run(self):
for i in range(5):
value = self.queue.get()
print(f'消费者线程{self.name}取出了{value}')
time.sleep(1)
print('消费者线程完成了所有数据的取出')
if __name__ == '__main__':
# 创建队列
queue = Queue() # 该队列表示中间仓库。
# 创建生产者线程
p = Producer('Producer', queue)
# 创建消费者线程
c = Consumer('Consumer', queue)
# 启动线程
p.start()
c.start()
# 阻塞主线程
p.join()
c.join()
print('主线程执行结束')