进程是应用程序正在执行的实体,当程序执行时,也就创建了一个主线程。进程在创建和执行需要占用一定的资源,如内存、文件、I/O设备等。
线程是CPU使用的基本单元,由主线程创建,并使用这个进程的资源,因此线程创建成本低,可以实现并发处理,充分利用CPU。
1. 使用进程
程序只是一堆静态的代码,而进程则是程序的运行过程。同一个程序执行两次,就是两个进程。一个进程就是一个正在运行的任务。对于单核CPU来说,同一时间只能处理一个任务,如果要实现多任务并发处理,可以多任务之间轮换执行。
multiprocessing
是多进程管理包,可以编写多进程和多线程。如编写多线程,使用multiprocessing.dummy即可,用法与multiprocessing基本相同。
常用组件和功能:
- 管理进程模块
Process
:用于创建进程模块。Pool
:用于创建管理进程池。Queue
:用于进程通信,资源共享。Value,Array
:用于进程通信,资源共享。Pipe
:用于管理通信。Manager
:用于资源共享。
- 同步子进程模块
Condition
:条件对象。Even
t:事件通信。Lock
:进程锁。RLock
:递归锁。Semaphore
:进程信号量。
使用多线程往往是用来处理CPU密集型(如科学计算)的需求,如果是IO密集型(如文件读取、爬虫等),则可以使用多线程去处理。
Process
是 multiprocessing 的子类,也是multiprocessing 的核心模块,用来创建子进程。可以实现多进程的创建、启动、关闭等操作。在multiprocessing中,每一个进程都用一个Process类表示。
multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={})
参数说明如下:
- group:线程组,目前还没有实现,参数值必须为None。
- target:表示当前进程启动时要执行的调用对象,一般为可执行的方法或函数。
- name:进程名称,相当于给当前进程取一个别名。
- args:表示传递给target函数的位置参数,格式为元组。
- kwargs:表示传递给target函数的关键字参数,格式为字典。
Process
对象包含的实例方法如下:
- is_alive():判断进程实例是否还在执行。
- join([timeout]):阻塞进程执行,直到进程终止,或者等待一段时间,具体时间由timeout(可选参数)设置,单位为s。
- start():启动进程实例。
- run():如果没有设置target参数,调用start()方法时,将执行对象的run()方法。
- terminate():不管任务是否完成,立即停止进程。
Process
对象的常用属性:
- name:进程名称。
- pid:进程ID,在进程被创造前返回None。
- exitcode:进程的退出码,如果进程没有结束,那么返回None;如果进程被信号N终结,则返回- N。
- authkey:进程的认证密钥,为一个字节串。
- sentinel:当进程结束时变为ready状态,可用于同时等待多个事件,否则用join()更简单些。
- daemon:将父进程设置为守护进程,当父进程结束时,子进程也结束。
# 新建一个test2.py文件,输入以下内容
from multiprocessing import Process
def foo(i):
print('say hi',i)
if __name__=='__main__':
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
# 执行同时输出以下10行数据
# 新建一个test3.py文件,输入以下内容
import multiprocessing # 导入multiprocessing模块
import time # 导入time模块
def worker(): # 处理任务
name = multiprocessing.current_process().name # 获取进程的名称
print(name,'Starting')
time.sleep(4) # 睡眠4s
print(name,'Exiting')
def my_service(): # 处理任务
name = multiprocessing.current_process().name # 获取进程的名称
print(name,'Starting')
time.sleep(5) # 睡眠5s
print(name,'Exiting')
if __name__ == '__main__': # 主进程
service = multiprocessing.Process( # 创建子进程1
name = 'my_service', # 修改进程名称
target = my_service # 调用对象
)
worker_1 = multiprocessing.Process( # 创建子进程2
name = 'worker 1', # 修改进程名称
target = worker # 调用对象
)
worker_2 = multiprocessing.Process( # 创建子进程3,保持默认的进程名称
target = worker # 调用对象
)
worker_1.start() # 启动进程1
worker_2.start() # 启动进程2
service.start() # 启动进程3
自定义进程:简单的任务,直接使用multiprocessing.Process实现多进程,而对于复杂的任务,通常自定义Process类,扩展Process功能。
# 新建一个test4.py文件,输入以下内容
from multiprocessing import Process # 导入 Process 类
import time,os # 导入time和os模块
class MyProcess(Process): # 自定义进程类,继承自Process
def __init__(self,name): # 重写初始化函数
super().__init__() # 调用父类的初始化函数
self.name = name # 重写name属性值
def run(self): # 重写 run方法
print('%s is running'%self.name,os.getppid()) # 打印子进程信息,os.getppid()获取父进程ID
time.sleep(3)
print('%s is done'%self.name,os.getpid()) # 打印子进程信息,os.getpid()获取子进程(当前进程)ID
if __name__ == '__main__':
p = MyProcess('子进程1') # 创建子进程
p.start() # 执行进程
print('主进程',os.getppid()) # 打印主进程ID
管道:Pipe
可以创建管道,用来在两个进程间进行通信,两个进程分别位于管道的两端。
Pipe([duplex])
# (conn1,conn2) = Pipe()
该方法返回两个链接对象(conn1,conn2) 元组,代表管道的两端。参数duplex为可选,默认值为True。
- 如果duplex为True,那么是双工模式,即conn1和conn2均可收发消息。
- 如果duplex为False,conn1只负责接收消息,conn2只负责发送消息。
实例化的Pipe对象拥有connection的方法,5种常用的方法如下:
- send(obj):发送数据。
- recv():接收数据。如果没有消息可接收,recv()方法一直阻塞。如果管道已经被关闭,那么recv()方法抛出EOFError错误。
- poll([timeout]):查看缓冲区是否有数据,可设置时间。如果timeout为None,则无限超时。
- send_bytes([buffer[,offset[,size]]):发送二进制字节数据。
- recv_bytes([maxlength]):接收二进制字节数据。
from multiprocessing import Process,Pipe # 导入 Process和Pipe
a,b = Pipe(True) # 如果改 a,b = Pipe(False)
a.send('Hi,b') # 发送数据
print(b.recv()) # 输出 Hi,b
# 一个可发送消息,另一个可以接受消息
# 新建一个test5.py文件,输入以下内容
from multiprocessing import Process,Pipe # 导入 Process和Pipe
def send(pipe): # send传输一个列表
pipe.send(['spam']+[42,'egg'])
pipe.close()
if __name__ == '__main__':
(conn1,conn2) = Pipe() # 创建两个Pipe实例
sender = Process(target=send,args=(conn1,)) # args一定是实例化后的Pipe变量,不能写args=(Pipe(),)
sender.start() # Process 类启动进程
print('conn2 got:%s'%conn2.recv()) # 管道的另一端conn2从send收到消息
conn2.close() # 关闭管道
# 管道可以同时发送和接收消息
# 新建一个test6.py文件,输入以下内容
from multiprocessing import Process,Pipe # 导入 Process和Pipe
def talk(pipe):
pipe.send(dict(name='Bob',spam=42)) # 传入一个字典
reply = pipe.recv() # 接收传输的数据
print('talker got:',reply)
if __name__ == '__main__':
(parentEnd,childEnd) = Pipe() # 创建两个Pipe()实例
child = Process(target=talk,args=(childEnd,)) # 创建一个Process进程,名称为child
child.start() # 启动进程
print('parent got:',parentEnd.recv()) # parentEnd是一个Pip()管道,可以接受child Process进程传输的数据
parentEnd.send({x*2 for x in 'spam'}) # 使用send方法来传输数据
child.join()
print('parent exit')
队列:Queue
可以创建队列,实现在多个进程间通信。Queue
是multiprocessing的子类
Queue([maxsize])
Queue
实例对象的常用方法:
- empty():判断队列是否为空,空返回True,否则返回False。
- full():判断队列是否已满,满返回True,否则返回False
- put(obj[,block[,timeout]]):写入数据。
- get([block[,timeout]]):读取数据。
- put_nowait():直接写入数据。
- get_nowait():直接读取数据。
- close():关闭队列。
- qsize():返回队列的大小。
from multiprocessing import Queue
q = Queue() # 创建一个队列对象
# 使用put方法往队列里面放值
q.put(1) # 添加数字1
q.put(2) # 添加数字2
q.put(3) # 添加数字3
# 使用get方法从队列取值。先进先出,后进后出原则
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
print(q.full()) # False
print(q.empty()) # True
get()方法将从队列取值,并且把队列内被取出来的值删掉。如果get()没有参数情况下就是默认一直等着取值,就算队列里面没有可取的值,程序也不会结束,就会卡在那里一直等待。
# 新建一个test7.py文件,输入以下内容
from multiprocessing import Process,Queue # 导入 Process,Queue 类
def f(q,name,age): # 进程函数
q.put([name,age]) # 添加数据
if __name__ == '__main__':
q = Queue() # 创建一个Queue对象
p = Process(target=f,args=(q,'张三',18)) # 创建一个进程
p.start() # 执行进程
print(q.get()) # ['张三', 18]
p.join() # 阻塞进程
进程池:Pool
可以提供指定数量的进程供用户调用。
进程池对象 = Pool(进程池,初始函数,初始参数,最大任务数,上下文)
进程池对象常用方法:
- apply():执行进程函数。
- apply_async():异步执行进程函数。
- map():迭代执行进程函数。
- map_async():异步迭代执行进程函数。
- close():关闭进程池。
- terminal():结束工作进程。
- join():阻塞主进程等待退出。
# 新建一个test8.py文件,输入以下内容
import time
from multiprocessing import Pool # 导入Pool 类
def run(n): # 进程处理函数
time.sleep(1) # 阻塞 1 s
return n*n # 返回浮点数的平方
if __name__ == '__main__': # 主进程
testFL = [1,2,3,4,5,6] # 待处理的数列
print('顺序执行') # 但进程
s = time.time() # 计时开始
for fn in testFL:
run(fn)
e1 = time.time() # 计时结束
print('顺序执行时间:',int(e1-s)) # 计算所用时差
print('并行执行') # 创建多进程,并行执行
pool = Pool(6) # 创建6个进程数量的进程池
r1 = pool.map(run,testFL) # 并发执行运算
pool.close() # 关闭进程池,不再接受新的进程
pool.join() # 主进程阻塞等待子进程的退出
e2 = time.time() # 计时结束
print('并行执行时间:',int(e2-e1)) # 计算所用时差
print(r1) # 打印计算结果
进程锁:当多个进程使用同一资源时,容易引发数据安全或顺序混乱问题,这时可以考虑为进程加锁,使进程产生同步,确保数据的一致性。使用Lock可以创建锁。
lock = multiprocessing.Lock() # 创建锁
lock.acquire() # 获取锁
lock.release() # 释放锁
# 新建一个test9.py文件,输入以下内容
import os,time,random
from multiprocessing import Process,Lock,set_start_method
def work(lock,n):
lock.acquire()
print('%s:%s is runing'%(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is down'%(n,os.getpid()))
lock.release()
if __name__ == '__main__':
set_start_method('fork')
lock = Lock()
for i in range(3): # 利用for循环模拟多线程
p = Process(target=work,args=(lock,i))
p.start()
# 使用加锁形式实现了顺序执行,保证了数据的安全,类似于数据库的事务。
2.使用线程
进程是执行的应用程序,而线程是进程的执行序列,一个进程可以有多个线程,线程(Thread)也叫轻量级进程,多线程类似于同时执行多个不同的程序。
多线程运行优点如下:
- 进程之间不能共享内存,但线程之间可以共享内存。
- 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小的多因此使用多线程实现多任务并发执行比使用多进程的效率高。
多线程编程的优势如下:
- 使用线程可以把占据长时间程序的任务放到后台处理。
- 用户界面可以更加吸引人,如用户点击一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
- 程序的运行速度可能加快。
在一些等待的任务实现上如用户输入、文件读写和网络收敛数据等,线程的优势较明显。
使用Thread
构造器可以创建线程,Thread
是threading模块最核心的类,每个Threa对象代表一个线程,每个线程可以独立处理不同的任务。
Thread(group=None,target=None,name=None,args=(),kwargs={})
# 新建一个test11.py文件,输入以下内容
import time
import threading
def f(n):
print('线程运算',n)
time.sleep(1)
if __name__ == '__main__':
a = time.time()
for i in range(5):
# f(i+1) # 单线程运算:5s
t = threading.Thread(target=f,args=(i+1,)) # 多线程运算:不到1s
t.start()
b = time.time()
print('花费实际:%2f'%(b-a))
自定义线程:继承 threading.Thread 自定义线程类,其本质是重构Thread类中的run方法。
# 新建一个test12.py文件,输入以下内容
# 自定义Threading
import time
import threading
class MyThread(threading.Thread): # 以继承的方式实现线程创建
def __init__(self,n): # 重写初始化函数
super(MyThread,self).__init__() # 重构run函数必须重写
self.n = n
def run(self): # 重写run函数
print('task',self.n)
time.sleep(1)
print('2s')
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
if __name__ == '__main__':
t1 = MyThread('t1') # 实例化线程对象
t2 = MyThread('t2')
t1.start() # 执行线程
t2.start()
线程锁:在多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据同时改变一个变量时会把内容改乱。为了确保一个线程在修改变量时,别的线程一定不能改该变量,这就是锁。
Lock
对象有两个基本方法:
- acquire():可以阻塞或非阻塞地获取锁。
- release():释放一个锁。
线程锁的优点:确保某段关键代码只能由一个线程从头到尾完整地执行。
线程锁的缺点如下:
- 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率降低了。
- 由于存在多个锁,不同线程持有不同的锁,可能回造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
# 新建一个test13.py文件,输入以下内容
import time
import threading
t = 0
lock = threading.Lock() # 创建Lock对象
def run_thread(n): # 线程处理函数
global t # 声明全局变量
for i in range(1000000): # 无数次重复操作,对变量执行先存后取相同的值
lock.acquire() # 获取锁
try:
t = t + n
t = t - n
finally:
lock.release() # 释放锁
t1 = threading.Thread(target=run_thread,args=(5,)) # 创建线程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 开始执行线程
t2.start()
t1.join() # 阻塞线程
t2.join()
print(t) # 0
递归锁:RLock
允许在同一线程中多次调用acqire(),不回阻塞程序,这种锁称为递归锁。acquire和release必须成对出现,即调用了n次acquire()方法,就必须调用n次release()方法,才能真正释放所占用的锁。
# 新建一个test13.py文件,输入以下内容
import time
import threading
t = 0
rlock = threading.RLock() # 创建Lock对象
def run_thread(n): # 线程处理函数
global t # 声明全局变量
for i in range(1000000): # 无数次重复操作,对变量执行先存后取相同的值
rlock.acquire() # 获取锁
rlock.acquire() # 获取锁
try:
t = t + n
t = t - n
finally:
rlock.release() # 释放锁
rlock.release() # 释放锁
t1 = threading.Thread(target=run_thread,args=(5,)) # 创建线程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 开始执行线程
t2.start()
t1.join() # 阻塞线程
t2.join()
print(t) # 0
条件对象:允许一个或多个线程在被其它线程通知之前处于等待中。Condition 时 threading 模块的一个子类,用于维护多线程之间的同步协作。内部使用的也是Lock或Rlock,同时增加了等待池功能。常见方法如下:
- acquire():请求底层锁。
- release():释放底层锁。
- wait():等待直到被通知或发送超时。
- wait_for():等待,直接条件计算为真。
- notify(n=1):默认唤醒一个等待这个条件的线程。
- notify_all():唤醒所有正在等待这个条件的线程。
import time
import threading
class Test1(threading.Thread):
def __init__(self,name,cond):
super().__init__()
self.name = name
self.cond = cond
def run(self):
with self.cond:
print(self.name,':1')
time.sleep(1)
print(self.name,':3')
class Test2(threading.Thread):
def __init__(self,name,cond):
super().__init__()
self.name = name
self.cond = cond
def run(self):
with self.cond:
print(self.name,':2')
time.sleep(1)
print(self.name,':4')
cond = threading.Condition()
a = Test1('A',cond)
b = Test2('B',cond)
a.start()
b.start()
事件通信:事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下几个方法:
- is_set():当且仅当内部标志为True时返回True。
- set():将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不回被阻塞。
- clear():将内部标志设置为False。
- wait():等待设置标志。