什么是 asyncio
asyncio 是 python 3.4+ 引入的标准库,用于编写并发代码的异步 i/o 框架。它使用事件循环和协程实现单线程并发,特别适合处理 i/o 密集型任务。
核心优势:
- 单线程内实现高并发
- 避免线程切换开销
- 更少的内存占用
- 代码更易于理解和调试
核心概念
协程 (coroutine)
使用 async def 定义的特殊函数,可以在执行过程中暂停和恢复。
async def my_coroutine():
print("开始执行")
await asyncio.sleep(1)
print("执行完成")
事件循环 (event loop)
asyncio 的核心,负责调度和执行协程。
await 关键字
用于等待异步操作完成,只能在 async 函数内使用。
task
对协程的封装,可以并发执行多个协程。
future
表示一个异步操作的最终结果。
基础用法
1. 运行协程的三种方式
import asyncio
async def hello():
print("hello")
await asyncio.sleep(1)
print("world")
# 方式1: python 3.7+ 推荐
asyncio.run(hello())
# 方式2: 手动管理事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
# 方式3: 在已有事件循环中
# await hello() # 只能在异步函数内使用
2. 创建和管理任务
async def task1():
await asyncio.sleep(2)
return "任务1完成"
async def task2():
await asyncio.sleep(1)
return "任务2完成"
async def main():
# 创建任务
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# 等待所有任务完成
results = await asyncio.gather(t1, t2)
print(results)
asyncio.run(main())
3. 并发执行
async def fetch_data(n):
print(f"开始获取数据 {n}")
await asyncio.sleep(1)
print(f"完成获取数据 {n}")
return f"数据 {n}"
async def main():
# gather: 并发执行,按顺序返回结果
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(results)
asyncio.run(main())
内部机理
事件循环的工作原理
┌─────────────────────────────────┐ │ 事件循环 (event loop) │ │ │ │ ┌──────────────────────────┐ │ │ │ 就绪队列 (ready queue) │ │ │ │ [task1, task2, task3] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ 等待队列 (wait queue) │ │ │ │ [task4, task5] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ i/o 选择器 │ │ │ │ (epoll/kqueue/select) │ │ │ └──────────────────────────┘ │ └─────────────────────────────────┘
执行流程
- 初始化: 创建事件循环
- 注册协程: 将协程包装成 task 对象
- 调度执行:
- 从就绪队列取出 task
- 执行到 await 处暂停
- 注册到相应的等待队列
- i/o 多路复用: 监听 i/o 事件
- 唤醒协程: i/o 完成后,将 task 移回就绪队列
- 循环往复: 直到所有任务完成
协程状态转换
async def example():
print("1. 开始执行") # running
await asyncio.sleep(1) # waiting (挂起)
print("2. 继续执行") # running (恢复)
return "完成" # finished
底层实现关键点
# 简化的事件循环伪代码
class eventloop:
def __init__(self):
self._ready = deque() # 就绪队列
self._selector = select.epoll() # i/o 选择器
def run_until_complete(self, coro):
task = task(coro)
self._ready.append(task)
while self._ready or self._has_pending_io():
# 执行就绪的任务
if self._ready:
task = self._ready.popleft()
task.step()
# 等待 i/o 事件
events = self._selector.select(timeout=0)
for event in events:
self._ready.append(event.callback)
与多线程/多进程的区别
对比表格
| 特性 | asyncio | 多线程 (threading) | 多进程 (multiprocessing) |
|---|---|---|---|
| 并发模型 | 协作式并发 | 抢占式并发 | 真正并行 |
| 运行环境 | 单线程 | 多线程 | 多进程 |
| gil 影响 | 无影响 | 受 gil 限制 | 不受 gil 限制 |
| 切换开销 | 极小(用户态) | 较大(内核态) | 最大(进程切换) |
| 内存占用 | 低 | 中等 | 高 |
| 适用场景 | i/o 密集型 | i/o 密集型 | cpu 密集型 |
| 数据共享 | 简单(同一线程) | 需要锁 | 需要 ipc |
| 调试难度 | 容易 | 困难 | 中等 |
使用场景详解
asyncio 适合的场景
- 网络请求(爬虫、api 调用)
- 文件 i/o
- 数据库查询
- websocket 连接
- 异步消息队列
# 典型的 asyncio 场景: 并发网络请求
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['http://example.com'] * 100
async with aiohttp.clientsession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
asyncio.run(main())
多线程适合的场景
- 有阻塞 i/o 操作且无异步替代
- 需要并发执行的库不支持 asyncio
- 中等规模并发
import threading
import requests
def fetch_url(url):
response = requests.get(url)
print(f"完成: {url}")
urls = ['http://example.com'] * 10
threads = [threading.thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
多进程适合的场景
- cpu 密集型计算
- 需要绕过 gil
- 数据并行处理
from multiprocessing import pool
def cpu_intensive(n):
return sum(i*i for i in range(n))
with pool(4) as pool:
results = pool.map(cpu_intensive, [10000000] * 4)
print(results)
性能对比示例
import asyncio
import threading
import multiprocessing
import time
# i/o 密集型任务
def io_bound_sync():
time.sleep(1)
async def io_bound_async():
await asyncio.sleep(1)
# 测试 asyncio
async def test_asyncio():
start = time.time()
await asyncio.gather(*[io_bound_async() for _ in range(100)])
print(f"asyncio: {time.time() - start:.2f}s") # 约 1 秒
# 测试多线程
def test_threading():
start = time.time()
threads = [threading.thread(target=io_bound_sync) for _ in range(100)]
for t in threads: t.start()
for t in threads: t.join()
print(f"threading: {time.time() - start:.2f}s") # 约 1-2 秒
# asyncio 在 i/o 密集型任务中表现最优
高级特性
1. 异步上下文管理器
class asyncresource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(1)
async def main():
async with asyncresource() as resource:
print("使用资源")
asyncio.run(main())
2. 异步迭代器
class asyncrange:
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise stopasynciteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async def main():
async for i in asyncrange(0, 5):
print(i)
asyncio.run(main())
3. 异步生成器
async def async_generator():
for i in range(5):
await asyncio.sleep(1)
yield i
async def main():
async for value in async_generator():
print(value)
asyncio.run(main())
4. 超时控制
async def slow_operation():
await asyncio.sleep(5)
return "完成"
async def main():
try:
# 设置 2 秒超时
result = await asyncio.wait_for(slow_operation(), timeout=2)
except asyncio.timeouterror:
print("操作超时")
asyncio.run(main())
5. 信号量和锁
# 限制并发数量
async def limited_task(sem, n):
async with sem:
print(f"任务 {n} 开始")
await asyncio.sleep(1)
print(f"任务 {n} 完成")
async def main():
sem = asyncio.semaphore(3) # 最多 3 个并发
await asyncio.gather(*[limited_task(sem, i) for i in range(10)])
asyncio.run(main())
6. 队列
async def producer(queue, n):
for i in range(n):
await queue.put(i)
print(f"生产: {i}")
await asyncio.sleep(0.5)
async def consumer(queue, name):
while true:
item = await queue.get()
print(f"{name} 消费: {item}")
await asyncio.sleep(1)
queue.task_done()
async def main():
queue = asyncio.queue()
# 创建生产者和消费者
producers = [asyncio.create_task(producer(queue, 5))]
consumers = [asyncio.create_task(consumer(queue, f"消费者{i}"))
for i in range(2)]
await asyncio.gather(*producers)
await queue.join() # 等待所有任务处理完成
for c in consumers:
c.cancel()
asyncio.run(main())
实战示例
示例1: 异步网络爬虫
import asyncio
import aiohttp
from bs4 import beautifulsoup
async def fetch_page(session, url):
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except exception as e:
print(f"错误 {url}: {e}")
return none
async def parse_page(html):
if html:
soup = beautifulsoup(html, 'html.parser')
return soup.title.string if soup.title else "无标题"
return none
async def crawl_urls(urls):
async with aiohttp.clientsession() as session:
tasks = [fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
titles = await asyncio.gather(*[parse_page(page) for page in pages])
return titles
urls = [
'http://example.com',
'http://example.org',
'http://example.net',
]
# 执行爬虫
# titles = asyncio.run(crawl_urls(urls))
# print(titles)
示例2: 异步数据库操作
import asyncio
import aiosqlite
async def create_table(db):
await db.execute('''
create table if not exists users (
id integer primary key,
name text,
email text
)
''')
await db.commit()
async def insert_user(db, name, email):
await db.execute(
'insert into users (name, email) values (?, ?)',
(name, email)
)
await db.commit()
async def fetch_users(db):
async with db.execute('select * from users') as cursor:
return await cursor.fetchall()
async def main():
async with aiosqlite.connect('test.db') as db:
await create_table(db)
# 并发插入
await asyncio.gather(
insert_user(db, 'alice', 'alice@example.com'),
insert_user(db, 'bob', 'bob@example.com'),
insert_user(db, 'charlie', 'charlie@example.com')
)
users = await fetch_users(db)
for user in users:
print(user)
# asyncio.run(main())
示例3: 异步 web 服务器
import asyncio
async def handle_client(reader, writer):
data = await reader.read(1024)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"收到来自 {addr} 的消息: {message}")
response = f"echo: {message}"
writer.write(response.encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def start_server():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888
)
addr = server.sockets[0].getsockname()
print(f"服务器启动在 {addr}")
async with server:
await server.serve_forever()
# asyncio.run(start_server())
示例4: 实时数据流处理
import asyncio
import random
async def data_stream():
"""模拟数据流"""
while true:
yield random.randint(1, 100)
await asyncio.sleep(0.5)
async def process_data(value):
"""处理数据"""
await asyncio.sleep(0.1)
return value * 2
async def monitor_stream():
"""监控和处理数据流"""
buffer = []
async for data in data_stream():
print(f"接收数据: {data}")
# 异步处理数据
task = asyncio.create_task(process_data(data))
buffer.append(task)
# 每 5 个数据批量处理
if len(buffer) >= 5:
results = await asyncio.gather(*buffer)
print(f"处理结果: {results}")
buffer = []
# 演示用,处理 20 个数据后停止
if data > 20:
break
# asyncio.run(monitor_stream())
示例5: 多任务协调
import asyncio
async def task_with_priority(name, priority, duration):
print(f"[优先级 {priority}] {name} 开始")
await asyncio.sleep(duration)
print(f"[优先级 {priority}] {name} 完成")
return f"{name} 结果"
async def coordinator():
# 创建不同优先级的任务
high_priority = [
task_with_priority(f"高优先级-{i}", 1, 1)
for i in range(3)
]
low_priority = [
task_with_priority(f"低优先级-{i}", 3, 2)
for i in range(3)
]
# 先执行高优先级任务
high_results = await asyncio.gather(*high_priority)
print(f"高优先级完成: {high_results}")
# 再执行低优先级任务
low_results = await asyncio.gather(*low_priority)
print(f"低优先级完成: {low_results}")
asyncio.run(coordinator())
最佳实践
1. 错误处理
async def safe_task(n):
try:
if n == 3:
raise valueerror("错误的值")
await asyncio.sleep(1)
return f"任务 {n} 成功"
except exception as e:
print(f"任务 {n} 失败: {e}")
return none
async def main():
results = await asyncio.gather(
safe_task(1),
safe_task(2),
safe_task(3),
return_exceptions=true # 不会因为一个任务失败而停止
)
print(results)
asyncio.run(main())
2. 资源清理
class asyncconnection:
async def __aenter__(self):
self.conn = await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def connect(self):
print("建立连接")
await asyncio.sleep(1)
return "connection"
async def close(self):
print("关闭连接")
await asyncio.sleep(1)
async def main():
async with asyncconnection() as conn:
print(f"使用连接: {conn.conn}")
asyncio.run(main())
3. 避免阻塞
import asyncio
from concurrent.futures import threadpoolexecutor
# 错误: 阻塞操作
async def bad_example():
time.sleep(5) # 这会阻塞整个事件循环!
# 正确: 使用 executor 运行阻塞操作
async def good_example():
loop = asyncio.get_event_loop()
with threadpoolexecutor() as executor:
result = await loop.run_in_executor(executor, blocking_function)
return result
def blocking_function():
import time
time.sleep(5)
return "完成"
4. 合理设置超时
async def with_timeout():
try:
result = await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
except asyncio.timeouterror:
print("任务超时,执行回退逻辑")
result = "默认值"
return result
5. 使用 taskgroup (python 3.11+)
async def main():
async with asyncio.taskgroup() as tg:
task1 = tg.create_task(some_coro())
task2 = tg.create_task(another_coro())
# 所有任务完成或某个任务失败时退出
print("所有任务完成")
总结
asyncio 的核心优势
- 高性能: 单线程处理大量并发,避免线程切换开销
- 低资源消耗: 协程比线程更轻量,内存占用少
- 代码可读性: async/await 语法让异步代码像同步代码一样易读
- 丰富的生态: aiohttp、aiopg、aiomysql 等大量异步库支持
何时使用 asyncio
✅ 适合使用:
- 大量网络 i/o 操作(爬虫、api 服务)
- websocket 长连接
- 异步数据库操作
- 实时数据处理
- 微服务间通信
❌ 不适合使用:
- cpu 密集型计算(使用多进程)
- 简单的小型脚本
- 依赖大量同步库的项目
- 团队不熟悉异步编程
学习路径建议
- 基础阶段: 掌握 async/await、事件循环、task 基本概念
- 进阶阶段: 学习异步上下文管理器、生成器、同步原语
- 实战阶段: 使用 aiohttp、aiopg 等库构建实际项目
- 优化阶段: 性能分析、错误处理、最佳实践
常见陷阱
- 在异步函数中使用阻塞调用
- 忘记使用 await 导致协程未执行
- 过度使用 asyncio(简单任务反而降低效率)
- 忽略异常处理导致任务静默失败
- 不理解事件循环的单线程特性
参考资源
结语: asyncio 是 python 异步编程的强大工具,掌握它可以显著提升 i/o 密集型应用的性能。理解其内部机制和最佳实践,能帮助你写出高效、可维护的异步代码。
总结
到此这篇关于python标准库asyncio用法完全指南的文章就介绍到这了,更多相关python asyncio完全指南内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论