python基础-数据结构-leetcode刷题必看-queue---队列-python的底层构建

news2024/11/19 11:25:27

文章目录

  • 队列
    • 双端队列 deque
      • 底层存储
      • deque接口
        • 1. `__init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None`
        • 2. `append(self, __x: _T) -> None`
        • 3. `appendleft(self, __x: _T) -> None`
        • 4. `copy(self) -> Self`
        • 5. `count(self, __x: _T) -> int`
        • 6. `extend(self, __iterable: Iterable[_T]) -> None`
        • 7. `extendleft(self, __iterable: Iterable[_T]) -> None`
        • 8. `insert(self, __i: int, __x: _T) -> None`
        • 9. `index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int`
        • 10. `pop(self) -> _T`
        • 11. `popleft(self) -> _T`
        • 12. `remove(self, __value: _T) -> None`
        • 13. `rotate(self, __n: int = 1) -> None`
    • 线程安全队列Queue
      • 属性
      • 方法
        • `__init__(self, maxsize: int = 0) -> None`
        • `_init(self, maxsize: int) -> None`
        • `empty(self) -> bool`
        • `full(self) -> bool`
        • `get(self, block: bool = True, timeout: float | None = None) -> _T`
        • `get_nowait(self) -> _T`
        • `_get(self) -> _T`
        • `put(self, item: _T, block: bool = True, timeout: float | None = None) -> None`
        • `put_nowait(self, item: _T) -> None`
        • `_put(self, item: _T) -> None`
        • `join(self) -> None`
        • `qsize(self) -> int`
        • `_qsize(self) -> int`
        • `task_done(self) -> None`
    • 线程安全优先级队列
    • 线程安全——栈
    • 线程安全的简单的队列——SimpleQueue

队列

在python有以下几种队列

  • deque 双端队列
  • Queue 队列 FIFO
  • LifoQueue 栈 LIFO
  • PriorityQueue 优先队列
  • SimpleQueue 简单队列 FIFO
    除了deque,其他都有一个共同的特点——线程安全
    在这里插入图片描述

双端队列 deque

由于python中的队列是基于双端队列来实现的,所以我们先查看deque的实现,如果你通过pycharm去追溯deque的源代码,你会发现里面只有类的定义并没有具体的实现,其代码如下:

class deque(MutableSequence[_T], Generic[_T]):
    @property
    def maxlen(self) -> int | None: ...
    @overload
    def __init__(self, *, maxlen: int | None = None) -> None: ...
    @overload
    def __init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None: ...
    def append(self, __x: _T) -> None: ...
    def appendleft(self, __x: _T) -> None: ...
    def copy(self) -> Self: ...
    def count(self, __x: _T) -> int: ...
    def extend(self, __iterable: Iterable[_T]) -> None: ...
    def extendleft(self, __iterable: Iterable[_T]) -> None: ...
    def insert(self, __i: int, __x: _T) -> None: ...
    def index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int: ...
    def pop(self) -> _T: ...  # type: ignore[override]
    def popleft(self) -> _T: ...
    def remove(self, __value: _T) -> None: ...
    def rotate(self, __n: int = 1) -> None: ...
    def __copy__(self) -> Self: ...
    def __len__(self) -> int: ...
    # These methods of deque don't take slices, unlike MutableSequence, hence the type: ignores
    def __getitem__(self, __key: SupportsIndex) -> _T: ...  # type: ignore[override]
    def __setitem__(self, __key: SupportsIndex, __value: _T) -> None: ...  # type: ignore[override]
    def __delitem__(self, __key: SupportsIndex) -> None: ...  # type: ignore[override]
    def __contains__(self, __key: object) -> bool: ...
    def __reduce__(self) -> tuple[type[Self], tuple[()], None, Iterator[_T]]: ...
    def __iadd__(self, __value: Iterable[_T]) -> Self: ...
    def __add__(self, __value: Self) -> Self: ...
    def __mul__(self, __value: int) -> Self: ...
    def __imul__(self, __value: int) -> Self: ...
    def __lt__(self, __value: deque[_T]) -> bool: ...
    def __le__(self, __value: deque[_T]) -> bool: ...
    def __gt__(self, __value: deque[_T]) -> bool: ...
    def __ge__(self, __value: deque[_T]) -> bool: ...
    def __eq__(self, __value: object) -> bool: ...
    if sys.version_info >= (3, 9):
        def __class_getitem__(cls, __item: Any) -> GenericAlias: ...

