concurrent.futures 提供的线程池
concurrent.futures
模块提供了线程池和进程池简化了多线程/进程操作。
线程池原理是用一个任务队列让多个线程从中获取任务执行,然后返回结果。
常见的用法是创建线程池,提交任务,等待完成并获取结果:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(count, item) for item in number_list] # count是一个函数,item是其参数
for future in concurrent.futures.as_completed(futures):
print(future.result())
concurrent.futures.ThreadPoolExecutor(max_workers=5)
创建了一个线程池,max_workers
指定了线程数量上限。通过线程池可以创建和执行任务。concurrent.futures
使用Future
类表示(未来的)任务。调用.submit()
时会创建并执行一个任务(Future
)。.as_completed(futures)
是一个迭代器,当futures
中有任务完成时会产出该future
.
Python最广为使用的并发处理库futures使用入门与内部原理 - 知乎 (zhihu.com)对这个过程做了比较好的说明:
主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。
python-parallel-programming-cookbook-cn 1.0 文档 中的一个例子对使用顺序执行、线程池、进程池三种方式进行计算的时间进行了比较:
import concurrent.futures
import time
# 一个耗时的计算
def count(number) :
for i in range(0, 10000000):
i=i+1
return i * number
if __name__ == "__main__":
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 顺序执行
start_time = time.time()
for item in number_list:
print(count(item))
print("Sequential execution in " + str(time.time() - start_time), "seconds")
# 线程池
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(count, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
# 进程池
start_time_2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(count, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Process pool execution in " + str(time.time() - start_time_2), "seconds")
结果为:
Sequential execution in 7.095552206039429 seconds
Thread pool execution in 7.140377998352051 seconds
Process pool execution in 4.240718126296997 seconds
竞争和锁
由于共享内存,多线程程序容易遇到竞争问题:两个内存对同一个变量进行修改可能导致意想不到的问题。
看下面这个计数的例子:
我们创建了一个全局变量thread_visits
,在visit_counter()
中修改这个变量值。
from threading import Thread
thread_visits = 0
def visit_counter():
global thread_visits
for _ in range(100_000):
thread_visits += 1 # thread_visits = thread_visits + 1
if __name__ == "__main__":
thread_count = 100
threads = [
Thread(target=visit_counter)
for _ in range(thread_count)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"thread_count={thread_count}, thread_visits={thread_visits}")
执行结果:
第1次 :thread_count=100, thread_visits=7227793
第2次 :thread_count=100, thread_visits=9544020
第3次 :thread_count=100, thread_visits=9851811
执行该程序会发现每次运行thread_visits
的值都不一样。
因为在 thread_visits
变量上的读取和写入操作之间有一段时间,另一个线程可以介入并操作结果。这导致了竞争。
(线程1和线程2对变量thread_visits
的竞争。两个线程都对thread_visits
执行了+1的操作,但最后thread_visits
的是1,而不是2。)
thread_visits += 1
实际包含读写两个操作,它等价于
thread_visits = thread_visits + 1
,先读取thread_visits
的值并+1,再写入到thread_visits
。
正确方法是使用锁保证一次只有一个线程可以处理单个代码块
from threading import Thread
from threading import Lock
thread_visits = 0
thread_visits_lock = Lock()
def visit_counter():
global thread_visits
for _ in range(100_000):
with thread_visits_lock:
thread_visits += 1 # thread_visits = thread_visits + 1
运行结果:
thread_count=100, thread_visits=10000000
这次我们得到了正确的结果,但花费了接近一分钟的时间。因为受保护的块不能并行运行。此外,获取和释放锁是需要一些额外操作。
将锁放在外面的时候,会发现花费的时间减少了很多。因为减少了获取和释放锁的消耗。
with thread_visits_lock:
for _ in range(100_000):
thread_visits += 1