目录
前言
一、线程的使用
(一)基础使用
(二)等待线程完成
(三)多个线程
(四)守护线程
(五)线程同步
(六)总结
二、队列对象 - Queue
(一)队列类型
(二)基本用法
(三)队列的常用方法
(四)LIFO队列和优先级队列
(五)适用场景
(六)总结
三、生产者和消费者模式
(一)基本结构
(二)生产者-消费者模式示例
(三)多个生产者和消费者
(四)适用场景
(五)总结
四、线程池
(一)简介
(二)线程池的使用
(三)关键方法
(四)线程池的 Future 对象
(五)适用场景
(六)注意事项
五、总结
前言
上篇文章讲述的是python的异常处理和文件读写,这篇文章讲述python的并发编程:线程。
一、线程的使用
在 python 中使用线程可以提高程序的并发能力,尤其是当任务涉及 I/O 密集型操作,如文件处理、网络请求等。这里将详细介绍如何在 Python 中使用 threading
模块来创建、启动和管理线程。
(一)基础使用
使用threading模块的Thread创建并启动线程:
import threading
import time
def print_numbers():
for i in range(5):
time.sleep(1)
print(f"Number: {i}")
# 创建一个线程
thread = threading.Thread(target=print_numbers)
# 启动线程
thread.start()
# 主线程继续执行
print("Thread started, main thread continues...")
通过Thread创建线程,target形参传递的是线程执行的目标方法,创建线程后使用start方法启动线程。
(二)等待线程完成
主线程可以使用 join()
方法等待子线程执行完成,这对于确保所有线程任务都完成后再继续主程序有帮助:
import threading
def print_numbers():
for i in range(5):
print(i)
thread = threading.Thread(target=print_numbers)
# 启动线程
thread.start()
# 等待线程完成
thread.join()
print("Thread has finished executing")
该这个例子中,join()
确保主线程在子线程执行完毕之前不会继续执行下一步。
(三)多个线程
可以创建多个线程来同时执行不同的任务。以下例子展示了如何并发执行多个任务:
import threading
import time
def task_1():
for i in range(3):
time.sleep(1)
print(f"Task 1: {i}")
def task_2():
for i in range(3):
time.sleep(1.5)
print(f"Task 2: {i}")
# 创建多个线程
thread1 = threading.Thread(target=task_1)
thread2 = threading.Thread(target=task_2)
# 启动线程
thread1.start()
thread2.start()
# 等待两个线程完成
thread1.join()
thread2.join()
print("Both threads have finished")
在这里例子中,创建了thread1和thread2两个线程,分别执行task_1和task_2方法,并且在主线程中使用join等待两个子线程结束。
(四)守护线程
守护线程通常用于后台任务,例如日志记录。通过子线程的setDaemon设置守护线程,若设置子线程为守护线程,则在主线程终止时会自动结束,若子线程为非守护线程,则主线程只有等到非守护线程运行完毕后才能结束。
示例:
import threading
import time
def background_task():
while True:
print("Running background task...")
time.sleep(2)
# 创建守护线程
daemon_thread = threading.Thread(target=background_task)
daemon_thread.setDaemon(True) # 设置为守护线程
daemon_thread.start()
# 主线程继续执行
print("Main thread is running")
time.sleep(5)
print("Main thread finished")
daemon为守护线程,无论daemon是否执行完成,守护线程在主线程结束时自动终止。
结果为:
(五)线程同步
多个线程同时访问共享资源时,可能会产生竞态条件,即多个线程同时操作同一个数据。使用 Lock
对临界区进行同步可以避免这个问题:
import threading
counter = 0
lock = threading.Lock()
def increment_counter():
global counter
for _ in range(100000):
# 通过锁保护临界区
with lock:
counter += 1
# 创建多个线程
threads = []
for i in range(5):
thread = threading.Thread(target=increment_counter)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print(f"Final counter value: {counter}")
在这个例子中,锁 lock
)确保多个线程不会同时修改 counter
,从而避免竞态条件。
(六)总结
Python 中使用线程主要通过 threading
模块来实现,核心方法包括:
-
Thread(target=func)
:创建新线程执行目标函数。 -
start()
:启动线程。 -
join()
:等待线程结束。 -
Lock()
:用于同步线程,避免竞态条件。
二、队列对象 - Queue
Queue
是 python 提供的一种线程安全的队列,用于在线程之间安全地交换数据。它在多线程编程中非常有用,能够避免手动管理锁机制和复杂的同步问题。Queue
对象属于 queue
模块,提供了 FIFO(先进先出)、LIFO(后进先出)和优先级队列三种不同的队列类型。
(一)队列类型
-
FIFO 队列:默认的队列类型,遵循先进先出的规则。可以通过
queue.Queue
创建。 -
LIFO 队列:类似于栈的行为,遵循后进先出的规则。可以通过
queue.LifoQueue
创建。 -
优先级队列:队列中的元素按优先级排序,优先级越高的元素越先出队。可以通过
queue.PriorityQueue
创建。
(二)基本用法
使用 queue.Queue
可以方便地在多个线程间共享数据。以下是基本的操作方法:
-
put(item)
:将元素放入队列中。 -
get()
:从队列中获取元素。如果队列为空,会一直等待。 -
task_done()
:表示一个任务完成,用于通知队列中的任务状态。 -
join()
:阻塞主线程,直到队列中的所有任务都被处理完。
(三)队列的常用方法
-
q.put(item, block=True, timeout=None)
:将item
放入队列。如果block=True
且队列已满,则会阻塞直到有空位或超时。 -
q.get(block=True, timeout=None)
:从队列中获取一个元素。如果block=True
且队列为空,则会阻塞直到有新元素入队或超时。 -
q.task_done()
:当线程从队列中取出一个项目,并且完成对该项目的处理后,调用task_done()
。该方法必须与get()
配合使用。 -
q.join()
:阻塞调用线程,直到队列中所有项目都被处理完为止。 -
q.qsize()
:返回当前队列中未处理项目的数量,但由于线程间竞争问题,不建议过于依赖此方法的返回值。 -
q.empty()
和q.full()
:分别判断队列是否为空或已满,但由于线程间竞争,也并不总是精确的。
(四)LIFO队列和优先级队列
除了默认的 FIFO 队列,python 还支持 LIFO 队列和优先级队列;
- LIFO 队列,即后进先出,类似于栈的行为
示例:
import queue
lifo_queue = queue.LifoQueue()
lifo_queue.put(1)
lifo_queue.put(2)
lifo_queue.put(3)
print(lifo_queue.get()) # 输出 3
print(lifo_queue.get()) # 输出 2
- 优先级队列,通过元组的形式来指定优先级,优先级数值越小,优先级越高
示例:
import queue
priority_queue = queue.PriorityQueue()
# 元素的格式: (优先级, 数据)
priority_queue.put((2, "low priority"))
priority_queue.put((1, "high priority"))
priority_queue.put((3, "lowest priority"))
print(priority_queue.get()) # 输出 (1, "high priority")
print(priority_queue.get()) # 输出 (2, "low priority")
(五)适用场景
- 生产者-消费者模型:多个生产者和消费者线程之间通过队列传递数据,有效避免竞争条件。
- 任务调度:通过队列控制任务的顺序,尤其是优先级队列可以按照任务的紧急程度来处理。
- 线程间通信:在需要在线程之间传递数据的场景中,队列是一个非常安全且有效的工具。
(六)总结
queue.Queue
是一种简单且强大的工具,用于在多线程环境中安全地共享数据。通过 Queue
,我们可以避免直接管理锁的复杂性,并实现线程安全的数据交换和任务分发。如果你需要处理线程间的数据交换、任务队列,Queue
是首选。
三、生产者和消费者模式
生产者-消费者模式是一种常用的多线程设计模式,特别适用于数据生产和消费速度不一致的场景。生产者负责生成数据,消费者负责处理数据,两者通过共享的缓冲区进行通信。这种模式可以避免生产者和消费者直接耦合,并通过队列在生产和消费之间实现平衡。
在 python 中,queue.Queue
是线程安全的队列,适合用来实现生产者-消费者模式。以下是如何使用 queue.Queue
与 threading
模块来实现这个模式的详细步骤和示例。
(一)基本结构
生产者-消费者模式的结构大致如下:
-
生产者:生成数据并将其放入队列中。
-
消费者:从队列中获取数据并进行处理。
-
队列:生产者和消费者之间的数据缓冲区。
(二)生产者-消费者模式示例
以下代码示例展示了如何使用 Python 的 threading
和 queue
实现一个简单的生产者-消费者模型。
import threading
import queue
import time
# 创建一个队列,最大存储容量为5
q = queue.Queue(maxsize=5)
# 生产者函数
def producer():
for i in range(10):
item = f"item_{i}"
q.put(item) # 将项目放入队列
print(f"Producer produced {item}")
time.sleep(1) # 模拟生产过程中的延迟
q.put(None) # 发送结束信号给消费者
# 消费者函数
def consumer():
while True:
item = q.get() # 从队列中获取项目
if item is None:
break # 结束信号
print(f"Consumer consumed {item}")
time.sleep(2) # 模拟消费过程中的延迟
q.task_done() # 通知队列该任务已完成
# 创建并启动生产者线程
producer_thread = threading.Thread(target=producer)
producer_thread.start()
# 创建并启动消费者线程
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
# 等待生产者线程完成
producer_thread.join()
# 等待消费者线程完成
consumer_thread.join()
print("All tasks are done")
运行结果:
Producer produced item_0
Producer produced item_1
Consumer consumed item_0
Producer produced item_2
Producer produced item_3
Consumer consumed item_1
Producer produced item_4
...
代码解释:
-
队列:
queue.Queue(maxsize=5)
创建了一个最大容量为 5 的队列。如果生产者生成数据的速度超过了消费者的处理速度,队列就会起到缓冲作用。Queue
会自动管理线程同步,因此不会出现线程竞争问题。 -
生产者线程:
producer()
函数中,生产者每秒生成一个数据并放入队列中。使用q.put(item)
将数据放入队列。如果队列已满,put()
将阻塞,直到有空位。 -
消费者线程:
consumer()
函数中,消费者从队列中获取数据进行处理。使用q.get()
从队列中获取数据,如果队列为空,get()
会阻塞,直到有新数据入队。 -
结束信号:当生产者完成生产后,使用
q.put(None)
向队列发送一个特殊的结束信号,通知消费者结束任务。 -
同步机制:
q.task_done()
通知队列,该任务已经完成,q.join()
阻塞主线程,直到队列中所有任务都被处理完毕。
(三)多个生产者和消费者
在实际应用中,可能会有多个生产者和消费者。可以轻松扩展这个模型来支持多个生产者和多个消费者;
示例:
import threading
import queue
import time
q = queue.Queue(maxsize=5)
# 生产者函数
def producer(id):
for i in range(5):
item = f"Producer-{id}-item-{i}"
q.put(item)
print(f"Producer {id} produced {item}")
time.sleep(1)
q.put(None) # 每个生产者生产完毕后,发送结束信号
# 消费者函数
def consumer(id):
while True:
item = q.get()
if item is None:
break
print(f"Consumer {id} consumed {item}")
time.sleep(2)
q.task_done()
# 启动多个生产者线程
for i in range(2):
threading.Thread(target=producer, args=(i,)).start()
# 启动多个消费者线程
for i in range(3):
threading.Thread(target=consumer, args=(i,)).start()
(四)适用场景
-
生产者消费者速率不一致:当生产者和消费者的速率不一致时(例如网络请求和数据处理速度不同),队列作为缓冲区,可以平衡生产和消费。
-
任务调度和分发:多个生产者和多个消费者可以高效地处理批量任务或数据流。
-
线程间通信:避免竞争条件,确保数据在线程之间安全地传递。
(五)总结
python 的 queue.Queue
和 threading
模块让实现生产者-消费者模式变得简单而高效。队列提供了线程安全的通信方式,避免了使用锁的复杂性,同时使得多个生产者和消费者能够以平滑的方式交替运行。
四、线程池
(一)简介
python 的线程池是用于管理线程的一个工具,能够方便地执行多线程任务,而无需手动创建和管理线程。它是 concurrent.futures
模块中的一种高级抽象,特别适合那些需要并发执行 I/O 密集型任务的场景,比如网络请求或文件读写。线程池的主要优点在于:
-
简化线程管理:不需要手动创建、启动和管理线程,线程池自动管理这些操作。
-
线程重用:线程池中的线程可以重复使用,避免频繁创建和销毁线程带来的开销。
-
控制并发数量:线程池允许你控制同时运行的线程数,避免因线程过多而导致系统资源枯竭。
(二)线程池的使用
在 python 中,线程池由 concurrent.futures.ThreadPoolExecutor
类实现,提供了简单易用的 API。你可以使用线程池执行函数,并获取函数的返回结果或捕获异常。
示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
# 定义一个简单的函数来执行任务
def task(n):
return n * n
# 使用线程池来并发执行多个任务
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(10)]
# 获取每个任务的结果
for future in as_completed(futures):
print(future.result())
(三)关键方法
-
submit(fn, *args, **kwargs)
: 提交任务到线程池,并返回一个Future
对象,通过Future
对象可以获取任务的执行状态和结果。 -
map(func, *iterables)
: 类似于内置的map
函数,线程池会并发执行func
函数,并将多个迭代对象传递给func
。 -
shutdown(wait=True)
: 关闭线程池。wait=True
表示等待所有线程任务执行完成后再关闭。
(四)线程池的 Future
对象
当你通过 submit
提交任务时,返回的是一个 Future
对象。这个对象代表的是即将执行或者已经执行完毕的任务。可以通过以下几种方式对其进行处理:
-
result()
: 阻塞等待任务执行完成,并返回任务的结果。 -
done()
: 判断任务是否已经完成。 -
exception()
: 如果任务执行中抛出了异常,可以通过此方法获取异常信息。
示例:
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
# 创建一个线程池
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 5) # 提交任务并获取 Future 对象
print(future.result()) # 等待并获取结果
该例子中,线程池管理着线程的创建、分配和销毁,简化了多线程编程的复杂性。
(五)适用场景
python线程池的适用场景主要有以下两种:
-
I/O 密集型任务:比如网络请求、文件读写、数据库操作等。
-
并发任务处理:当你有多个独立的任务需要并发处理时,线程池能够很好地管理这些任务。
(六)注意事项
-
线程池适合 I/O 密集型任务,不适合 CPU 密集型任务,对于 CPU 密集型任务,应该考虑使用
ProcessPoolExecutor
来避免 GIL 的影响。 -
提交的任务函数应尽可能是无状态的,或者线程安全的,避免并发竞争问题。
通过线程池,开发者可以轻松高效地进行多线程编程而不用担心底层线程管理的复杂性。
五、总结
该文章主要讲解的是python的线程和线程池等知识点,通过线程可以有效提高代码的执行效率,但有些场景下却不适合使用线程,比如CPU密集型的任务,这种场景适合使用进程,该知识点在下篇文章会接触到。