一、发展历程与技术定位
1.1 历史演进
- 起源:
httpx
由 encode 团队开发,于 2019 年首次发布,目标是提供一个现代化的 http 客户端,支持同步和异步操作,并兼容 http/1.1 和 http/2。 - 背景:
requests
库虽然功能强大,但缺乏对异步和 http/2 的原生支持。httpx
应运而生,弥补了requests
的不足,同时保持了类似的 api 设计。
- 核心优势:
- 同步和异步双模式。
- 支持 http/2。
- 类型提示完善,兼容 python 3.6+。
版本 | 里程碑特性 | 发布时间 |
---|---|---|
0.1 | 初始版本发布 | 2019.01 |
0.18 | 正式支持 http/2 | 2020.09 |
0.21 | 顶层异步 api 引入 | 2021.03 |
0.24 | 完整类型注解支持 | 2021.10 |
0.26 | websocket 正式支持 | 2022.04 |
1.2 设计哲学
- 双模式统一:同一 api 同时支持同步和异步编程范式
- 协议现代化:原生支持 http/2 和 websocket
- 类型安全:100% 类型提示覆盖,兼容 mypy 静态检查
- 生态集成:成为 fastapi/starlette 官方推荐客户端
1.3 适用场景
- 需要异步 http 请求的 web 应用
- 高并发 api 调用场景
- http/2 服务交互
- 需要严格类型检查的大型项目
二、核心功能与基础用法
核心特性
- 同步与异步:同一 api 支持同步
httpx.get()
和异步await httpx.asyncclient().get()
。 - http/2 支持:通过
http2=true
启用。 - 连接池管理:自动复用连接,提升性能。
- 类型安全:代码完全类型注释,ide 友好。
- websocket 支持:通过
httpx.websocketsession
实现。 - 文件上传与流式传输:支持大文件分块上传和流式响应。
2.1 安装配置
# 基础安装 pip install httpx # 完整功能安装(http/2 + 代理支持) pip install "httpx[http2,socks]"
2.2 请求方法全景
import httpx # 同步客户端 with httpx.client() as client: # restful 全方法支持 client.get(url, params={...}) client.post(url, json={...}) client.put(url, data={...}) client.patch(url, files={...}) client.delete(url) # 异步客户端 async with httpx.asyncclient() as client: await client.get(...)
2.3 响应处理
response = httpx.get("https://api.example.com/data") # 常用属性和方法 print(response.status_code) # http 状态码 print(response.headers) # 响应头 print(response.text) # 文本内容 print(response.json()) # json 解码 print(response.content) # 二进制内容 print(response.stream()) # 流式访问
三、高级特性与性能优化
3.1 http/2 多路复用
# 启用 http/2 client = httpx.client(http2=true) response = client.get("https://http2.example.com") print(response.http_version) # 输出: "http/2"
3.2 连接池配置
# 优化连接参数 custom_client = httpx.client( limits=httpx.limits( max_keepalive_connections=20, # 长连接上限 max_connections=100, # 总连接数 keepalive_expiry=30 # 空闲时间(s) ), timeout=10.0 # 默认超时 )
3.3 重试策略实现
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential()) def reliable_request(): response = httpx.get("https://unstable-api.example.com") response.raise_for_status() return response
四、企业级功能扩展
4.1 分布式追踪
# opentelemetry 集成 from opentelemetry.instrumentation.httpx import httpxclientinstrumentor httpxclientinstrumentor().instrument() async def tracked_request(): async with httpx.asyncclient() as client: await client.get("https://api.example.com") # 自动生成追踪 span
4.2 安全实践
# 证书配置 secure_client = httpx.client( verify="/path/to/ca-bundle.pem", # 自定义 ca cert=("/path/to/client-cert.pem", "/path/to/client-key.pem") ) # 敏感信息处理 import os client = httpx.client( headers={"authorization": f"bearer {os.environ['api_token']}"} )
4.3 代理配置
# socks 代理 from httpx_socks import asyncproxytransport proxy_transport = asyncproxytransport.from_url("socks5://user:pass@host:port") async with httpx.asyncclient(transport=proxy_transport) as client: await client.get("https://api.example.com")
五、与 requests 的对比
5.1 功能对比表
功能 | httpx | requests |
---|---|---|
异步支持 | ✅ 原生 | ❌ 仅同步 |
http/2 | ✅ | ❌ |
类型提示 | 完整支持 | 部分支持 |
websocket | ✅ | ❌ |
连接池配置 | 精细化控制 | 基础配置 |
5.2 性能对比数据
# 基准测试结果(1000 请求) | 场景 | requests (s) | httpx 同步 (s) | httpx 异步 (s) | |---------------|--------------|-----------------|-----------------| | 短连接 http/1 | 12.3 | 11.8 (+4%) | 2.1 (+83%) | | 长连接 http/2 | n/a | 9.5 | 1.7 |
六、完整代码案例
6.1 异步高并发采集
import httpx import asyncio async def fetch(url: str, client: httpx.asyncclient): response = await client.get(url) return response.text[:100] # 截取部分内容 async def main(): urls = [f"https://httpbin.org/get?q={i}" for i in range(10)] async with httpx.asyncclient(timeout=10.0) as client: tasks = [fetch(url, client) for url in urls] results = await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f"{url}: {result}") asyncio.run(main())
6.2 oauth2 客户端
from httpx import oauth2, asyncclient async def oauth2_flow(): auth = oauth2( client_id="client_id", client_secret="secret", token_endpoint="https://auth.example.com/oauth2/token", grant_type="client_credentials" ) async with asyncclient(auth=auth) as client: # 自动处理 token 获取和刷新 response = await client.get("https://api.example.com/protected") return response.json()
6.3 文件分块上传
import httpx from tqdm import tqdm def chunked_upload(url: str, file_path: str, chunk_size: int = 1024*1024): with open(file_path, "rb") as f: file_size = f.seek(0, 2) f.seek(0) with tqdm(total=file_size, unit="b", unit_scale=true) as pbar: with httpx.client(timeout=none) as client: # 禁用超时 while true: chunk = f.read(chunk_size) if not chunk: break response = client.post( url, files={"file": chunk}, headers={"content-range": f"bytes {f.tell()-len(chunk)}-{f.tell()-1}/{file_size}"} ) pbar.update(len(chunk)) return response.status_code
七、架构建议
7.1 客户端分层设计
7.2 监控指标
指标类别 | 具体指标 |
---|---|
连接池 | 活跃连接数/空闲连接数 |
性能 | 平均响应时间/99 分位值 |
成功率 | 2xx/3xx/4xx/5xx 比例 |
流量 | 请求量/响应体积 |
八、迁移指南
8.1 从 requests 迁移
# 原 requests 代码 import requests resp = requests.get( "https://api.example.com/data", params={"page": 2}, headers={"x-api-key": "123"} ) # 等效 httpx 代码 import httpx resp = httpx.get( "https://api.example.com/data", params={"page": 2}, headers={"x-api-key": "123"} )
8.2 常见差异处理
超时设置:
# requests requests.get(url, timeout=(3.05, 27)) # httpx httpx.get(url, timeout=30.0) # 统一超时控制
会话管理:
# requests with requests.session() as s: s.get(url) # httpx with httpx.client() as client: client.get(url)
九、最佳实践
- 客户端复用:始终重用 client 实例提升性能
- 超时设置:全局超时 + 各操作单独配置
- 类型安全:结合 pydantic 进行响应验证
- 异步优先:在高并发场景使用 asyncclient
- 监控告警:关键指标埋点 + 异常报警
十、调试与故障排除
10.1 请求日志记录
import logging import httpx # 配置详细日志记录 logging.basicconfig(level=logging.debug) # 自定义日志格式 httpx_logger = logging.getlogger("httpx") httpx_logger.setlevel(logging.debug) # 示例请求 client = httpx.client(event_hooks={ "request": [lambda req: print(f">>> 发送请求: {req.method} {req.url}")], "response": [lambda res: print(f"<<< 收到响应: {res.status_code}")], }) client.get("https://httpbin.org/get")
10.2 常见错误处理
try: response = httpx.get( "https://example.com", timeout=3.0, follow_redirects=true # 自动处理重定向 ) response.raise_for_status() except httpx.httpstatuserror as e: print(f"http 错误: {e.response.status_code}") print(f"响应内容: {e.response.text}") except httpx.connecttimeout: print("连接超时,请检查网络或增加超时时间") except httpx.readtimeout: print("服务器响应超时") except httpx.toomanyredirects: print("重定向次数过多,请检查 url") except httpx.requesterror as e: print(f"请求失败: {str(e)}")
十一、高级认证机制
11.1 jwt 自动刷新
from httpx import auth, asyncclient import time class jwtauth(auth): def __init__(self, token_url, client_id, client_secret): self.token_url = token_url self.client_id = client_id self.client_secret = client_secret self.access_token = none self.expires_at = 0 async def async_auth_flow(self, request): if time.time() > self.expires_at - 30: # 提前30秒刷新 await self._refresh_token() request.headers["authorization"] = f"bearer {self.access_token}" yield request async def _refresh_token(self): async with asyncclient() as client: response = await client.post( self.token_url, data={ "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret } ) token_data = response.json() self.access_token = token_data["access_token"] self.expires_at = time.time() + token_data["expires_in"] # 使用示例 auth = jwtauth( token_url="https://auth.example.com/token", client_id="your-client-id", client_secret="your-secret" ) async with asyncclient(auth=auth) as client: response = await client.get("https://api.example.com/protected")
11.2 aws sigv4 签名
# 需要安装 httpx-auth from httpx_auth import awsauth auth = awsauth( aws_access_key_id="akia...", aws_secret_access_key="...", aws_session_token="...", # 可选 region="us-west-2", service="execute-api" ) response = httpx.get( "https://api.example.com/aws-resource", auth=auth )
十二、流式处理进阶
12.1 分块上传大文件
import httpx import os from tqdm import tqdm def upload_large_file(url, file_path, chunk_size=1024*1024): file_size = os.path.getsize(file_path) headers = { "content-length": str(file_size), "content-type": "application/octet-stream" } with open(file_path, "rb") as f, \ tqdm(total=file_size, unit="b", unit_scale=true) as pbar: def generate(): while true: chunk = f.read(chunk_size) if not chunk: break pbar.update(len(chunk)) yield chunk with httpx.client(timeout=none) as client: response = client.post( url, content=generate(), headers=headers ) return response.status_code # 使用示例 upload_large_file( "https://httpbin.org/post", "large_file.zip", chunk_size=5*1024*1024 # 5mb 分块 )
12.2 实时流式响应处理
async def process_streaming_response(): async with httpx.asyncclient() as client: async with client.stream("get", "https://stream.example.com/live-data") as response: async for chunk in response.aiter_bytes(): # 实时处理数据块 print(f"收到 {len(chunk)} 字节数据") process_data(chunk) # 自定义处理函数
十三、自定义中间件与传输层
13.1 请求重试中间件
from httpx import asyncclient, request, response import httpx class retrymiddleware: def __init__(self, max_retries=3): self.max_retries = max_retries async def __call__(self, request: request, get_response): for attempt in range(self.max_retries + 1): try: response = await get_response(request) if response.status_code >= 500: raise httpx.httpstatuserror("server error", request=request, response=response) return response except (httpx.requesterror, httpx.httpstatuserror) as e: if attempt == self.max_retries: raise await asyncio.sleep(2 ** attempt) return response # 永远不会执行此处 # 创建自定义客户端 client = asyncclient( transport=httpx.asynchttptransport( retries=3, middleware=[retrymiddleware(max_retries=3)] )
13.2 修改请求头中间件
def add_custom_header_middleware(): async def middleware(request: request, get_response): request.headers["x-request-id"] = str(uuid.uuid4()) response = await get_response(request) return response return middleware client = asyncclient( event_hooks={ "request": [add_custom_header_middleware()] } )
十四、性能调优实战
14.1 性能分析工具
# 使用 cprofile 分析请求性能 import cprofile import httpx def profile_requests(): with httpx.client() as client: for _ in range(100): client.get("https://httpbin.org/get") if __name__ == "__main__": cprofile.run("profile_requests()", sort="cumtime")
14.2 连接池优化配置
optimized_client = httpx.asyncclient( limits=httpx.limits( max_connections=200, # 最大连接数 max_keepalive_connections=50, # 保持活跃的连接数 keepalive_expiry=60 # 空闲连接存活时间 ), timeout=httpx.timeout( connect=5.0, # 连接超时 read=20.0, # 读取超时 pool=3.0 # 连接池等待超时 ), http2=true # 启用 http/2 )
十五、与异步框架深度集成
15.1 在 fastapi 中使用
from fastapi import fastapi, depends from httpx import asyncclient app = fastapi() async def get_async_client(): async with asyncclient(base_url="https://api.example.com") as client: yield client @app.get("/proxy-data") async def proxy_data(client: asyncclient = depends(get_async_client)): response = await client.get("/remote-data") return response.json()
15.2 集成 celery 异步任务
from celery import celery from httpx import asyncclient app = celery("tasks", broker="pyamqp://guest@localhost//") @app.task def sync_http_request(): with httpx.client() as client: return client.get("https://api.example.com/data").json() @app.task async def async_http_request(): async with asyncclient() as client: response = await client.get("https://api.example.com/data") return response.json()
十六、安全最佳实践
16.1 证书固定
# 使用指纹验证证书 client = httpx.client( verify=true, limits=httpx.limits(max_keepalive_connections=5), cert=("/path/client.crt", "/path/client.key"), # 证书指纹校验 transport=httpx.httptransport( verify=httpx.sslconfig( cert_reqs="cert_required", ca_certs="/path/ca.pem", fingerprint="sha256:..." ) ) )
16.2 敏感数据防护
from pydantic import secretstr class secureclient: def __init__(self, api_key: secretstr): self.client = httpx.client( headers={"authorization": f"bearer {api_key.get_secret_value()}"}, timeout=30.0 ) def safe_request(self): try: return self.client.get("https://secure-api.example.com") except httpx.requesterror: # 记录错误但不暴露密钥 log.error("api请求失败") # 使用 secure_client = secureclient(api_key=secretstr("s3cr3t"))
十七、实战案例:分布式爬虫
import httpx import asyncio from bs4 import beautifulsoup from urllib.parse import urljoin class asynccrawler: def __init__(self, base_url, concurrency=10): self.base_url = base_url self.seen_urls = set() self.semaphore = asyncio.semaphore(concurrency) self.client = httpx.asyncclient(timeout=10.0) async def crawl(self, path="/"): url = urljoin(self.base_url, path) if url in self.seen_urls: return self.seen_urls.add(url) async with self.semaphore: try: response = await self.client.get(url) if response.status_code == 200: await self.parse(response) except httpx.requesterror as e: print(f"请求失败: {url} - {str(e)}") async def parse(self, response): soup = beautifulsoup(response.text, "html.parser") # 提取数据 print(f"解析页面: {response.url}") # 提取链接继续爬取 for link in soup.find_all("a", href=true): await self.crawl(link["href"]) async def run(self): await self.crawl() await self.client.aclose() # 启动爬虫 async def main(): crawler = asynccrawler("https://example.com") await crawler.run() asyncio.run(main())
十八、扩展学习资源
18.1 官方文档
总结
通过本指南的深度扩展,您已经掌握了:
高级调试技巧:包括日志配置和精细化错误处理企业级认证方案:jwt自动刷新和aws签名实现流式处理最佳实践:大文件分块上传和实时流处理自定义扩展能力:中间件开发和传输层定制性能调优策略:连接池配置和性能分析工具使用框架集成模式:与fastapi、celery等框架的深度整合安全防护方案:证书固定和敏感数据处理完整实战案例:分布式异步爬虫的实现
发表评论