当前位置: 代码网 > it编程>前端脚本>Python > Python中asyncio.Queue异步队列的实现

Python中asyncio.Queue异步队列的实现

2026年05月15日 Python 我要评论
一、概述asyncio.queue 是 python 标准库 asyncio 模块中的异步队列实现,位于 lib/asyncio/queues.py。它的设计思路与线程安全的 queue.queue

一、概述

asyncio.queue 是 python 标准库 asyncio 模块中的异步队列实现,位于 lib/asyncio/queues.py。它的设计思路与线程安全的 queue.queue 类似,但专门为 async/await 协程模型 设计,不是线程安全的

核心定位:在异步并发环境下,作为生产者-消费者模式的协调桥梁,实现协程之间的数据传递与流量控制。

import asyncio

queue = asyncio.queue(maxsize=10)  # 有界队列,容量10
queue = asyncio.queue()            # 无界队列(maxsize=0)

二、内部实现原理

从 cpython 源码来看,asyncio.queue 的核心数据结构非常精巧:

class queue(mixins._loopboundmixin):
    def __init__(self, maxsize=0):
        self._maxsize = maxsize
        self._getters = collections.deque()   # 等待获取的 future 队列
        self._putters = collections.deque()   # 等待放入的 future 队列
        self._unfinished_tasks = 0            # 未完成任务计数
        self._finished = locks.event()        # join() 阻塞用的事件
        self._finished.set()
        self._init(maxsize)
        self._is_shutdown = false

关键设计

内部属性作用
_queue实际存储数据的 collections.deque(fifo)
_getters当队列为空时,get() 的调用者会创建一个 future 并挂入此 deque 等待
_putters当队列已满时,put() 的调用者会创建一个 future 并挂入此 deque 等待
_unfinished_tasks配合 task_done() / join() 实现任务追踪
_finished一个 asyncio.event,当未完成任务归零时被 set,唤醒 join()

协程挂起与唤醒机制

put()get() 的阻塞并非真正的线程阻塞,而是通过 future + await 实现的协程挂起:

# put() 核心逻辑(简化)
async def put(self, item):
    while self.full():
        putter = self._get_loop().create_future()
        self._putters.append(putter)
        await putter                  # 协程在此挂起
    self.put_nowait(item)

# get() 核心逻辑(简化)
async def get(self):
    while self.empty():
        getter = self._get_loop().create_future()
        self._getters.append(getter)
        await getter                  # 协程在此挂起
    return self.get_nowait()

当对面操作发生时,通过 _wakeup_next() 唤醒等待者:

def _wakeup_next(self, waiters):
    while waiters:
        waiter = waiters.popleft()
        if not waiter.done():
            waiter.set_result(none)   # 唤醒一个等待的协程
            break

这种 逐个唤醒(one-by-one wakeup) 设计避免了"惊群效应",保证公平性:先等待的协程先被唤醒。

三、完整 api 详解

3.1 构造函数

asyncio.queue(maxsize=0)
  • maxsize <= 0:无界队列,put() 永远不会阻塞(内存允许的前提下)
  • maxsize > 0:有界队列,队列满时 put() 会挂起等待

3.2 核心方法

async put(item)

将元素放入队列。如果队列已满,挂起当前协程直到有空间。

put_nowait(item)

非阻塞放入。队列满时立即抛出 queuefull 异常。

async get()

从队列取出并返回一个元素。如果队列为空,挂起当前协程直到有元素可用。

get_nowait()

非阻塞获取。队列空时立即抛出 queueempty 异常。

3.3 任务追踪方法

task_done()

标记一个之前通过 get() 获取的任务已完成。内部将 _unfinished_tasks 减 1。当计数归零时,触发 _finished 事件,解除 join() 的阻塞。

注意:调用次数超过 put() 的次数会抛出 valueerror

async join()

阻塞直到队列中所有任务都被处理完毕(每个 put 进去的任务都收到了对应的 task_done() 调用)。

# 典型用法
await queue.join()   # 等待所有任务完成

3.4 状态查询

方法说明
qsize()返回队列当前元素数量
empty()队列是否为空
full()队列是否已满(maxsize=0 时永远返回 false)
maxsize属性,返回队列容量上限

threading.queue 不同,由于单线程事件循环的特性,qsize() 的返回值在 await 之前是可靠的——不会被其他线程中断。

3.5 关闭方法(python 3.13+)

shutdown(immediate=false)

queue.shutdown()              # 优雅关闭:允许消费完剩余元素
queue.shutdown(immediate=true) # 立即关闭:清空队列,中断所有等待
  • 优雅关闭(immediate=false):
    • put() 立即抛出 queueshutdown
    • get() 可以继续取出已有元素,队列空后才抛出 queueshutdown
  • 立即关闭(immediate=true):
    • 队列被清空,_unfinished_tasks 相应减少
    • 所有阻塞的 get() 和 put() 立即被唤醒并抛出 queueshutdown
    • 如果 _unfinished_tasks 归零,join() 也会被解除阻塞

四、三种队列变体

asyncio 提供了三种队列,它们通过重写 _init_get_put 三个内部方法实现不同的出队策略:

4.1queue(fifo 先进先出)

def _init(self, maxsize):
    self._queue = collections.deque()
def _put(self, item):
    self._queue.append(item)
def _get(self):
    return self._queue.popleft()

4.2priorityqueue(优先级队列)

def _init(self, maxsize):
    self._queue = []
