✅ 一、背景:为什么需要loop.run_in_executor
在 python 的异步编程(asyncio)中,协程可以并发运行,提高效率,但它们依赖于“非阻塞的 i/o”。如果你在协程中调用了一个阻塞的操作(比如 time.sleep()、requests.get() 等),它会阻塞整个事件循环,导致其他协程也无法继续执行。
为了解决这个问题,python 提供了 loop.run_in_executor(),允许你把阻塞的同步代码“放在后台线程或进程中执行”,从而不影响主事件循环。
🧪 二、基本用法和执行流程
✅ 基本结构:
loop.run_in_executor(executor, func, *args)
executor: 指定使用的执行器(线程池或进程池),为none时使用默认线程池。func: 要执行的同步阻塞函数。*args: 传递给函数的参数。
✅ 最小示例(阻塞函数放到线程中):
import asyncio
import time
def blocking_func(name):
print(f"开始阻塞任务 {name}")
time.sleep(3)
print(f"结束阻塞任务 {name}")
return f"{name} done"
async def main():
loop = asyncio.get_running_loop()
# 把阻塞函数丢进默认线程池
result = await loop.run_in_executor(none, blocking_func, "任务a")
print(result)
asyncio.run(main())⏳ 输出:
开始阻塞任务 任务a
结束阻塞任务 任务a
任务a done
注意:虽然
blocking_func()是阻塞的,但不会阻塞asyncio主事件循环,因此你可以同时运行其他协程。
🧵 三、线程池 vs 进程池
1. threadpoolexecutor(默认)
- 用于 i/o 密集型任务(网络请求、文件 i/o 等)
- 启动快,线程共享内存,效率高
run_in_executor(none, ...)就是使用默认线程池
2. processpoolexecutor
- 用于 cpu 密集型任务(图像处理、数据加密、科学计算等)
- 每个进程独立内存,更耗资源,但避免 gil 限制
- 用于充分利用多核 cpu
示例(使用 processpoolexecutor):
from concurrent.futures import processpoolexecutor
def compute(n):
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_running_loop()
with processpoolexecutor() as executor:
result = await loop.run_in_executor(executor, compute, 10_000_000)
print(result)
asyncio.run(main())📊 四、run_in_executor 与协程的差异对比
| 特性 | 协程 (async def) | run_in_executor |
|---|---|---|
| 是否阻塞事件循环 | 否 | 否 |
| 适用于 | 异步 i/o 操作 | 同步阻塞操作 |
| 是否需要线程或进程 | 否 | 是(线程或进程池) |
| 是否自动并发 | 是 | 是 |
| 是否可中断 | 可以使用 asyncio.cancellederror | 线程执行不能中断 |
🔁 五、与其他异步写法对比
❌ 错误做法(会阻塞整个事件循环):
import asyncio
import time
async def wrong():
time.sleep(2) # 阻塞整个事件循环!
print("完成")
asyncio.run(wrong())✅ 正确做法(使用run_in_executor):
async def correct():
loop = asyncio.get_running_loop()
await loop.run_in_executor(none, time.sleep, 2)
print("完成")
🔐 六、进阶用法:并行多个任务
import asyncio
import time
def task(name, duration):
print(f"开始 {name}")
time.sleep(duration)
print(f"结束 {name}")
return name
async def main():
loop = asyncio.get_running_loop()
tasks = [
loop.run_in_executor(none, task, 'a', 2),
loop.run_in_executor(none, task, 'b', 3),
loop.run_in_executor(none, task, 'c', 1),
]
results = await asyncio.gather(*tasks)
print("全部完成:", results)
asyncio.run(main())⏳ 输出(并行):
开始 a
开始 b
开始 c
结束 c
结束 a
结束 b
全部完成: ['a', 'b', 'c']
⚠️ 七、注意事项与潜在陷阱
| 问题 | 描述与解决 |
|---|---|
| 共享资源问题 | 多线程操作同一个变量可能会出错,考虑加锁或使用 asyncio.queue |
| 线程池大小限制 | 默认线程池大小有限(通常为 cpu 核心数的 5 倍),可手动调整 |
| 异常处理 | 在线程中抛出的异常必须在主线程中 await 时捕获 |
| 进程池不能用 lambda | 进程池中的函数必须是可序列化的,不能是匿名函数或本地函数 |
| 不能中断线程任务 | 一旦 run_in_executor 开始执行函数,无法强制中断线程任务 |
💼 八、真实场景举例
示例 1:读取大文件(i/o 密集型)
def read_file(path):
with open(path, 'r') as f:
return f.read()
data = await loop.run_in_executor(none, read_file, "bigfile.txt")示例 2:调用同步网络库(如requests)
import requests
def fetch(url):
response = requests.get(url)
return response.text
html = await loop.run_in_executor(none, fetch, "https://example.com")实际建议:使用
httpx.asyncclient替代requests
🧩 九、封装通用工具函数(推荐写法)
import asyncio
from typing import callable, any
from functools import partial
async def to_thread(func: callable, *args, **kwargs) -> any:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(none, partial(func, *args, **kwargs))
# 使用方式
result = await to_thread(my_blocking_function, arg1, arg2)✅ 十、python 3.9+ 新特性:asyncio.to_thread
从 python 3.9 起,你可以直接用内置的 asyncio.to_thread() 来代替 run_in_executor(none, ...),更加简洁:
import asyncio
def blocking_func():
...
await asyncio.to_thread(blocking_func)等价于:
await loop.run_in_executor(none, blocking_func)
📘 总结
| 特点 | 内容 |
|---|---|
| 功能 | 异步执行阻塞的同步函数 |
| 用法 | await loop.run_in_executor(executor, func, *args) |
| 默认线程池 | executor=none |
| 用于场景 | 文件 i/o、网络请求、同步数据库操作、cpu 密集计算 |
| 替代方案 | python 3.9+: asyncio.to_thread() |
到此这篇关于python中的loop.run_in_executor基本用法的文章就介绍到这了,更多相关python loop.run_in_executor内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论