底层存储

这说明其底层并不是由python写的,而是由效率更高的c语言的结构体实现,让我们大致浏览一下pythondeque的底层实现:
pythongihub仓库中,我门可以从cpythoncpython/Modules/_collectionsmodule.c)中找到deque的踪迹

#define BLOCKLEN 64
#define CENTER ((BLOCKLEN - 1) / 2)
#define MAXFREEBLOCKS 16

typedef struct BLOCK {
    struct BLOCK *leftlink;
    PyObject *data[BLOCKLEN];
    struct BLOCK *rightlink;
} block;

typedef struct {
    PyObject_VAR_HEAD
    block *leftblock;
    block *rightblock;
    Py_ssize_t leftindex;       /* 0 <= leftindex < BLOCKLEN */
    Py_ssize_t rightindex;      /* 0 <= rightindex < BLOCKLEN */
    size_t state;               /* incremented whenever the indices move */
    Py_ssize_t maxlen;          /* maxlen is -1 for unbounded deques */
    Py_ssize_t numfreeblocks;
    block *freeblocks[MAXFREEBLOCKS];
    PyObject *weakreflist;
} dequeobject;

如果学过数据结构的都知道,双端队列可以由动态数组双向列表实现

  • 如果使用数组实现双端队列,那么当从头结点删除数据时那么需要将后面的数据都要向前移动一格,在大批量数据的情况,从头结点删除的效率就很低,复杂度为O(n)
  • 如果使用双向链表实现双端队列,尽管能够解决动态数组从头结点效率低下的问题,但是算向链表会占用额外的空间、且随机访问的复杂度为 O ( n ) O(n) O(n),此外它具有链表的通病:缓存性能差(非连续存储空间,无法将整个存储块加载到内存,在内存中访问是跳跃式访问,不是连续访问,性能会差很多)

所以python底层选择了折中的方式,即使用链表保证在队头插入元素的高效性,也使用数组提高连续访问的高效能力,从上面的代码看,python的双端队列是一个双向链表块dequeobject,每个块block(BLOCK)是一个64大小的数组data,这样将其性能进行了折中。当然其插入删除的操作也变的更复杂。

deque接口

这些方法和构造函数都是Python标准库中collections.deque类的接口,用于操作双端队列。以下是对每个方法的详细介绍和用法示例:

1. __init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None

构造函数,用于初始化一个双端队列对象。

参数

  • iterable: 可选,初始化队列的可迭代对象。如果提供,该对象的元素会被按顺序添加到队列中。
  • maxlen: 可选,队列的最大长度。如果设置了最大长度,当队列满时,添加新元素会导致旧元素被丢弃。

示例

>>> from collections import deque
>>> deque()
deque([])
>>> deque([1,2,3])
deque([1, 2, 3])
>>> deque([1,2,3], maxlen=5)
deque([1, 2, 3], maxlen=5)
2. append(self, __x: _T) -> None

在队列的右端添加一个元素。

参数

  • __x: 要添加的元素。

示例

>>> dq = deque([1, 2, 3])
>>> dq.append(4)
>>> dq
deque([1, 2, 3, 4])
3. appendleft(self, __x: _T) -> None

在队列的左端添加一个元素。

参数

  • __x: 要添加的元素。

示例

>>> dq
deque([1, 2, 3, 4])
>>> dq.appendleft(0)
>>> dq
deque([0, 1, 2, 3, 4])
4. copy(self) -> Self

返回一个双端队列的浅拷贝。

示例

>>> dq.copy()
deque([0, 1, 2, 3, 4])
>>> dq.copy() == dq #判断相等的方法是元素相同
True
>>> dq_copy = dq.copy()
>>> dq_copy.append(5)
>>> dq_copy
deque([0, 1, 2, 3, 4, 5])
>>> dq
deque([0, 1, 2, 3, 4])
5. count(self, __x: _T) -> int

