在Python中,队列(Queue)是一种抽象的数据类型,它遵循先进先出(FIFO)的原则。队列是一种特殊的线性表,只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。
Python标准库中的queue
模块提供了多种队列的实现,包括:
- Queue:这是一个简单的队列类,可以用来实现先进先出的数据结构。
- LifoQueue:这是一个后进先出(LIFO)的数据结构,与栈类似。
- PriorityQueue:这是一个优先级队列,可以根据元素的优先级进行排序。
下面是一个使用queue.Queue
的简单示例:
import queue
# 创建一个队列
q = queue.Queue()
# 向队列中添加元素
for i in range(5):
q.put(i)
# 从队列中获取元素,它会按照先进先出的原则返回元素
while not q.empty():
print(q.get())
输出:
0
1
2
3
4
queue.Queue还提供了其他一些有用的方法,如
q.full()(检查队列是否已满)、
q.maxsize`(获取队列的最大大小)等。此外,它还支持线程安全,可以在多线程环境中安全地使用。
属性和方法以及用法
- qsize():返回队列中的元素个数。
import queue
# 创建一个队列
q = queue.Queue()
# 向队列中添加元素
q.put("item1")
q.put("item2")
q.put("item3")
# 获取队列中的元素个数
size = q.qsize()
print("队列大小:", size)
运行结果:
- mutex:返回队列的锁对象,用于多线程环境中的同步操作。通过使用mutex属性,可以确保在多线程访问队列时,对队列的操作是线程安全的。代码如下:
import queue
import threading
# 创建一个队列和锁对象
q = queue.Queue()
mutex = q.mutex
# 定义一个线程函数,用于向队列中添加元素
def add_item(item):
with mutex:
q.put(item)
# 定义一个线程函数,用于从队列中获取元素
def get_item():
with mutex:
item = q.get()
return item
# 创建两个线程,分别执行添加和获取操作
thread1 = threading.Thread(target=add_item, args=("item1",))
thread2 = threading.Thread(target=get_item)
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
- not_full():如果队列未满,则返回True,否则返回False。
- maxsize(可选参数,默认为0,用于设定队列长度。maxsize小于等于0表示队列长度无限。)
- put(item, block=True, timeout=None)(在队尾插入一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有空间可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列已满且block为False,将引发Full异常。)
import queue
# 创建一个队列
q = queue.Queue()
# 向队列中添加元素
for i in range(5):
q.put(i)
# 获取队列的大小
size = q.qsize()
print("队列大小:", size)
# 从队列中获取元素并打印
while not q.empty():
item = q.get()
print(item)
- get(block=True, timeout=None)(从队头删除并返回一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有项目可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列为空且block为False,将引发Empty异常。)
- empty()(检查队列是否为空,返回True如果队列为空,否则返回False。)
import queue
# 创建一个队列
q = queue.Queue()
# 检查队列是否为空
if q.empty():
print("队列为空")
else:
print("队列非空")
# 向队列中添加元素
q.put("item1")
q.put("item2")
# 检查队列是否为空
if q.empty():
print("队列为空")
else:
print("队列非空")
- full()(检查队列是否已满。如果队列已满,返回True,否则返回False。)
import queue
# 创建一个队列,容量为3
q = queue.Queue(maxsize=3)
# 向队列中添加元素
for i in range(5):
try:
q.put(i, block=False)
except queue.Full:
print("队列已满,添加元素失败")
# 检查队列是否已满
if q.full():
print("队列已满")
else:
print("队列未满")
运行结果:
- get_nowait():与get()方法类似,但不进行阻塞等待。如果队列为空,将引发Empty异常。
import queue
q = queue.Queue()
try:
item = q.get_nowait()
except queue.Empty:
print("队列为空,无法获取元素。")
- put_nowait():与put()方法类似,但不进行阻塞等待。如果队列已满,将引发Full异常。
import queue
q = queue.Queue(maxsize=3)
try:
q.put_nowait("item")
q.put_nowait("item1")
q.put_nowait("item2")
q.put_nowait("item3")
except queue.Full:
print("队列已满,无法添加元素。")
- join():Queue.join() 是 Python 的 queue 模块中的一个方法,用于阻塞当前线程,直到队列中的所有任务都已完成处理。
当你使用多线程或多进程从队列中获取并处理任务时,Queue.join() 方法可以确保主线程等待所有任务都已完成后再继续执行。这样可以避免因任务未完成而导致主线程提前结束。
多线程中的Quene
task_done() 是 Python 的 queue 模块中的一个方法,用于标记队列中的一个任务已经完成。
当多个线程或进程同时从队列中获取任务并处理时,可以使用 task_done() 方法来通知队列中已经完成了一个任务。这有助于确保队列中的任务能够被正确地处理,并且可以避免出现死锁或阻塞的情况。
代码如下:
import queue
import threading
import time
# 创建一个队列
q = queue.Queue()
# 定义一个生产者线程函数,将数据添加到队列中
def producer(thread_id):
for i in range(5):
print(f"生产者 {thread_id} 生产了 {i}")
q.put(i)
time.sleep(1)
# 定义一个消费者线程函数,从队列中获取数据并处理
def consumer(thread_id):
while True:
item = q.get()
print(f"消费者 {thread_id} 消费了 {item}")
time.sleep(1)
q.task_done()
# 创建生产者线程
threads = []
for i in range(2):
t = threading.Thread(target=producer, args=(i,))
threads.append(t)
t.start()
# 创建消费者线程
for i in range(3):
t = threading.Thread(target=consumer, args=(i,))
threads.append(t)
t.start()
# 等待所有任务完成
q.join()
运行结果:
如果使用get_nowait,应该怎么写呢?代码如下:
import queue
import threading
import time
# 创建一个队列
q = queue.Queue()
# 定义一个生产者线程函数,将数据添加到队列中
def producer(thread_id):
for i in range(5):
print(f"生产者 {thread_id} 生产了 {i}")
q.put(i)
time.sleep(1)
# 定义一个消费者线程函数,从队列中获取数据并处理
def consumer(thread_id):
while True:
try:
item = q.get_nowait()
print(f"消费者 {thread_id} 消费了 {item}")
time.sleep(1)
except queue.Empty:
print("队列为空,无法获取元素。")
pass
time.sleep(1)
# 创建生产者线程
threads = []
for i in range(2):
t = threading.Thread(target=producer, args=(i,))
threads.append(t)
t.start()
# 创建消费者线程
for i in range(3):
t = threading.Thread(target=consumer, args=(i,))
threads.append(t)
t.start()
# 等待所有任务完成
q.join()
多进程中的Quene
在Python的multiprocessing
模块中,Queue
是一个进程安全的队列类,它允许在多个进程之间进行安全的通信。这个队列是基于Python标准库中的queue
模块实现的,并添加了多进程的支持。
多进程存,主线程取
import multiprocessing
import time
def worker(q, num):
""" Worker function that puts data into the queue. """
for i in range(5):
print(f"Worker {num} produced {i}")
q.put(i)
q.put("STOP") # 用于告诉主进程所有工作已经完成
if __name__ == "__main__":
q = multiprocessing.Queue() # 创建一个队列对象
processes = []
for i in range(3): # 创建3个工作进程
p = multiprocessing.Process(target=worker, args=(q, i))
processes.append(p)
p.start()
while True:
item = q.get() # 从队列中获取数据
if item == "STOP": # 如果获取到"STOP",则所有工作进程已经完成
break
print(f"Main process consumed {item}")
for p in processes: # 等待所有工作进程结束
p.join()
运行结果:
在这个示例中,我们创建了一个Queue
对象q
,并创建了三个工作进程。每个工作进程将数据添加到队列中,主进程从队列中获取数据并处理。当工作进程完成所有任务后,它会在队列中放置一个"STOP"标记,主进程通过检查这个标记来知道所有工作进程已经完成。最后,主进程等待所有工作进程结束。
注意,在使用multiprocessing.Queue
时,你需要确保传递给子进程的队列对象是通过multiprocessing.Queue()
创建的,而不是通过标准库中的queue.Queue()
创建的。因为标准库中的queue.Queue
不是线程安全的,也不支持多进程。
多进程存,多进程取
import multiprocessing
import time
def worker(q, num):
for i in range(5):
print(f"Worker {num} produced {i}")
q.put(i)
def get_item(q):
while True:
item = q.get() # 从队列中获取数据
print(f"get process consumed {item}")
if __name__ == "__main__":
q = multiprocessing.Queue() # 创建一个队列对象
processes = []
for i in range(3): # 创建3个工作进程
p = multiprocessing.Process(target=worker, args=(q, i))
processes.append(p)
p.start()
for i in range(3): # 创建3个工作进程
p = multiprocessing.Process(target=get_item,args=(q,))
processes.append(p)
p.start()
for p in processes: # 等待所有工作进程结束
p.join()