def _put(self, item, heappush=heapq.heappush):
    heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
    return heappop(self._queue)

元素通常为 (priority, data) 元组,数值越小优先级越高

4.3lifoqueue(后进先出 / 栈)

def _init(self, maxsize):
    self._queue = []
def _put(self, item):
    self._queue.append(item)
def _get(self):
    return self._queue.pop()   # 从尾部取出

五、异常体系

异常触发时机
asyncio.queueemptyget_nowait() 在空队列上调用
asyncio.queuefullput_nowait() 在满队列上调用
asyncio.queueshutdown在已 shutdown() 的队列上调用 put() 或 get()(3.13+)

六、经典模式:生产者-消费者

import asyncio
import random

async def producer(queue: asyncio.queue, name: str):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"[{name}] 生产: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue: asyncio.queue, name: str):
    while true:
        item = await queue.get()
        print(f"  [{name}] 消费: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.queue(maxsize=3)  # 有界队列,产生背压

    # 启动生产者和消费者
    producers = [asyncio.create_task(producer(queue, f"p{i}")) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f"c{i}")) for i in range(3)]

    # 等待所有生产完成
    await asyncio.gather(*producers)
    # 等待队列中所有任务被消费完
    await queue.join()

    # 取消消费者(它们在 await queue.get() 处无限等待)
    for c in consumers:
        c.cancel()

asyncio.run(main())

关键要点

  1. maxsize 实现背压(backpressure):当消费者处理速度跟不上时,生产者自动被挂起,防止内存无限增长
  2. task_done() + join() 配对使用:确保每个任务都被处理完毕后再收尾
  3. 消费者用 while true + cancel() 模式:消费者持续监听,生产结束后由外部取消

七、超时控制

asyncio.queue 的方法本身不接受 timeout 参数,需要配合 asyncio.wait_for() 使用:

try:
    item = await asyncio.wait_for(queue.get(), timeout=5.0)
except asyncio.timeouterror:
    print("等待超时,队列中没有新数据")
try:
    await asyncio.wait_for(queue.put(item), timeout=3.0)
except asyncio.timeouterror:
    print("队列已满,放入超时")

八、线程安全问题与跨线程使用

asyncio.queue 不是线程安全的。 如果你需要从非 asyncio 线程向队列中投递任务,必须使用 loop.call_soon_threadsafe()

# 从其他线程安全地放入数据
loop.call_soon_threadsafe(queue.put_nowait, item)

或者使用 asyncio.run_coroutine_threadsafe() 调用异步方法:

future = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
future.result()  # 阻塞等待完成

九、与queue.queue的对比

特性queue.queueasyncio.queue
线程安全是(内置锁)否(单线程事件循环)
阻塞方式线程阻塞(真正挂起 os 线程)协程挂起(让出事件循环)
timeout 参数get(timeout=5)需配合 asyncio.wait_for()
qsize() 可靠性不可靠(多线程竞争)在 await 点之间可靠
join() / task_done()支持支持
shutdown()3.13+ 支持3.13+ 支持
适用场景多线程asyncio 协程

十、高级用法与最佳实践

10.1 优雅关闭(python 3.13+)

async def main():
    queue = asyncio.queue()
    # ... 启动 workers ...

    # 生产完毕后,优雅关闭队列
    queue.shutdown()
    await queue.join()

10.2 python 3.12 及之前的优雅关闭——哨兵值模式

sentinel = object()

async def consumer(queue):
    while true:
        item = await queue.get()
        if item is sentinel:
            queue.task_done()
            break
        process(item)
        queue.task_done()

# 向每个 consumer 发送一个哨兵
for _ in range(num_consumers):
    await queue.put(sentinel)
await queue.join()

10.3 扇出/扇入(fan-out / fan-in)

async def pipeline():
    raw_queue = asyncio.queue()
    processed_queue = asyncio.queue()

    # stage 1: 多个 fetcher 往 raw_queue 放数据
    fetchers = [asyncio.create_task(fetch(raw_queue)) for _ in range(5)]
    # stage 2: 多个 processor 从 raw_queue 取出,处理后放入 processed_queue
    processors = [asyncio.create_task(process(raw_queue, processed_queue)) for _ in range(3)]
    # stage 3: 单个 writer 从 processed_queue 取出并写入
    writer = asyncio.create_task(write(processed_queue))

10.4 注意事项

  1. 无界队列的内存风险maxsize=0 时生产速度远大于消费速度会导致内存暴涨,生产环境应始终设置合理的 maxsize
  2. 忘记调用 task_done():会导致 join() 永远阻塞
  3. 异常处理:消费者中如果处理逻辑抛出异常,必须确保 task_done() 仍被调用(用 try/finally):
    async def safe_consumer(queue):
        while true:
            item = await queue.get()
            try:
                await process(item)
            finally:
                queue.task_done()
    
  4. 不要混用线程:除非通过 call_soon_threadsaferun_coroutine_threadsafe

十一、总结

asyncio.queue 是异步编程中不可或缺的协调原语。它的设计简洁优雅——底层通过 future 实现挂起/唤醒,通过 deque 保证 fifo 公平性,通过 _unfinished_tasks + event 实现 join 语义。理解它的内部机制,能帮助你在构建异步任务调度器、流水线处理、背压控制等场景中做出更好的架构决策。

到此这篇关于python中asyncio.queue异步队列的实现的文章就介绍到这了,更多相关python asyncio.queue异步队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com