返回队列中指定元素的出现次数。

参数

  • __x: 要计数的元素。

示例

>>> dq
deque([0, 1, 2, 3, 4])
>>> dq.count(1)
1
6. extend(self, __iterable: Iterable[_T]) -> None

在队列的右端扩展一个可迭代对象的所有元素。

参数

  • __iterable: 包含要添加元素的可迭代对象。

示例

>>> dq.extend([3,4,5]) #接收可迭代类型
>>> dq
deque([0, 1, 2, 3, 4, 3, 4, 5])
7. extendleft(self, __iterable: Iterable[_T]) -> None

在队列的左端扩展一个可迭代对象的所有元素。注意,元素会按相反的顺序添加。

参数

  • __iterable: 包含要添加元素的可迭代对象。

示例

>>> dq
deque([0, 1, 2, 3, 4, 3, 4, 5])
>>> dq.extendleft((0,1,2)) # 并不是直接拼接在left,而是反向后拼接的
>>> dq
deque([2, 1, 0, 0, 1, 2, 3, 4, 3, 4, 5])
8. insert(self, __i: int, __x: _T) -> None

在指定位置插入一个元素。

参数

  • __i: 要插入的位置索引。如果索引超出范围,元素会被插入到队列的两端。
  • __x: 要插入的元素。

示例

>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5])
>>> dq.insert(20,100) # 超过的索引直接插在末尾
>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 100])
>>> dq.insert(-1,200) #支持负数索引
>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 200, 100])
>>> dq.insert(-20,50)
>>> dq
deque([50, 2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 200, 100])
9. index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int

返回指定值首次出现的索引。如果没有找到该值,将引发ValueError

参数

  • __x: 要查找的元素。
  • __start: 可选,搜索的起始位置。
  • __stop: 可选,搜索的结束位置。

示例

>>> dq.index(100)
3
>>> dq.index(100,start=5) # 非关键字参数
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: index() takes no keyword arguments
>>> dq.index(100,5)
14
10. pop(self) -> _T

移除并返回队列右端的元素。如果队列为空,将引发IndexError

示例

>>> dq = deque([1, 2, 3])
>>> dq.pop()
3
>>> dq.pop()
2
>>> dq.pop()
1
>>> dq.pop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IndexError: pop from an empty deque
>>>
11. popleft(self) -> _T

移除并返回队列左端的元素。如果队列为空,将引发IndexError

示例

>>> dq = deque([1, 2, 3])
>>> dq.popleft()
1
>>> dq.popleft()
2
>>> dq.popleft()
3
>>> dq.popleft()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IndexError: pop from an empty deque
>>>
12. remove(self, __value: _T) -> None

移除队列中第一个匹配的元素。如果没有找到该值,将引发ValueError

参数

  • __value: 要移除的元素。

示例

