下面是一份“由浅入深”的 asyncio 实战手册。先解释核心概念,再给出可直接运行的、循序渐进的示例代码。全部示例都只依赖标准库(除少数标注处),适配 python 3.10+(如使用 taskgroup、asyncio.timeout() 的段落需要 3.11+)。
1. 基础概念速览
async def:定义协程函数(coroutine function)。await:在协程中等待另一个可等待对象(协程、asyncio.task、asyncio.future等)。- 事件循环(event loop):调度协程、i/o 事件的核心;用
asyncio.run(main())启动。 - 并发 ≠ 并行:asyncio 是单线程协作式并发,靠 i/o 等待时让出控制权。
- 任务(task):把协程“提交给”事件循环让其并发执行:
asyncio.create_task(coro())。
2. 最小可运行示例:async/await+asyncio.run
# demo_basic.py
import asyncio
async def io_job(name, delay):
print(f"[{name}] start")
await asyncio.sleep(delay) # 模拟i/o等待
print(f"[{name}] done after {delay}s")
return name, delay
async def main():
r1 = await io_job("a", 1)
r2 = await io_job("b", 2) # 串行等待,总耗时约3秒
print("results:", r1, r2)
if __name__ == "__main__":
asyncio.run(main())
3. 并发执行:create_task、gather、as_completed
# demo_concurrency.py
import asyncio, random
async def fetch(i):
d = random.uniform(0.5, 2.0)
await asyncio.sleep(d)
return f"task-{i}", round(d, 2)
async def main():
# 3.1 create_task + await
t1 = asyncio.create_task(fetch(1))
t2 = asyncio.create_task(fetch(2))
r1 = await t1
r2 = await t2
print("create_task results:", r1, r2)
# 3.2 gather(顺序返回结果;遇异常默认会立刻抛出)
results = await asyncio.gather(*(fetch(i) for i in range(3, 8)))
print("gather results:", results)
# 3.3 as_completed(谁先完成先拿谁)
tasks = [asyncio.create_task(fetch(i)) for i in range(8, 13)]
for fut in asyncio.as_completed(tasks):
print("as_completed:", await fut)
if __name__ == "__main__":
asyncio.run(main())
4. 超时控制与取消:wait_for、asyncio.timeout、task.cancel
# demo_timeout_cancel.py
import asyncio
async def slow():
try:
await asyncio.sleep(5)
return "ok"
except asyncio.cancellederror:
print("slow() got cancelled!")
raise
async def main():
# 4.1 wait_for(3.8+)
try:
res = await asyncio.wait_for(slow(), timeout=2)
print("result:", res)
except asyncio.timeouterror:
print("wait_for timeout!")
# 4.2 asyncio.timeout 上下文(3.11+)
try:
async with asyncio.timeout(2): # 注:需 python 3.11+
await slow()
except timeouterror:
print("context timeout!")
# 4.3 手动取消
t = asyncio.create_task(slow())
await asyncio.sleep(1)
t.cancel()
try:
await t
except asyncio.cancellederror:
print("task cancelled confirmed")
if __name__ == "__main__":
asyncio.run(main())
小贴士:被取消的任务应正确处理 cancellederror,并在需要时做清理(finally)。
5. 限流与同步原语:semaphore、lock、event、condition
# demo_sync_primitives.py
import asyncio, random
sem = asyncio.semaphore(3) # 同时最多3个并发
lock = asyncio.lock()
evt = asyncio.event()
async def worker(i):
async with sem: # 限制并发
await asyncio.sleep(random.uniform(0.2, 1.0))
async with lock: # 保护共享输出(示意)
print(f"worker {i} done")
async def notifier():
await asyncio.sleep(1)
evt.set() # 广播事件
async def waiter():
print("waiting for event...")
await evt.wait()
print("event received!")
async def main():
tasks = [asyncio.create_task(worker(i)) for i in range(10)]
tasks += [asyncio.create_task(notifier()), asyncio.create_task(waiter())]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
condition 适合更复杂的“等待某条件成立”的场景,用法与 threading.condition 类似(只是换成 async with / await)。
6. 生产者-消费者:asyncio.queue
# demo_queue.py
import asyncio, random
async def producer(q: asyncio.queue):
for i in range(10):
await asyncio.sleep(random.uniform(0.1, 0.4))
await q.put((i, f"data-{i}"))
print(f"produced {i}")
await q.put(none) # 结束哨兵
async def consumer(q: asyncio.queue):
while true:
item = await q.get()
if item is none:
q.task_done()
break
i, data = item
await asyncio.sleep(0.3)
print(f"consumed {i} -> {data}")
q.task_done()
async def main():
q = asyncio.queue(maxsize=5)
prod = asyncio.create_task(producer(q))
cons = asyncio.create_task(consumer(q))
await asyncio.gather(prod)
await q.join() # 等全部消费完成
await cons # 等消费者退出
if __name__ == "__main__":
asyncio.run(main())
7. 任务编组(python 3.11+):asyncio.taskgroup
# demo_taskgroup.py
import asyncio, random
async def job(n):
await asyncio.sleep(random.uniform(0.2, 1.0))
if n == 3:
raise runtimeerror("boom at 3")
return n
async def main():
try:
async with asyncio.taskgroup() as tg:
tasks = [tg.create_task(job(i)) for i in range(5)]
# 出错会自动取消其余任务并向外传播异常
except* runtimeerror as eg: # pep 654(exceptiongroup)
print("caught:", eg)
if __name__ == "__main__":
asyncio.run(main())
对比 gather:taskgroup 在结构化并发上更可靠,失败会自动收拢与传播。
8. 与线程/同步代码协作:to_thread、run_in_executor
# demo_thread_bridge.py
import asyncio, time, concurrent.futures
def blocking_io(n):
time.sleep(n)
return f"blocking {n}s"
async def main():
# 8.1 python 3.9+ 推荐:to_thread
r1 = await asyncio.to_thread(blocking_io, 1)
print("to_thread:", r1)
# 8.2 传统:run_in_executor
loop = asyncio.get_running_loop()
with concurrent.futures.threadpoolexecutor(max_workers=3) as pool:
futs = [loop.run_in_executor(pool, blocking_io, i) for i in (1, 2, 1)]
for r in await asyncio.gather(*futs):
print("executor:", r)
if __name__ == "__main__":
asyncio.run(main())
原则:cpu 密集就放线程/进程池;i/o 密集用 await 原生异步接口。
9. tcp/udp 网络编程(内置 streams / protocols)
9.1 tcp echo(server & client,基于 streams)
# demo_tcp_echo.py
import asyncio
async def handle_echo(reader: asyncio.streamreader, writer: asyncio.streamwriter):
addr = writer.get_extra_info('peername')
print(f"client connected: {addr}")
try:
while data := await reader.readline():
msg = data.decode().rstrip()
print(f"recv: {msg}")
writer.write((msg + "\n").encode())
await writer.drain()
except asyncio.cancellederror:
raise
finally:
writer.close()
await writer.wait_closed()
print("client closed", addr)
async def run_server():
server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
print(f"serving on {addrs}")
async with server:
await server.serve_forever()
async def run_client():
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
for i in range(3):
writer.write(f"hello {i}\n".encode())
await writer.drain()
echo = await reader.readline()
print("echo:", echo.decode().rstrip())
writer.close()
await writer.wait_closed()
async def main():
server_task = asyncio.create_task(run_server())
await asyncio.sleep(0.2)
await run_client()
server_task.cancel()
with contextlib.suppress(asyncio.cancellederror):
await server_task
if __name__ == "__main__":
import contextlib
asyncio.run(main())
9.2 udp(datagram)
# demo_udp.py
import asyncio
class echoserver(asyncio.datagramprotocol):
def datagram_received(self, data, addr):
print("server recv:", data, "from", addr)
self.transport.sendto(data, addr)
async def main():
loop = asyncio.get_running_loop()
transport, _ = await loop.create_datagram_endpoint(
lambda: echoserver(), local_addr=("127.0.0.1", 9999)
)
# client
on_resp = loop.create_future()
class client(asyncio.datagramprotocol):
def datagram_received(self, data, addr):
print("client recv:", data)
on_resp.set_result(none)
ctransport, _ = await loop.create_datagram_endpoint(
lambda: client(), remote_addr=("127.0.0.1", 9999)
)
ctransport.sendto(b"hello-udp")
await on_resp
transport.close()
ctransport.close()
if __name__ == "__main__":
asyncio.run(main())
10. 子进程(异步等待):asyncio.create_subprocess_exec
# demo_subprocess.py
import asyncio, sys
async def main():
# 跨平台示例:调用 python -c 'print("hi")'
proc = await asyncio.create_subprocess_exec(
sys.executable, "-c", 'print("hi from child")',
stdout=asyncio.subprocess.pipe, stderr=asyncio.subprocess.pipe
)
out, err = await proc.communicate()
print("stdout:", out.decode().strip(), "| code:", proc.returncode)
if __name__ == "__main__":
asyncio.run(main())
11. 异步上下文管理器与迭代器:async with、async for
# demo_async_with_for.py
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_resource():
print("acquire resource")
await asyncio.sleep(0.2)
try:
yield "resource"
finally:
await asyncio.sleep(0.2)
print("release resource")
class asynccounter:
def __init__(self, n): self.n=n; self.i=0
def __aiter__(self): return self
async def __anext__(self):
if self.i >= self.n:
raise stopasynciteration
await asyncio.sleep(0.1)
self.i += 1
return self.i
async def main():
async with open_resource() as r:
print("using:", r)
async for x in asynccounter(5):
print("got:", x)
if __name__ == "__main__":
asyncio.run(main())
12. 超实用模式集
12.1 背压与批处理
# demo_backpressure.py
import asyncio, random
async def producer(q):
for i in range(30):
await q.put(i) # maxsize 限制可形成背压
await asyncio.sleep(0.05)
await q.put(none)
async def consumer(q):
batch = []
while true:
item = await q.get()
if item is none:
if batch:
print("flush batch:", batch)
q.task_done()
break
batch.append(item)
if len(batch) >= 8:
# 模拟批量处理
await asyncio.sleep(random.uniform(0.1, 0.3))
print("process batch:", batch)
batch.clear()
q.task_done()
async def main():
q = asyncio.queue(maxsize=10)
await asyncio.gather(producer(q), consumer(q))
await q.join()
if __name__ == "__main__":
asyncio.run(main())
12.2 幂等重试 + 指数退避
# demo_retry.py
import asyncio, random
async def fragile_call():
await asyncio.sleep(0.1)
if random.random() < 0.7:
raise runtimeerror("transient")
return "ok"
async def retry(coro_func, attempts=5, base=0.2):
for n in range(attempts):
try:
return await coro_func()
except exception as e:
if n == attempts - 1:
raise
await asyncio.sleep(base * (2 ** n)) # 退避
raise runtimeerror("unreachable")
async def main():
try:
r = await retry(fragile_call)
print("result:", r)
except exception as e:
print("failed:", e)
if __name__ == "__main__":
asyncio.run(main())
13. 常见坑与最佳实践
入口统一用 asyncio.run(main()),不要混用早期 api(如手动获取 loop、run_until_complete),除非有特殊需求。
避免阻塞调用(如 time.sleep()、重 cpu 任务)直接出现在协程里;改用 await asyncio.sleep() 或 asyncio.to_thread()/进程池。
正确处理取消:在 try/except/finally 中传播 cancellederror,避免吞掉取消导致任务“僵尸化”。
使用限流:对外部服务/磁盘/网络做 semaphore、队列背压,保护系统。
结构化并发:优先考虑 taskgroup(3.11+)来让“任务生命周期”明确,异常聚合安全。
日志与超时:给关键 i/o 加 timeout,并使用 asyncio.create_task() 后保存句柄,便于监控和取消。
14. 进阶延伸(可选)
文件异步:标准库没有真正异步文件 i/o;可选三方库 aiofiles(仅示意):
# pip install aiofiles
import aiofiles, asyncio
async def read_file(p):
async with aiofiles.open(p, "r", encoding="utf-8") as f:
return await f.read()
http 客户端/服务端:aiohttp、httpx[http2] 等第三方库更贴近真实网络场景。
与 gui/qt 的集成:可通过 qasync 把 qt 事件循环与 asyncio 融合(适合你在 qml/pyside6 下需要异步网络/io 的情况)。
到此这篇关于由浅入深介绍python asyncio的各种用法与代码示例的文章就介绍到这了,更多相关python asyncio用法内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论