当前位置: 代码网 > it编程>前端脚本>Python > Python中异步HTTP客户端/服务器框架aiohttp的使用全面指南

Python中异步HTTP客户端/服务器框架aiohttp的使用全面指南

2025年06月03日 Python 我要评论
什么是 aiohttpaiohttp 是一个基于 python asyncio 的异步 http 客户端/服务器框架,专为高性能网络编程设计。它提供了:异步 http 客户端(类似异步版 reques

什么是 aiohttp

aiohttp 是一个基于 python asyncio 的异步 http 客户端/服务器框架,专为高性能网络编程设计。它提供了:

  • 异步 http 客户端(类似异步版 requests)
  • 异步 http 服务器(类似异步版 flask/django)
  • 完整的 websocket 支持
  • 高效的连接池管理

核心优势

特性描述
异步非阻塞单线程处理数千并发连接
高性能远超同步框架(如 requests)的吞吐量
轻量级简洁的api,无复杂依赖
全面协议支持http/1.1, http/2(客户端), websocket
生态完善良好文档和活跃社区

基础用法 - http客户端

安装

pip install aiohttp

基本get请求

import aiohttp
import asyncio

async def main():
    async with aiohttp.clientsession() as session:
        async with session.get('https://api.example.com/data') as response:
            print("状态码:", response.status)
            print("响应内容:", await response.text())

asyncio.run(main())

post请求示例

async def post_example():
    async with aiohttp.clientsession() as session:
        # 表单数据
        async with session.post(
            'https://httpbin.org/post', 
            data={'key': 'value'}
        ) as response:
            print(await response.json())
        
        # json数据
        async with session.post(
            'https://api.example.com/users',
            json={'name': 'alice', 'age': 30}
        ) as response:
            print(await response.json())

高级用法

并发请求

async def fetch(url):
    async with aiohttp.clientsession() as session:
        async with session.get(url) as response:
            return await response.text()

async def concurrent_requests():
    urls = [
        'https://api.example.com/item/1',
        'https://api.example.com/item/2',
        'https://api.example.com/item/3'
    ]
    
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    for url, content in zip(urls, results):
        print(f"{url}: {content[:50]}...")

asyncio.run(concurrent_requests())

超时控制

async def timeout_example():
    timeout = aiohttp.clienttimeout(total=5)  # 5秒总超时
    
    async with aiohttp.clientsession(timeout=timeout) as session:
        try:
            async with session.get('https://slow-api.example.com') as response:
                return await response.text()
        except asyncio.timeouterror:
            print("请求超时!")

流式处理大响应

async def stream_response():
    async with aiohttp.clientsession() as session:
        async with session.get('https://large-file.example.com') as response:
            with open('large_file.txt', 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    f.write(chunk)
                    print(f"已接收 {len(chunk)} 字节")

服务器端开发

基本http服务器

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "world")
    return web.response(text=f"hello, {name}!")

app = web.application()
app.add_routes([
    web.get('/', handle),
    web.get('/{name}', handle)
])

if __name__ == '__main__':
    web.run_app(app, port=8080)

rest api示例

async def get_users(request):
    users = [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
    return web.json_response(users)

async def create_user(request):
    data = await request.json()
    # 实际应用中这里会保存到数据库
    return web.json_response({'id': 3, **data}, status=201)

app = web.application()
app.add_routes([
    web.get('/api/users', get_users),
    web.post('/api/users', create_user)
])

websocket服务器

async def websocket_handler(request):
    ws = web.websocketresponse()
    await ws.prepare(request)
    
    async for msg in ws:
        if msg.type == aiohttp.wsmsgtype.text:
            if msg.data == 'close':
                await ws.close()
            else:
                await ws.send_str(f"echo: {msg.data}")
        elif msg.type == aiohttp.wsmsgtype.error:
            print('websocket连接异常关闭')
    
    return ws

​​​​​​​app.add_routes([web.get('/ws', websocket_handler)])

进阶扩展

中间件示例

async def auth_middleware(app, handler):
    async def middleware(request):
        # 验证api密钥
        if request.headers.get('x-api-key') != 'secret_key':
            return web.json_response({'error': 'unauthorized'}, status=401)
        return await handler(request)
    return middleware

app = web.application(middlewares=[auth_middleware])

http/2客户端支持

async def http2_request():
    conn = aiohttp.tcpconnector(force_close=true, enable_cleanup_closed=true)
    async with aiohttp.clientsession(connector=conn) as session:
        async with session.get(
            'https://http2.akamai.com/',
            headers={'accept': 'text/html'}
        ) as response:
            print("http版本:", response.version)
            print("内容:", await response.text()[:200])

性能优化配置

# 自定义连接器配置
connector = aiohttp.tcpconnector(
    limit=100,  # 最大并发连接数
    limit_per_host=20,  # 单主机最大连接数
    ssl=false,  # 禁用ssl验证(仅用于测试)
    force_close=true  # 避免连接延迟关闭
)

# 自定义会话配置
session = aiohttp.clientsession(
    connector=connector,
    timeout=aiohttp.clienttimeout(total=30),
    headers={'user-agent': 'myapp/1.0'},
    cookie_jar=aiohttp.cookiejar(unsafe=true)
)

最佳实践

重用clientsession:避免为每个请求创建新会话

使用连接池:合理配置tcpconnector参数

超时设置:总是配置合理的超时时间

资源清理:使用async with确保资源释放

错误处理:捕获并处理常见网络异常

try:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
except aiohttp.clienterror as e:
    print(f"请求错误: {e}")

完整示例

import aiohttp
import asyncio
from aiohttp import web

# 客户端示例
async def fetch_data():
    async with aiohttp.clientsession() as session:
        # 并发请求多个api
        urls = [
            'https://jsonplaceholder.typicode.com/posts/1',
            'https://jsonplaceholder.typicode.com/comments/1',
            'https://jsonplaceholder.typicode.com/albums/1'
        ]
        
        tasks = []
        for url in urls:
            tasks.append(session.get(url))
        
        responses = await asyncio.gather(*tasks)
        
        results = []
        for response in responses:
            results.append(await response.json())
        
        return results

# 服务器示例
async def handle_index(request):
    return web.response(text="welcome to aiohttp server!")

async def handle_api(request):
    data = await fetch_data()
    return web.json_response(data)

# 创建应用
app = web.application()
app.add_routes([
    web.get('/', handle_index),
    web.get('/api', handle_api)
])

# 启动服务器
async def start_server():
    runner = web.apprunner(app)
    await runner.setup()
    site = web.tcpsite(runner, 'localhost', 8080)
    await site.start()
    print("server running at http://localhost:8080")
    
    # 保持运行
    while true:
        await asyncio.sleep(3600)  # 每小时唤醒一次

if __name__ == '__main__':
    asyncio.run(start_server())

总结

aiohttp 是 python 异步生态中处理 http 通信的首选工具,它提供了:

  • 高效客户端:用于高性能爬虫、api调用
  • 轻量级服务器:构建高性能web服务和api
  • websocket支持:实现实时双向通信
  • 连接池管理:优化资源利用率

通过合理利用 aiohttp 的异步特性,开发者可以轻松构建出能够处理数万并发连接的高性能网络应用,同时保持代码的简洁性和可维护性。

到此这篇关于python中异步http客户端/服务器框架aiohttp的使用全面指南的文章就介绍到这了,更多相关python异步框架aiohttp内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com