>>> dq = deque([1, 2, 3,4,3])
>>> dq.remove(3)
>>> dq
deque([1, 2, 4, 3])
>>> dq.remove(0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: deque.remove(x): x not in deque
13. rotate(self, __n: int = 1) -> None

将队列旋转 n 步。如果 n 为正数,元素将从右端移到左端。如果 n 为负数,元素将从左端移到右端。

参数

  • __n: 旋转的步数。

示例

>>> dq = deque([1, 2, 3, 4])
>>> dq.rotate(2) #向右旋转
>>> dq
deque([3, 4, 1, 2])
>>> dq.rotate(-1) #向左旋转
>>> dq
deque([4, 1, 2, 3])

这些方法和接口提供了丰富的操作来管理和操作双端队列,适用于需要在两端进行频繁插入和删除操作的数据结构。

线程安全队列Queue

Queue的底层是基于deque实现的,因为Queue的功能是deque中的部分,只要对其进行限制其双端性遵循FIFO就可以使用。下面是Queue的官方代码

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

    __class_getitem__ = classmethod(types.GenericAlias)

其重要体现队列的代码很少,大多数代码都在保证线程安全,如下:

    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

从上面可以看出,Queue的核心操作get、put都是基于deque实现的
其Queue类的定义如下:

class Queue(Generic[_T]):
    maxsize: int

    mutex: Lock  # undocumented
    not_empty: Condition  # undocumented
    not_full: Condition  # undocumented
    all_tasks_done: Condition  # undocumented
    unfinished_tasks: int  # undocumented
    # Despite the fact that `queue` has `deque` type,
    # we treat it as `Any` to allow different implementations in subtypes.
    queue: Any  # undocumented
    def __init__(self, maxsize: int = 0) -> None: ...
    def _init(self, maxsize: int) -> None: ...
    def empty(self) -> bool: ...
    def full(self) -> bool: ...
    def get(self, block: bool = True, timeout: float | None = None) -> _T: ...
    def get_nowait(self) -> _T: ...
    def _get(self) -> _T: ...
    def put(self, item: _T, block: bool = True, timeout: float | None = None) -> None: ...
    def put_nowait(self, item: _T) -> None: ...
    def _put(self, item: _T) -> None: ...
    def join(self) -> None: ...
    def qsize(self) -> int: ...
    def _qsize(self) -> int: ...
    def task_done(self) -> None: ...
    if sys.version_info >= (3, 9):
        def __class_getitem__(cls, item: Any) -> GenericAlias: ...

这些函数和属性是Python标准库中的queue.Queue类的接口。queue.Queue提供了一个线程安全的FIFO(First In, First Out)队列,常用于多线程环境下的任务管理。以下是每个函数和属性的详细介绍及其用法:

属性

  • maxsize: int
    队列的最大大小。如果 maxsize 为 0 或负数,则队列大小没有限制。

  • mutex: Lock
    一个锁对象,用于同步队列的访问,确保线程安全。

  • not_empty: Condition
    一个条件变量,用于在队列非空时进行通知。

  • not_full: Condition
    一个条件变量,用于在队列未满时进行通知。

  • all_tasks_done: Condition
    一个条件变量,用于在所有任务完成时进行通知。

  • unfinished_tasks: int
    未完成任务的计数。

  • queue: Any
    实际存储队列元素的容器,类型可以是任意的。该属性未公开文档。在_init中将其初始化。

方法

__init__(self, maxsize: int = 0) -> None

构造函数,用于初始化一个队列对象。

参数

  • maxsize: 队列的最大大小。如果为 0 或负数,则队列大小没有限制。

示例

from queue import Queue

q = Queue(maxsize=10)
_init(self, maxsize: int) -> None

初始化队列的内部方法。通常由构造函数调用,不直接使用。

参数

  • maxsize: 队列的最大大小。
empty(self) -> bool

如果队列为空,返回 True

示例

>>> from queue import Queue
>>> q = Queue()
>>> q.empty()
True
full(self) -> bool

如果队列已满,返回 True

示例

>>> q.put(1)
>>> q.put(2)
>>> q.full()
True
>>> q.put(3) # 满了之后添加元素会一直等待

在这里插入图片描述

get(self, block: bool = True, timeout: float | None = None) -> _T

从队列中移除并返回一个元素。

参数

  • block: 如果为 True,在队列为空时会阻塞,直到有元素可用。
  • timeout: 阻塞的最大时间(以秒为单位)。如果为 None,将一直阻塞直到有元素可用。

示例

>>> from queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q 
<queue.Queue object at 0x0000020DD481AD30>
>>> list(q) # Queue是不可迭代类型
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'Queue' object is not iterable
>>> print(q)
<queue.Queue object at 0x0000020DD481AD30>
>>> q.get() # FIFO
1
>>> q.get() # LILO
2
get_nowait(self) -> _T

非阻塞地从队列中移除并返回一个元素。如果队列为空,抛出 queue.Empty 异常。

示例

>>> q = Queue()
>>> q.put(1)
>>> q.get_nowait()
1
>>> q.get_nowait() # 不等待会报错 如果使用q.get()会一直等待,直到右元素入队
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\Python\Python36\lib\queue.py", line 192, in get_nowait
    return self.get(block=False)
  File "D:\Python\Python36\lib\queue.py", line 161, in get
    raise Empty
queue.Empty

_get(self) -> _T

内部方法,用于从队列中获取一个元素。通常由 get 方法调用,不直接使用。

put(self, item: _T, block: bool = True, timeout: float | None = None) -> None

将一个元素放入队列。

参数

  • item: 要放入队列的元素。
  • block: 如果为 True,在队列已满时会阻塞,直到有空间可用。
  • timeout: 阻塞的最大时间(以秒为单位)。如果为 None,将一直阻塞直到有空间可用。

示例

>>> q = Queue(2)
>>> q.put(1)
>>> q.put(2)
>>> q.put(3,True,3) # 3秒之后报错
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\Python\Python36\lib\queue.py", line 141, in put
    raise Full
queue.Full
put_nowait(self, item: _T) -> None

非阻塞地将一个元素放入队列。如果队列已满,抛出 queue.Full 异常。

示例

>>> q = Queue(2)
>>> q.put(1)
>>> q.put(2)
>>> q.put_nowait(3) # 立即报错
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\Python\Python36\lib\queue.py", line 184, in put_nowait
    return self.put(item, block=False)
  File "D:\Python\Python36\lib\queue.py", line 130, in put
    raise Full
queue.Full
_put(self, item: _T) -> None

内部方法,用于将一个元素放入队列。通常由 put 方法调用,不直接使用。

join(self) -> None

阻塞主线程,直到所有的队列项被处理。每次从 get 中获取一个项目后,需要调用 task_done 来表明该项目已经完成处理。

示例

import threading
import time
from queue import Queue

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Processing {item}')
        time.sleep(1)
        q.task_done()
q = Queue()
thread = threading.Thread(target=worker, args=(q,))
thread.start()
for item in range(5):
    q.put(item)
q.join()  # 阻塞,直到所有任务完成
q.put(None)  # 停止工作线程
thread.join()
qsize(self) -> int

返回队列中的元素数量。

示例

>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.qsize()
2
>>> len(q) #并没有实现__len__
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: object of type 'Queue' has no len()
_qsize(self) -> int

内部方法,用于返回队列中的元素数量。通常由 qsize 方法调用,不直接使用。

task_done(self) -> None

表明一个队列中的任务已经完成。在每次从 get 中获取一个项目后调用。

示例

>>> q = Queue()
>>> q.put(1)
>>> q.get()
1
>>> q.task_done()

线程安全优先级队列

优先级队列继承了Queue对象,其主要是继承了它的线程安全性,其底层存储于deque没有任何关系了,就是数组,插入和弹出都换成了的上虑和下虑操作,如果你不知道堆是什么,请看我的另一篇博文图解堆队列算法,这里就不介绍了

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

那么PriorityQueue的调用方法是继承了Queue,方法使用时一模一样的。

线程安全——栈

LifoQueue的线程安全也是套壳Queue的,只是将初始化换成数组,getput换成我们使用的普通的栈的形式,就是直接拿数组作为栈append为入栈,pop为出栈。

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

那么LifoQueue的调用方法是继承了Queue,方法使用时一模一样的。

线程安全的简单的队列——SimpleQueue

SimpleQueue Queue的简化版,他没有了大小限制,但是也是线程安全的,剩余的putgetput_nowaitget_nowaitemptyqsize操作都相同。

class _PySimpleQueue:
    '''Simple, unbounded FIFO queue.

    This pure Python implementation is not reentrant.
    '''
    # Note: while this pure Python version provides fairness
    # (by using a threading.Semaphore which is itself fair, being based
    #  on threading.Condition), fairness is not part of the API contract.
    # This allows the C version to use a different implementation.

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        '''Put the item on the queue.

        The optional 'block' and 'timeout' arguments are ignored, as this method
        never blocks.  They are provided for compatibility with the Queue class.
        '''
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        This is exactly equivalent to `put(item, block=False)` and is only provided
        for compatibility with the Queue class.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return len(self._queue) == 0

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)

    __class_getitem__ = classmethod(types.GenericAlias)


