一、概述
asyncio.queue 是 python 标准库 asyncio 模块中的异步队列实现,位于 lib/asyncio/queues.py。它的设计思路与线程安全的 queue.queue 类似,但专门为 async/await 协程模型 设计,不是线程安全的。
核心定位:在异步并发环境下,作为生产者-消费者模式的协调桥梁,实现协程之间的数据传递与流量控制。
import asyncio queue = asyncio.queue(maxsize=10) # 有界队列,容量10 queue = asyncio.queue() # 无界队列(maxsize=0)
二、内部实现原理
从 cpython 源码来看,asyncio.queue 的核心数据结构非常精巧:
class queue(mixins._loopboundmixin):
def __init__(self, maxsize=0):
self._maxsize = maxsize
self._getters = collections.deque() # 等待获取的 future 队列
self._putters = collections.deque() # 等待放入的 future 队列
self._unfinished_tasks = 0 # 未完成任务计数
self._finished = locks.event() # join() 阻塞用的事件
self._finished.set()
self._init(maxsize)
self._is_shutdown = false
关键设计
| 内部属性 | 作用 |
|---|---|
| _queue | 实际存储数据的 collections.deque(fifo) |
| _getters | 当队列为空时,get() 的调用者会创建一个 future 并挂入此 deque 等待 |
| _putters | 当队列已满时,put() 的调用者会创建一个 future 并挂入此 deque 等待 |
| _unfinished_tasks | 配合 task_done() / join() 实现任务追踪 |
| _finished | 一个 asyncio.event,当未完成任务归零时被 set,唤醒 join() |
协程挂起与唤醒机制
put() 和 get() 的阻塞并非真正的线程阻塞,而是通过 future + await 实现的协程挂起:
# put() 核心逻辑(简化)
async def put(self, item):
while self.full():
putter = self._get_loop().create_future()
self._putters.append(putter)
await putter # 协程在此挂起
self.put_nowait(item)
# get() 核心逻辑(简化)
async def get(self):
while self.empty():
getter = self._get_loop().create_future()
self._getters.append(getter)
await getter # 协程在此挂起
return self.get_nowait()
当对面操作发生时,通过 _wakeup_next() 唤醒等待者:
def _wakeup_next(self, waiters):
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(none) # 唤醒一个等待的协程
break
这种 逐个唤醒(one-by-one wakeup) 设计避免了"惊群效应",保证公平性:先等待的协程先被唤醒。
三、完整 api 详解
3.1 构造函数
asyncio.queue(maxsize=0)
maxsize <= 0:无界队列,put()永远不会阻塞(内存允许的前提下)maxsize > 0:有界队列,队列满时put()会挂起等待
3.2 核心方法
async put(item)
将元素放入队列。如果队列已满,挂起当前协程直到有空间。
put_nowait(item)
非阻塞放入。队列满时立即抛出 queuefull 异常。
async get()
从队列取出并返回一个元素。如果队列为空,挂起当前协程直到有元素可用。
get_nowait()
非阻塞获取。队列空时立即抛出 queueempty 异常。
3.3 任务追踪方法
task_done()
标记一个之前通过 get() 获取的任务已完成。内部将 _unfinished_tasks 减 1。当计数归零时,触发 _finished 事件,解除 join() 的阻塞。
注意:调用次数超过 put() 的次数会抛出 valueerror。
async join()
阻塞直到队列中所有任务都被处理完毕(每个 put 进去的任务都收到了对应的 task_done() 调用)。
# 典型用法 await queue.join() # 等待所有任务完成
3.4 状态查询
| 方法 | 说明 |
|---|---|
| qsize() | 返回队列当前元素数量 |
| empty() | 队列是否为空 |
| full() | 队列是否已满(maxsize=0 时永远返回 false) |
| maxsize | 属性,返回队列容量上限 |
与 threading.queue 不同,由于单线程事件循环的特性,qsize() 的返回值在 await 之前是可靠的——不会被其他线程中断。
3.5 关闭方法(python 3.13+)
shutdown(immediate=false)
queue.shutdown() # 优雅关闭:允许消费完剩余元素 queue.shutdown(immediate=true) # 立即关闭:清空队列,中断所有等待
- 优雅关闭(immediate=false):
- put() 立即抛出 queueshutdown
- get() 可以继续取出已有元素,队列空后才抛出 queueshutdown
- 立即关闭(immediate=true):
- 队列被清空,_unfinished_tasks 相应减少
- 所有阻塞的 get() 和 put() 立即被唤醒并抛出 queueshutdown
- 如果 _unfinished_tasks 归零,join() 也会被解除阻塞
四、三种队列变体
asyncio 提供了三种队列,它们通过重写 _init、_get、_put 三个内部方法实现不同的出队策略:
4.1queue(fifo 先进先出)
def _init(self, maxsize):
self._queue = collections.deque()
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.popleft()
4.2priorityqueue(优先级队列)
def _init(self, maxsize):
self._queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self._queue)
元素通常为 (priority, data) 元组,数值越小优先级越高。
4.3lifoqueue(后进先出 / 栈)
def _init(self, maxsize):
self._queue = []
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.pop() # 从尾部取出
五、异常体系
| 异常 | 触发时机 |
|---|---|
| asyncio.queueempty | get_nowait() 在空队列上调用 |
| asyncio.queuefull | put_nowait() 在满队列上调用 |
| asyncio.queueshutdown | 在已 shutdown() 的队列上调用 put() 或 get()(3.13+) |
六、经典模式:生产者-消费者
import asyncio
import random
async def producer(queue: asyncio.queue, name: str):
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"[{name}] 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue: asyncio.queue, name: str):
while true:
item = await queue.get()
print(f" [{name}] 消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.queue(maxsize=3) # 有界队列,产生背压
# 启动生产者和消费者
producers = [asyncio.create_task(producer(queue, f"p{i}")) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, f"c{i}")) for i in range(3)]
# 等待所有生产完成
await asyncio.gather(*producers)
# 等待队列中所有任务被消费完
await queue.join()
# 取消消费者(它们在 await queue.get() 处无限等待)
for c in consumers:
c.cancel()
asyncio.run(main())
关键要点
- maxsize 实现背压(backpressure):当消费者处理速度跟不上时,生产者自动被挂起,防止内存无限增长
- task_done() + join() 配对使用:确保每个任务都被处理完毕后再收尾
- 消费者用 while true + cancel() 模式:消费者持续监听,生产结束后由外部取消
七、超时控制
asyncio.queue 的方法本身不接受 timeout 参数,需要配合 asyncio.wait_for() 使用:
try:
item = await asyncio.wait_for(queue.get(), timeout=5.0)
except asyncio.timeouterror:
print("等待超时,队列中没有新数据")
try:
await asyncio.wait_for(queue.put(item), timeout=3.0)
except asyncio.timeouterror:
print("队列已满,放入超时")
八、线程安全问题与跨线程使用
asyncio.queue 不是线程安全的。 如果你需要从非 asyncio 线程向队列中投递任务,必须使用 loop.call_soon_threadsafe():
# 从其他线程安全地放入数据 loop.call_soon_threadsafe(queue.put_nowait, item)
或者使用 asyncio.run_coroutine_threadsafe() 调用异步方法:
future = asyncio.run_coroutine_threadsafe(queue.put(item), loop) future.result() # 阻塞等待完成
九、与queue.queue的对比
| 特性 | queue.queue | asyncio.queue |
|---|---|---|
| 线程安全 | 是(内置锁) | 否(单线程事件循环) |
| 阻塞方式 | 线程阻塞(真正挂起 os 线程) | 协程挂起(让出事件循环) |
| timeout 参数 | get(timeout=5) | 需配合 asyncio.wait_for() |
| qsize() 可靠性 | 不可靠(多线程竞争) | 在 await 点之间可靠 |
| join() / task_done() | 支持 | 支持 |
| shutdown() | 3.13+ 支持 | 3.13+ 支持 |
| 适用场景 | 多线程 | asyncio 协程 |
十、高级用法与最佳实践
10.1 优雅关闭(python 3.13+)
async def main():
queue = asyncio.queue()
# ... 启动 workers ...
# 生产完毕后,优雅关闭队列
queue.shutdown()
await queue.join()
10.2 python 3.12 及之前的优雅关闭——哨兵值模式
sentinel = object()
async def consumer(queue):
while true:
item = await queue.get()
if item is sentinel:
queue.task_done()
break
process(item)
queue.task_done()
# 向每个 consumer 发送一个哨兵
for _ in range(num_consumers):
await queue.put(sentinel)
await queue.join()
10.3 扇出/扇入(fan-out / fan-in)
async def pipeline():
raw_queue = asyncio.queue()
processed_queue = asyncio.queue()
# stage 1: 多个 fetcher 往 raw_queue 放数据
fetchers = [asyncio.create_task(fetch(raw_queue)) for _ in range(5)]
# stage 2: 多个 processor 从 raw_queue 取出,处理后放入 processed_queue
processors = [asyncio.create_task(process(raw_queue, processed_queue)) for _ in range(3)]
# stage 3: 单个 writer 从 processed_queue 取出并写入
writer = asyncio.create_task(write(processed_queue))
10.4 注意事项
- 无界队列的内存风险:
maxsize=0时生产速度远大于消费速度会导致内存暴涨,生产环境应始终设置合理的maxsize - 忘记调用
task_done():会导致join()永远阻塞 - 异常处理:消费者中如果处理逻辑抛出异常,必须确保
task_done()仍被调用(用try/finally):async def safe_consumer(queue): while true: item = await queue.get() try: await process(item) finally: queue.task_done() - 不要混用线程:除非通过
call_soon_threadsafe或run_coroutine_threadsafe
十一、总结
asyncio.queue 是异步编程中不可或缺的协调原语。它的设计简洁优雅——底层通过 future 实现挂起/唤醒,通过 deque 保证 fifo 公平性,通过 _unfinished_tasks + event 实现 join 语义。理解它的内部机制,能帮助你在构建异步任务调度器、流水线处理、背压控制等场景中做出更好的架构决策。
到此这篇关于python中asyncio.queue异步队列的实现的文章就介绍到这了,更多相关python asyncio.queue异步队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论