event loop 执行机制
event loop 本质是单线程内的无限循环调度器,不断从任务队列取出就绪的任务执行:
while true:
events = poll_for_ready_events() # 检查就绪事件(io完成、定时器到期、回调投递)
for event in events:
execute(event.callback) # 逐个执行
协程遇到 await 时挂起自身、让出控制权给 loop,loop 去执行其他就绪任务;
等待的操作完成后,loop 恢复该协程继续执行。
一个线程最多运行一个 event loop。
三个核心 api 对比
1.asyncio.create_task(coro)
在当前 loop 内并发调度一个协程,立即返回 asyncio.task,不阻塞。
async def main():
task1 = asyncio.create_task(fetch_a()) # 立即调度,不等完成
task2 = asyncio.create_task(fetch_b()) # 两个协程并发运行
result_a = await task1 # 需要结果时再 await
result_b = await task2
适用场景: 同一个 loop 内同时跑多个异步 io 任务(并发请求、并发监听等)。
2.asyncio.to_thread(func, *args)
把同步阻塞函数丢到线程池执行,当前协程挂起等待结果,但 event loop 不阻塞。
async def main():
# time.sleep 是同步阻塞的,直接调用会卡死整个 loop
await asyncio.to_thread(time.sleep, 5) # 丢到线程池,loop 继续处理别的
# 典型场景:cpu 密集计算、没有 async 版本的阻塞 io 库
data = await asyncio.to_thread(read_big_file, path)适用场景: 调用没有 async 版本的阻塞函数(文件 io、cpu 计算、旧的同步库),
避免卡死 event loop。
await会卡住当前协程吗?
当前协程会等待,但 event loop 和其他协程不受影响:
| 写法 | 当前协程 | event loop | 其他协程 |
|---|---|---|---|
time.sleep(5) | 卡 | 卡 | 全卡 |
await asyncio.to_thread(time.sleep, 5) | 等 | 不卡 | 正常跑 |
await asyncio.sleep(5) | 等 | 不卡 | 正常跑 |
3.asyncio.run_coroutine_threadsafe(coro, loop)
从另一个线程向指定 loop 投递一个协程,返回 concurrent.futures.future。
# 在非 asyncio 的普通线程中: future = asyncio.run_coroutine_threadsafe(play_audio(data), main_loop) result = future.result(timeout=10) # 阻塞等待结果(在当前线程阻塞,不影响 loop)
适用场景: 普通线程需要让某个 event loop 执行一个协程(跨线程投递异步任务)。
相关 api:call_soon_threadsafe
loop.call_soon_threadsafe(callback, *args)
与 run_coroutine_threadsafe 类似,但投递的是普通回调函数而非协程。
# 从 websocket 线程把音频流注册到主线程的 mixer self._main_loop.call_soon_threadsafe(mixer.add_stream, stream)
总结对比表
| api | 调用位置 | 投递目标 | 方向 | 返回值 |
|---|---|---|---|---|
create_task(coro) | 协程内 | 协程 → 同一个 loop | async → async | asyncio.task |
to_thread(func) | 协程内 | 同步函数 → 线程池 | async → sync → async | awaitable |
run_coroutine_threadsafe(coro, loop) | 任意线程 | 协程 → 另一个线程的 loop | sync → async | concurrent.futures.future |
call_soon_threadsafe(cb) | 任意线程 | 回调 → 另一个线程的 loop | sync → sync(callback) | none |
速记口诀
create_task → 同 loop 内并发跑协程 to_thread → 把同步阻塞的东西丢出去,别卡我的 loop run_coroutine_threadsafe → 从外面往别人的 loop 里塞协程 call_soon_threadsafe → 从外面往别人的 loop 里塞回调
项目中的实际应用
本项目 audioreceiverserver 中存在两个线程各自运行一个 event loop:
主线程 event loop websocket 线程 event loop ┌───────────────────┐ ┌───────────────────┐ │ mixer 协程 │ │ ws.recv() 协程 a │ │ 其他音频处理 ... │ ◄── call_soon │ ws.recv() 协程 b │ │ │ threadsafe │ ws.recv() 协程 c │ └───────────────────┘ └───────────────────┘
- websocket 线程的 loop 同时
await多个连接的recv(),谁来数据就恢复谁 - 需要操作主线程的 mixer 时,通过
call_soon_threadsafe把回调投递到主线程 loop
到此这篇关于python asyncio常规操作记录的文章就介绍到这了,更多相关python asyncio内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论