if SimpleQueue is None:
    SimpleQueue = _PySimpleQueue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1715842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

增强团队建设和创造力的 6 个敏捷游戏

加入敏捷框架提供了对资源的访问和支持&#xff0c;可以帮助你的组织最大限度地发挥敏捷的优势。它还提供了一个与其他敏捷从业者联系的平台&#xff0c;以共享最佳实践并相互学习。 实践敏捷工作方法可以让团队按照自己的节奏&#xff0c;尽可能多地发挥创造力来追求目标&…

【408精华知识】关于存储系统,看这一篇就够了!

关于存储系统&#xff0c;其实是一个很完整的逻辑&#xff0c;在学完基础知识后&#xff0c;我们可以试着将整个存储系统的转化逻辑画下来&#xff0c;更形象化地去理解整个存储系统&#xff0c;如下图&#xff1a; 文章目录 &#xff08;一&#xff09;虚-->实&#xff08;…

Python面向对象学习笔记

Python面向对象编程 记录人&#xff1a; 李思成 时间&#xff1a; 2024/05/01至2024/05/23 课程来源&#xff1a; B站Python面向对象 1.面向对象编程概述 官方概述 程序是指令的集合&#xff0c;运行程序时&#xff0c;程序中的语句会变成一条或多条指令&#xff0c;然后…

