什么是 aiohttp
aiohttp 是一个基于 python asyncio 的异步 http 客户端/服务器框架,专为高性能网络编程设计。它提供了:
- 异步 http 客户端(类似异步版 requests)
- 异步 http 服务器(类似异步版 flask/django)
- 完整的 websocket 支持
- 高效的连接池管理
核心优势
| 特性 | 描述 |
|---|---|
| 异步非阻塞 | 单线程处理数千并发连接 |
| 高性能 | 远超同步框架(如 requests)的吞吐量 |
| 轻量级 | 简洁的api,无复杂依赖 |
| 全面协议支持 | http/1.1, http/2(客户端), websocket |
| 生态完善 | 良好文档和活跃社区 |
基础用法 - http客户端
安装
pip install aiohttp
基本get请求
import aiohttp
import asyncio
async def main():
async with aiohttp.clientsession() as session:
async with session.get('https://api.example.com/data') as response:
print("状态码:", response.status)
print("响应内容:", await response.text())
asyncio.run(main())
post请求示例
async def post_example():
async with aiohttp.clientsession() as session:
# 表单数据
async with session.post(
'https://httpbin.org/post',
data={'key': 'value'}
) as response:
print(await response.json())
# json数据
async with session.post(
'https://api.example.com/users',
json={'name': 'alice', 'age': 30}
) as response:
print(await response.json())
高级用法
并发请求
async def fetch(url):
async with aiohttp.clientsession() as session:
async with session.get(url) as response:
return await response.text()
async def concurrent_requests():
urls = [
'https://api.example.com/item/1',
'https://api.example.com/item/2',
'https://api.example.com/item/3'
]
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"{url}: {content[:50]}...")
asyncio.run(concurrent_requests())
超时控制
async def timeout_example():
timeout = aiohttp.clienttimeout(total=5) # 5秒总超时
async with aiohttp.clientsession(timeout=timeout) as session:
try:
async with session.get('https://slow-api.example.com') as response:
return await response.text()
except asyncio.timeouterror:
print("请求超时!")
流式处理大响应
async def stream_response():
async with aiohttp.clientsession() as session:
async with session.get('https://large-file.example.com') as response:
with open('large_file.txt', 'wb') as f:
async for chunk in response.content.iter_chunked(1024):
f.write(chunk)
print(f"已接收 {len(chunk)} 字节")
服务器端开发
基本http服务器
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "world")
return web.response(text=f"hello, {name}!")
app = web.application()
app.add_routes([
web.get('/', handle),
web.get('/{name}', handle)
])
if __name__ == '__main__':
web.run_app(app, port=8080)
rest api示例
async def get_users(request):
users = [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
return web.json_response(users)
async def create_user(request):
data = await request.json()
# 实际应用中这里会保存到数据库
return web.json_response({'id': 3, **data}, status=201)
app = web.application()
app.add_routes([
web.get('/api/users', get_users),
web.post('/api/users', create_user)
])
websocket服务器
async def websocket_handler(request):
ws = web.websocketresponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.wsmsgtype.text:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(f"echo: {msg.data}")
elif msg.type == aiohttp.wsmsgtype.error:
print('websocket连接异常关闭')
return ws
app.add_routes([web.get('/ws', websocket_handler)])进阶扩展
中间件示例
async def auth_middleware(app, handler):
async def middleware(request):
# 验证api密钥
if request.headers.get('x-api-key') != 'secret_key':
return web.json_response({'error': 'unauthorized'}, status=401)
return await handler(request)
return middleware
app = web.application(middlewares=[auth_middleware])
http/2客户端支持
async def http2_request():
conn = aiohttp.tcpconnector(force_close=true, enable_cleanup_closed=true)
async with aiohttp.clientsession(connector=conn) as session:
async with session.get(
'https://http2.akamai.com/',
headers={'accept': 'text/html'}
) as response:
print("http版本:", response.version)
print("内容:", await response.text()[:200])
性能优化配置
# 自定义连接器配置
connector = aiohttp.tcpconnector(
limit=100, # 最大并发连接数
limit_per_host=20, # 单主机最大连接数
ssl=false, # 禁用ssl验证(仅用于测试)
force_close=true # 避免连接延迟关闭
)
# 自定义会话配置
session = aiohttp.clientsession(
connector=connector,
timeout=aiohttp.clienttimeout(total=30),
headers={'user-agent': 'myapp/1.0'},
cookie_jar=aiohttp.cookiejar(unsafe=true)
)
最佳实践
重用clientsession:避免为每个请求创建新会话
使用连接池:合理配置tcpconnector参数
超时设置:总是配置合理的超时时间
资源清理:使用async with确保资源释放
错误处理:捕获并处理常见网络异常
try:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except aiohttp.clienterror as e:
print(f"请求错误: {e}")
完整示例
import aiohttp
import asyncio
from aiohttp import web
# 客户端示例
async def fetch_data():
async with aiohttp.clientsession() as session:
# 并发请求多个api
urls = [
'https://jsonplaceholder.typicode.com/posts/1',
'https://jsonplaceholder.typicode.com/comments/1',
'https://jsonplaceholder.typicode.com/albums/1'
]
tasks = []
for url in urls:
tasks.append(session.get(url))
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
results.append(await response.json())
return results
# 服务器示例
async def handle_index(request):
return web.response(text="welcome to aiohttp server!")
async def handle_api(request):
data = await fetch_data()
return web.json_response(data)
# 创建应用
app = web.application()
app.add_routes([
web.get('/', handle_index),
web.get('/api', handle_api)
])
# 启动服务器
async def start_server():
runner = web.apprunner(app)
await runner.setup()
site = web.tcpsite(runner, 'localhost', 8080)
await site.start()
print("server running at http://localhost:8080")
# 保持运行
while true:
await asyncio.sleep(3600) # 每小时唤醒一次
if __name__ == '__main__':
asyncio.run(start_server())
总结
aiohttp 是 python 异步生态中处理 http 通信的首选工具,它提供了:
- 高效客户端:用于高性能爬虫、api调用
- 轻量级服务器:构建高性能web服务和api
- websocket支持:实现实时双向通信
- 连接池管理:优化资源利用率
通过合理利用 aiohttp 的异步特性,开发者可以轻松构建出能够处理数万并发连接的高性能网络应用,同时保持代码的简洁性和可维护性。
到此这篇关于python中异步http客户端/服务器框架aiohttp的使用全面指南的文章就介绍到这了,更多相关python异步框架aiohttp内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论