在Python中,并发编程的实现有多种方式,包括多线程、多进程和异步编程。每一种方式都有其使用的场景和特点。那么如何去选择多线程、多进程和多协程呢?要知道如何选择的话就要了解一下什么是CPU密集型计算、什么是I/O密集型计算;多线程、多进程和多协程又有什么样的区别。
CPU密集型和I/O密集型
CPU 密集型(CPU-bound):CPU密集型任务指的是任务的主要负载在CPU上,而不是在输入/输出操作上。这种类型的任务通常涉及大量的计算、循环和逻辑操作,而且不需要频繁地进行文件读写、网络通信等操作。例如:压缩解压缩、加密解密、正则表达式搜索;
I/O密集型(I/O bound):IO密集型任务指的是任务的主要负载在输入/输出操作上,而不是在CPU计算上。这种类型的任务涉及到频繁的文件读写、网络通信、数据库操作等IO操作,而在执行这些操作时,CPU的利用率相对较低。例如:文件处理程序,网络爬虫程序,读写数据库程序;
多线程、多进程和协程对比
一个进程中可以启动 N 个进程,一个线程中可以启动 N 个协程;多进程,多线程,多协程三种技术中只有多进程能够同时利用多核cpu并行计算;
多线程、多进程和多协程是处理并发任务的常见方式,在不同的场景下具有各自的优势和适用性。下面是它们的对比:
1. 多线程:
- 优点:线程轻量,创建和销毁开销较小;可以在一个进程中实现并发执行,共享进程的内存空间,可直接访问共享数据;适合IO密集型任务。
- 缺点:线程间共享内存可能引发竞态条件和死锁等并发问题;全局解释器锁(GIL)限制了CPython中多线程的并行性能。
- 适用场景:IO密集型任务,如网络通信、文件读写等。
2. 多进程:
- 优点:各自独立的内存空间,通过进程间通信(IPC)以及操作系统调度,可以实现真正的并行执行;可以利用多核CPU资源;适合CPU密集型任务。
- 缺点:创建和销毁进程开销较大;进程间通信的代价较高。
- 适用场景:CPU密集型任务,如图像处理、数据分析等。
3. 多协程:
- 优点:协程是一种轻量级的线程,可以在同一个线程内实现并行执行;不需要线程切换的开销;适合IO密集型任务;适用于构建高效的异步IO应用程序。
- 缺点:协程需要显式地在代码中进行调度和切换,需要避免长时间阻塞的操作。
- 适用场景:IO密集型任务,如网络爬虫、Web服务器、消息队列等。
总结来说,多线程适合处理IO密集型任务,多进程适合处理CPU密集型任务,而多协程适用于构建高效的异步IO应用程序。在选择适合的并发处理方式时,需要考虑任务的特性、执行效率、资源占用等因素,并进行性能测试和调优。
Python全局解释性锁(GIL锁)
GIL(Global Interpreter Lock),全局解释器锁,是CPython解释器中的一个机制。GIL的作用是确保在同一时刻只有一个线程执行Python字节码。这意味着在CPython中,即使使用多个线程,也不能真正实现多线程的并行执行。
GIL的存在是为了保护CPython解释器内部的数据结构不被并发访问而导致的竞态条件。竞态条件是指多个线程在没有适当同步的情况下对共享数据进行读写操作,可能导致数据的不一致性和错误的结果。
因为有GIL的限制,多线程在CPU密集型任务中不能充分利用多核CPU资源,只有在IO密集型任务中才能获得一定的性能提升。原因是,当线程阻塞在IO操作上时,GIL会释放,允许其他线程执行。但在实际情况中,如果任务主要涉及CPU计算,那么GIL会成为性能瓶颈,因为同一时刻只能有一个线程在执行。
为了充分利用多核CPU,可以使用多进程来代替多线程,每个进程有自己独立的解释器和GIL。因此,多进程可以实现真正的并行执行,适合CPU密集型任务。
需要注意的是,GIL只存在于CPython解释器中。其他实现,如Jython(Java平台)、IronPython(.NET平台)或PyPy,没有GIL的限制,并可以实现真正的多线程并行执行。此外,GIL只会影响到Python的解释器层级,并不影响通过C、C++等语言编写的扩展模块的多线程执行。
多进程实现
在Python中,可以使用多进程来实现并发编程。Python提供了multiprocessing
模块,方便地创建和管理进程。
下面是一些常见的使用多进程的示例代码:
创建进程:
import multiprocessing
def worker():
print("Worker process")
# 创建进程对象
process = multiprocessing.Process(target=worker)
# 启动进程
process.start()
进程间通信:
import multiprocessing
def worker(pipe):
msg = pipe.recv() # 接收消息
print("Worker process received:", msg)
# 创建管道对象
parent_conn, child_conn = multiprocessing.Pipe()
# 创建进程并传递管道对象
process = multiprocessing.Process(target=worker, args=(child_conn,))
# 启动进程
process.start()
# 发送消息
parent_conn.send("Hello from parent process")
# 等待进程结束
process.join()
使用进程池:
import multiprocessing
def worker(x):
return x * x
# 创建进程池对象
pool = multiprocessing.Pool()
# 异步提交任务
result = pool.apply_async(worker, (10,))
# 获取结果
print(result.get())
# 关闭进程池
pool.close()
pool.join()
需要注意的是,在多进程编程中,每个进程都有独立的内存空间,因此进程间的数据无法直接共享。如果需要在进程间共享数据,可以使用Pipe
、Queue
、Manager
等机制来实现进程间通信。
此外,由于Python中的全局解释锁(GIL)限制,多进程适用于CPU密集型任务,而对于I/O密集型任务,通常使用多线程或异步编程效果更好。
多线程实现
在Python中,多线程是通过threading
模块实现的。这个模块提供了创建、管理线程以及线程间通信的功能。
以下是在Python中使用多线程编程的一般步骤:
导入threading
模块:
import threading
定义线程的执行函数:
def my_function():
# 线程的任务逻辑
创建线程对象:
my_thread = threading.Thread(target=my_function)
这里通过threading.Thread
类创建了一个线程对象my_thread
,并将要执行的函数my_function
作为目标函数,也可以传递参数给目标函数。
启动线程:
my_thread.start()
调用线程对象的start
方法来启动线程,线程将开始执行my_function
函数中的任务逻辑。
等待线程结束:
my_thread.join()
join
方法用于阻塞主线程,等待子线程执行完毕。这样可以保证在主线程退出之前,子线程已经完成。
多线程编程需要注意以下几点:
-
线程安全:多个线程同时访问共享数据可能导致数据竞争和不一致性。可以使用锁、条件变量等机制来保证线程安全。
-
全局解释器锁(GIL):在CPython解释器中,全局解释器锁限制了同一时刻只能有一个线程执行Python字节码。这意味着Python的多线程在CPU密集型任务中效果不佳,适合IO密集型任务。
-
线程间通信:多个线程之间可能需要进行数据交换和协调工作。可以使用队列或者共享变量等方式进行线程间通信。
-
死锁:当多个线程都在等待某个资源,而不释放自己的资源时,则可能发生死锁。需要小心设计和调试,避免死锁问题的发生。
-
异常处理:在线程中的异常默认会被忽略,无法通过try-except语句捕获。可以通过
threading.excepthook
设置全局的线程异常处理函数来捕获线程中的异常。
总的来说,Python的多线程编程相对容易上手,但需要注意线程安全、GIL和线程间通信等问题。在实际应用中,需要根据任务类型和性能要求来选择合适的多线程或多进程编程方式。
使用Lock类处理解决线程安全问题
存在两种方式对发生线程安全的代码进行加锁:① try-finally 模式;② with 模式;
# 用法1
import threading
lock = threading.Lock()
lock.acquire()
try:
...
finally:
lock.release()
# 用法2
import threading
lock = threading.Lock()
with lock:
...
接下来通过一个取钱的例子来进行学习(以下案例为虚构案例,并非实际情况)。
老王和妻子共同使用同一张银行卡并绑定了电子账户,卡上原本的余额为1000元,在老王消费800元的同时妻子也正消费800,此时两人同时提交了支付请求,但是还没有来到扣费请求,那么也就意味着此时余额减少,此时二者的请求在判断的时候都是满足余额大于消费金额的,所以就可能造成最终的余额进行了两次800的减少,则最终导致余额成了-600,发生了不可预料的后果。
import threading
import time
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
if account.balance >= amount:
# 调用sleep()大概率会出现问题, 因为此时会发生线程的切换, 那么就一定会进入if判断的语句那么两个线程都会balance 操作就出现了问题
time.sleep(0.1)
print(threading.current_thread().name + "取钱成功")
account.balance -= amount
print(threading.current_thread().name + " 余额: ", account.balance)
else:
print(threading.current_thread().name, "取钱失败, 余额不足")
if __name__ == '__main__':
account = Account(1000)
# 使用线程a, b来执行draw 函数
ta = threading.Thread(name="a", target=draw, args=(account, 800))
tb = threading.Thread(name="b", target=draw, args=(account, 800))
# 使用start()会启动一个线程
ta.start()
tb.start()
输出结果:
那如果同一张卡两人同时消费的时候我们加上一个在同一时刻只有一人能够进行余额减少的逻辑,也就是说二者虽然同时提交了消费请求但是二者的扣费逻辑肯定是有一个先后扣费的顺序的,当老王的请求先到达扣费逻辑时,老王妻子的扣费请求便必须等老王的扣费请求结束之后才能够实现。其实这个逻辑就是在代码中进行加锁实现的。
import threading
import time
# 获取锁对象
lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
# 使用with关键字对发生线程安全的代码进行加锁, 当一个线程执行完成之后才会释放锁另外一个线程才可以获取到锁
with lock:
if account.balance >= amount:
# 调用sleep()大概率会出现问题
time.sleep(0.1)
print(threading.current_thread().name + "取钱成功")
account.balance -= amount
print(threading.current_thread().name + " 余额: ", account.balance)
else:
print(threading.current_thread().name, "取钱失败, 余额不足")
if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(name="a", target=draw, args=(account, 800))
tb = threading.Thread(name="b", target=draw, args=(account, 800))
ta.start()
tb.start()
执行结果
进程池
进程池是一种用于管理和调度进程的技术。它通过预先创建一组可重用的进程,以便在需要时分配任务给这些进程来执行。这种方式可以减少进程创建和销毁的开销,并提高任务处理的效率。
Python中的multiprocessing
模块提供了进程池的实现。使用进程池可以实现以下功能:
创建进程池:
from multiprocessing import Pool
pool = Pool(processes=4) # 创建一个包含4个进程的进程池
在这个示例中,使用Pool
类创建了一个包含4个进程的进程池。
提交任务给进程池:
result = pool.apply_async(func, args) # 异步提交任务
使用apply_async
方法可以将任务提交给进程池,并返回一个表示任务执行结果的对象。
获取任务执行结果:
result.get() # 获取任务执行结果,阻塞主进程
使用get
方法可以获取任务执行的结果。如果任务还没有完成,主进程将在此处阻塞,直到任务完成并返回结果。
关闭进程池:
pool.close() # 关闭进程池
pool.join() # 阻塞主进程,等待所有任务完成
使用close
方法关闭进程池后,将不会再接受新的任务。然后使用join
方法等待所有任务完成。
通过使用进程池,可以简化并发任务的管理和调度,提高程序的执行效率。进程池在处理大量任务时特别有用,因为可以利用多个进程并行处理任务,从而加快任务的处理速度。但是,进程池也有一些限制,比如需要更多的系统资源,比如内存,而且进程间的通信相对复杂。因此,在选择使用进程池时,应该根据实际需求权衡利弊。
线程池
线程池是一种用于管理和复用线程的技术。它通过预先创建一组可复用的线程,以便在需要时分配任务给这些线程来执行。这种方式可以避免线程创建和销毁的开销,并提高任务处理的效率。
Python中的concurrent.futures
模块提供了线程池的实现。使用线程池可以实现以下功能:
创建线程池:
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4) # 创建一个包含4个线程的线程池
在这个示例中,使用ThreadPoolExecutor
类创建了一个包含4个线程的线程池。
提交任务给线程池:
result = pool.submit(func, *args, **kwargs) # 异步提交任务
使用submit
方法可以将任务提交给线程池,并返回一个表示任务执行结果的Future
对象。
获取任务执行结果:
result.result() # 获取任务执行结果,阻塞主线程
使用result
方法可以获取任务执行的结果。如果任务还没有完成,主线程将在此处阻塞,直到任务完成并返回结果。
关闭线程池:
pool.shutdown() # 关闭线程池
使用shutdown
方法关闭线程池后,将不会再接受新的任务。线程池会等待所有已提交的任务完成,然后终止所有线程。
通过使用线程池,可以简化并发任务的管理和调度,提高程序的执行效率。线程池适用于IO密集型任务,如网络请求、文件读写等操作,可以充分利用多个线程并发执行任务,而不会受到全局解释器锁(GIL)的限制。但是,线程池也有一些限制,比如需要更多的系统资源,如内存,而且线程间的通信相对复杂。因此,在选择使用线程池时,应该根据实际需求权衡利弊。
异步I/O
asyncio
(异步IO)是Python中用于编写异步程序的标准库。它提供了一种基于协程(coroutine)的异步编程模型,可以处理大量并发任务,实现高效的并发和并行操作。
协程(Coroutines):asyncio
使用协程来编写异步任务。协程是一种轻量级的非抢占式并发模型,它通过使用关键字async
来定义异步函数,使用await
来挂起协程并等待结果。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(hello())
在上述示例中,hello
函数是一个异步函数,使用async
关键字进行定义。await asyncio.sleep(1)
可以挂起协程并等待1秒钟,然后再继续执行。
事件循环(Event Loop):asyncio
使用事件循环来调度和管理并发任务。事件循环是一个无限循环,不断地从任务队列中获取待执行的任务,调度协程的执行。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
在上述示例中,通过asyncio.get_event_loop()
获取一个事件循环对象,然后使用run_until_complete
方法来运行协程。
异步IO操作:asyncio
提供了一系列的异步IO操作函数,如文件读写、网络请求等。这些函数都是协程函数,可以通过await
来等待异步操作完成。
import asyncio
async def download_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.read()
return data
asyncio.run(download_data("http://example.com"))
在上述示例中,download_data
函数使用aiohttp
库来进行异步网络请求。通过await
来等待响应的数据,并最终返回结果。
并发和并行:asyncio
可以处理大量并发任务,并实现高效的并发和并行操作。将多个协程封装到asyncio.gather
函数中,可以同时运行多个协程。
import asyncio
async def task1():
# 执行任务1
async def task2():
# 执行任务2
loop = asyncio.get_event_loop()
tasks = [task1(), task2()]
loop.run_until_complete(asyncio.gather(*tasks))
在上述示例中,通过asyncio.gather
函数同时运行多个协程,*tasks
将任务列表作为参数传递给gather
函数。
asyncio
的优势在于它可以让程序员使用简单的语法来实现高效和可维护的异步代码。它提供了全面的异步IO支持,可以处理各种异步操作。然而,使用asyncio
编写代码需要理解协程和事件循环的概念,以及异步编程带来的一些复杂性。同时,要注意避免阻塞事件循环,以充分发挥asyncio
的优势。