算法练习——字符串

一确定字符串是否包含唯一字符 1.1涉及知识点 c的输入输出语法 cin>>s; cout<<"NO"; 如何定义字符串 切记&#xff1a;在[]中必须加数字——字符串最大长度&#xff0c;不然编译不通过 char s[101]; 如何获取字符串长度 char s[101];cin>>s;i…

Lua的几个特殊用法

&#xff1a;/.的区别 详细可以参考https://zhuanlan.zhihu.com/p/651619116。最重要的不同就是传递默认参数self。 通过.调用函数&#xff0c;传递self实例 通过 &#xff1a; 调用函数&#xff0c;传递self (不需要显示的传递self参数&#xff0c;默认就会传递&#xff0c;但…

STM32FLASH闪存

文章目录 前言首先来回顾一下存储器映像FLASH简介闪存模块组织Flash基本结构&#xff08;关系&#xff09;图Flash解锁使用指针访问存储器FLASH操作Flash全擦除Flash页擦除Flash写入 选项字节选项字节操作选项字节擦除选项字节写入 器件电子签名注意闪存控制寄存器一览 前言 本…

Bentham Science药学全文期刊库文献在家轻松下载

今天讲一个药学期刊库——Bentham Science药学全文期刊数据库 Bentham Science出版公司&#xff0c;作为全球范围内主要的制药、医疗、生物医学及化学出版商之一&#xff0c;出版期刊质量在全球享有广泛赞誉&#xff0c;其中有54种期刊被ISI收录&#xff0c;43%被PubMed收录&a…

Vue3 图片或视频下载跨域或文件损坏的解决方法

Vue3 图片或视频下载跨域或文件损坏的解决方法 修改跨域配置文件下载方法 修改跨域配置文件 修改vite.config.ts文件proxy里面写跨域地址&#xff0c;如下图&#xff0c;图片地址就是我们要跨域的目标地址&#xff1a; 下载方法 如下就是我取消上面那句后的报错 然后调用两…

「清新题精讲」CF249D - Donkey and Stars

更好的阅读体验 CF249D - Donkey and Stars Description 给定 n n n 个点 ( x i , y i ) (x_i,y_i) (xi​,yi​) 和 a , b , c , d a,b,c,d a,b,c,d&#xff0c;求出最多有多少个点依次连接而成的折线上线段的斜率在 ( a b , c d ) (\frac{a}{b},\frac{c}{d}) (ba​,dc​…

​代康伟的智慧引领,北汽蓝谷在新能源汽车市场的革新之路

在新能源汽车市场的浪潮中&#xff0c;北汽蓝谷以其独特的场景化造车理念&#xff0c;成为了行业的引领者。2023年&#xff0c;北汽蓝谷的极狐品牌在产品投放上明显加快&#xff0c;推出了多款基于场景化造车理念的新车&#xff0c;如极狐阿尔法S/T森林版和极狐汽车考拉等&…

huggingface的self.state与self.control来源(TrainerState与TrainerControl)

文章目录 前言一、huggingface的trainer的self.state与self.control初始化调用二、TrainerState源码解读(self.state)1、huggingface中self.state初始化参数2、TrainerState类的Demo 三、TrainerControl源码解读(self.control)总结 前言 在 Hugging Face 中&#xff0c;self.s…

力扣:226. 翻转二叉树

226. 翻转二叉树 已解答 简单 相关标签 相关企业 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a;root [4,2,7,1,3,6,9] 输出&#xff1a;[4,7,2,9,6,3,1]示例 2&#xff1a; 输入&#xff1a…

数组-检查数组内是否存在和为7的倍数的子序列

一、题目描述 二、解题思路 这里首先要分辨清楚是子序列还是子数组 原数组&#xff1a;[1,2,3,4,5] 子序列&#xff1a;元素和元素之间相对位置保持不变&#xff0c;但是在原数组中不一定连续&#xff0c;如&#xff1a;[1,3,4]&#xff1b; 子数组&#xff1a;元素元素之间保…

canfd与can2.0关系

canfd是can2.0的升级版&#xff0c; 支持canfd的设备就支持can2.0&#xff0c;但can2.0的设备不支持canfd 参考 是选CAN接口卡还是CANFD接口卡_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1Hh411K7Zn/?spm_id_from333.999.0.0 哪些STM32有CANFD外设 STM32G0, STM…

一款免费的软件媒体系统软件!!【送源码】

Jellyfin是一个免费的软件媒体系统&#xff0c;让您在管理和流媒体控制您的媒体。它是专有的Emby和Plex的替代品&#xff0c;通过多个应用程序从专用服务器向最终用户设备提供媒体。Jellvfin是Emby的3.5.2版本的后裔&#xff0c;并被移植到.NETCore框架中&#xff0c;以实现完全…

新火种AI|寻求合作伙伴,展开豪赌,推出神秘AI项目...苹果能否突破AI困境?

作者&#xff1a;小岩 编辑&#xff1a;彩云 2024年&#xff0c;伴随着AI技术的多次爆火&#xff0c;不仅各大科技巨头纷纷进入AI赛道展开角力&#xff0c;诸多智能手机厂商也纷纷加紧布局相关技术&#xff0c;推出众多AI手机。作为手机领域的龙头老大&#xff0c;苹果自然是…

基于单片机的步进电机控制系统研究

摘 要 &#xff1a; 近年来 &#xff0c; 步进电机凭借其定位精度高 、 使用方便 、 性价比高 、 容易控制等优点 &#xff0c; 在各领域受到广泛应用 。 文中利用C52 单片机设计了一种步进电机控制系统 &#xff0c; 介绍了其总体方案 、 主控制模块 、 驱动电路 、 键盘 、 晶…

洗地机哪个牌子最好用?十大名牌洗地机排行榜

作为一种新兴的智能家居产品&#xff0c;洗地机的市场规模已经突破了百亿大关。如此庞大的市场自然吸引了大量资本的涌入&#xff0c;许多品牌纷纷推出自己的洗地机产品&#xff0c;试图在这个竞争激烈的市场中占据一席之地。然而&#xff0c;面对如此多的品牌和型号&#xff0…

SelfKG论文翻译

SelfKG: Self-Supervised Entity Alignment in Knowledge Graphs SelfKG&#xff1a;知识图中的自监督实体对齐 ABSTRACT 实体对齐旨在识别不同知识图谱&#xff08;KG&#xff09;中的等效实体&#xff0c;是构建网络规模知识图谱的基本问题。在其发展过程中&#xff0c;标…

Java面试题分享-敏感词替换 java 版本

入职啦最近更新了一些后端笔试、面试题目&#xff0c;大家看看能快速实现吗&#xff1f; 关注 入职啦 微信公众号&#xff0c;每日更新有用的知识&#xff0c;Python&#xff0c;Java&#xff0c;Golang&#xff0c;Rust&#xff0c;javascript 等语言都有 不要再用replaceAll做…