当前位置: 代码网 > it编程>前端脚本>Python > Python httpx库终极指南实战案例

Python httpx库终极指南实战案例

2025年05月12日 Python 我要评论
一、发展历程与技术定位1.1 历史演进起源:httpx 由 encode 团队开发,于 2019 年首次发布,目标是提供一个现代化的 http 客户端,支持同步和异步操作,并兼容 http/1.1 和

一、发展历程与技术定位

1.1 历史演进

  • 起源httpxencode 团队开发,于 2019 年首次发布,目标是提供一个现代化的 http 客户端,支持同步和异步操作,并兼容 http/1.1 和 http/2。
  • 背景
    • requests 库虽然功能强大,但缺乏对异步和 http/2 的原生支持。
    • httpx 应运而生,弥补了 requests 的不足,同时保持了类似的 api 设计。
  • 核心优势
    • 同步和异步双模式。
    • 支持 http/2。
    • 类型提示完善,兼容 python 3.6+。
版本里程碑特性发布时间
0.1初始版本发布2019.01
0.18正式支持 http/22020.09
0.21顶层异步 api 引入2021.03
0.24完整类型注解支持2021.10
0.26websocket 正式支持2022.04

1.2 设计哲学

  • 双模式统一:同一 api 同时支持同步和异步编程范式
  • 协议现代化:原生支持 http/2 和 websocket
  • 类型安全:100% 类型提示覆盖,兼容 mypy 静态检查
  • 生态集成:成为 fastapi/starlette 官方推荐客户端

1.3 适用场景

  • 需要异步 http 请求的 web 应用
  • 高并发 api 调用场景
  • http/2 服务交互
  • 需要严格类型检查的大型项目

二、核心功能与基础用法

核心特性

  • 同步与异步:同一 api 支持同步 httpx.get() 和异步 await httpx.asyncclient().get()
  • http/2 支持:通过 http2=true 启用。
  • 连接池管理:自动复用连接,提升性能。
  • 类型安全:代码完全类型注释,ide 友好。
  • websocket 支持:通过 httpx.websocketsession 实现。
  • 文件上传与流式传输:支持大文件分块上传和流式响应。

2.1 安装配置

# 基础安装
pip install httpx
# 完整功能安装(http/2 + 代理支持)
pip install "httpx[http2,socks]"

2.2 请求方法全景

import httpx
# 同步客户端
with httpx.client() as client:
    # restful 全方法支持
    client.get(url, params={...})
    client.post(url, json={...})
    client.put(url, data={...})
    client.patch(url, files={...})
    client.delete(url)
# 异步客户端
async with httpx.asyncclient() as client:
    await client.get(...)

2.3 响应处理

response = httpx.get("https://api.example.com/data")
# 常用属性和方法
print(response.status_code)     # http 状态码
print(response.headers)         # 响应头
print(response.text)            # 文本内容
print(response.json())          # json 解码
print(response.content)         # 二进制内容
print(response.stream())        # 流式访问

三、高级特性与性能优化

3.1 http/2 多路复用

# 启用 http/2
client = httpx.client(http2=true)
response = client.get("https://http2.example.com")
print(response.http_version)  # 输出: "http/2"

3.2 连接池配置

# 优化连接参数
custom_client = httpx.client(
    limits=httpx.limits(
        max_keepalive_connections=20,  # 长连接上限
        max_connections=100,           # 总连接数
        keepalive_expiry=30            # 空闲时间(s)
    ),
    timeout=10.0                       # 默认超时
)

3.3 重试策略实现

from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def reliable_request():
    response = httpx.get("https://unstable-api.example.com")
    response.raise_for_status()
    return response

四、企业级功能扩展

4.1 分布式追踪

# opentelemetry 集成
from opentelemetry.instrumentation.httpx import httpxclientinstrumentor
httpxclientinstrumentor().instrument()
async def tracked_request():
    async with httpx.asyncclient() as client:
        await client.get("https://api.example.com")  # 自动生成追踪 span

4.2 安全实践

# 证书配置
secure_client = httpx.client(
    verify="/path/to/ca-bundle.pem",  # 自定义 ca
    cert=("/path/to/client-cert.pem", "/path/to/client-key.pem")
)
# 敏感信息处理
import os
client = httpx.client(
    headers={"authorization": f"bearer {os.environ['api_token']}"}
)

4.3 代理配置

# socks 代理
from httpx_socks import asyncproxytransport
proxy_transport = asyncproxytransport.from_url("socks5://user:pass@host:port")
async with httpx.asyncclient(transport=proxy_transport) as client:
    await client.get("https://api.example.com")

五、与 requests 的对比

5.1 功能对比表

功能httpxrequests
异步支持✅ 原生❌ 仅同步
http/2
类型提示完整支持部分支持
websocket
连接池配置精细化控制基础配置

5.2 性能对比数据

# 基准测试结果(1000 请求)
| 场景          | requests (s) | httpx 同步 (s) | httpx 异步 (s) |
|---------------|--------------|-----------------|-----------------|
| 短连接 http/1 | 12.3         | 11.8 (+4%)      | 2.1 (+83%)      |
| 长连接 http/2 | n/a          | 9.5             | 1.7             |

六、完整代码案例

6.1 异步高并发采集

import httpx
import asyncio
async def fetch(url: str, client: httpx.asyncclient):
    response = await client.get(url)
    return response.text[:100]  # 截取部分内容
async def main():
    urls = [f"https://httpbin.org/get?q={i}" for i in range(10)]
    async with httpx.asyncclient(timeout=10.0) as client:
        tasks = [fetch(url, client) for url in urls]
        results = await asyncio.gather(*tasks)
    for url, result in zip(urls, results):
        print(f"{url}: {result}")
asyncio.run(main())

6.2 oauth2 客户端

from httpx import oauth2, asyncclient
async def oauth2_flow():
    auth = oauth2(
        client_id="client_id",
        client_secret="secret",
        token_endpoint="https://auth.example.com/oauth2/token",
        grant_type="client_credentials"
    )
    async with asyncclient(auth=auth) as client:
        # 自动处理 token 获取和刷新
        response = await client.get("https://api.example.com/protected")
        return response.json()

6.3 文件分块上传

import httpx
from tqdm import tqdm
def chunked_upload(url: str, file_path: str, chunk_size: int = 1024*1024):
    with open(file_path, "rb") as f:
        file_size = f.seek(0, 2)
        f.seek(0)
        with tqdm(total=file_size, unit="b", unit_scale=true) as pbar:
            with httpx.client(timeout=none) as client:  # 禁用超时
                while true:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    response = client.post(
                        url,
                        files={"file": chunk},
                        headers={"content-range": f"bytes {f.tell()-len(chunk)}-{f.tell()-1}/{file_size}"}
                    )
                    pbar.update(len(chunk))
                return response.status_code

七、架构建议

7.1 客户端分层设计

7.2 监控指标

指标类别具体指标
连接池活跃连接数/空闲连接数
性能平均响应时间/99 分位值
成功率2xx/3xx/4xx/5xx 比例
流量请求量/响应体积

八、迁移指南

8.1 从 requests 迁移

# 原 requests 代码
import requests
resp = requests.get(
    "https://api.example.com/data",
    params={"page": 2},
    headers={"x-api-key": "123"}
)
# 等效 httpx 代码
import httpx
resp = httpx.get(
    "https://api.example.com/data",
    params={"page": 2},
    headers={"x-api-key": "123"}
)

8.2 常见差异处理

超时设置

# requests
requests.get(url, timeout=(3.05, 27))
# httpx
httpx.get(url, timeout=30.0)  # 统一超时控制

会话管理

# requests
with requests.session() as s:
    s.get(url)
# httpx
with httpx.client() as client:
    client.get(url)

九、最佳实践

  • 客户端复用:始终重用 client 实例提升性能
  • 超时设置:全局超时 + 各操作单独配置
  • 类型安全:结合 pydantic 进行响应验证
  • 异步优先:在高并发场景使用 asyncclient
  • 监控告警:关键指标埋点 + 异常报警

十、调试与故障排除

10.1 请求日志记录

import logging
import httpx
# 配置详细日志记录
logging.basicconfig(level=logging.debug)
# 自定义日志格式
httpx_logger = logging.getlogger("httpx")
httpx_logger.setlevel(logging.debug)
# 示例请求
client = httpx.client(event_hooks={
    "request": [lambda req: print(f">>> 发送请求: {req.method} {req.url}")],
    "response": [lambda res: print(f"<<< 收到响应: {res.status_code}")],
})
client.get("https://httpbin.org/get")

10.2 常见错误处理

try:
    response = httpx.get(
        "https://example.com",
        timeout=3.0,
        follow_redirects=true  # 自动处理重定向
    )
    response.raise_for_status()
except httpx.httpstatuserror as e:
    print(f"http 错误: {e.response.status_code}")
    print(f"响应内容: {e.response.text}")
except httpx.connecttimeout:
    print("连接超时,请检查网络或增加超时时间")
except httpx.readtimeout:
    print("服务器响应超时")
except httpx.toomanyredirects:
    print("重定向次数过多,请检查 url")
except httpx.requesterror as e:
    print(f"请求失败: {str(e)}")

十一、高级认证机制

11.1 jwt 自动刷新

from httpx import auth, asyncclient
import time
class jwtauth(auth):
    def __init__(self, token_url, client_id, client_secret):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = none
        self.expires_at = 0
    async def async_auth_flow(self, request):
        if time.time() > self.expires_at - 30:  # 提前30秒刷新
            await self._refresh_token()
        request.headers["authorization"] = f"bearer {self.access_token}"
        yield request
    async def _refresh_token(self):
        async with asyncclient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                }
            )
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.expires_at = time.time() + token_data["expires_in"]
# 使用示例
auth = jwtauth(
    token_url="https://auth.example.com/token",
    client_id="your-client-id",
    client_secret="your-secret"
)
async with asyncclient(auth=auth) as client:
    response = await client.get("https://api.example.com/protected")

11.2 aws sigv4 签名

# 需要安装 httpx-auth
from httpx_auth import awsauth
auth = awsauth(
    aws_access_key_id="akia...",
    aws_secret_access_key="...",
    aws_session_token="...",  # 可选
    region="us-west-2",
    service="execute-api"
)
response = httpx.get(
    "https://api.example.com/aws-resource",
    auth=auth
)

十二、流式处理进阶

12.1 分块上传大文件

import httpx
import os
from tqdm import tqdm
def upload_large_file(url, file_path, chunk_size=1024*1024):
    file_size = os.path.getsize(file_path)
    headers = {
        "content-length": str(file_size),
        "content-type": "application/octet-stream"
    }
    with open(file_path, "rb") as f, \
         tqdm(total=file_size, unit="b", unit_scale=true) as pbar:
        def generate():
            while true:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                pbar.update(len(chunk))
                yield chunk
        with httpx.client(timeout=none) as client:
            response = client.post(
                url,
                content=generate(),
                headers=headers
            )
            return response.status_code
# 使用示例
upload_large_file(
    "https://httpbin.org/post",
    "large_file.zip",
    chunk_size=5*1024*1024  # 5mb 分块
)

12.2 实时流式响应处理

async def process_streaming_response():
    async with httpx.asyncclient() as client:
        async with client.stream("get", "https://stream.example.com/live-data") as response:
            async for chunk in response.aiter_bytes():
                # 实时处理数据块
                print(f"收到 {len(chunk)} 字节数据")
                process_data(chunk)  # 自定义处理函数

十三、自定义中间件与传输层

13.1 请求重试中间件

from httpx import asyncclient, request, response
import httpx
class retrymiddleware:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
    async def __call__(self, request: request, get_response):
        for attempt in range(self.max_retries + 1):
            try:
                response = await get_response(request)
                if response.status_code >= 500:
                    raise httpx.httpstatuserror("server error", request=request, response=response)
                return response
            except (httpx.requesterror, httpx.httpstatuserror) as e:
                if attempt == self.max_retries:
                    raise
                await asyncio.sleep(2 ** attempt)
        return response  # 永远不会执行此处
# 创建自定义客户端
client = asyncclient(
    transport=httpx.asynchttptransport(
        retries=3,
        middleware=[retrymiddleware(max_retries=3)]
)

13.2 修改请求头中间件

def add_custom_header_middleware():
    async def middleware(request: request, get_response):
        request.headers["x-request-id"] = str(uuid.uuid4())
        response = await get_response(request)
        return response
    return middleware
client = asyncclient(
    event_hooks={
        "request": [add_custom_header_middleware()]
    }
)

十四、性能调优实战

14.1 性能分析工具

# 使用 cprofile 分析请求性能
import cprofile
import httpx
def profile_requests():
    with httpx.client() as client:
        for _ in range(100):
            client.get("https://httpbin.org/get")
if __name__ == "__main__":
    cprofile.run("profile_requests()", sort="cumtime")

14.2 连接池优化配置

optimized_client = httpx.asyncclient(
    limits=httpx.limits(
        max_connections=200,           # 最大连接数
        max_keepalive_connections=50,  # 保持活跃的连接数
        keepalive_expiry=60            # 空闲连接存活时间
    ),
    timeout=httpx.timeout(
        connect=5.0,                   # 连接超时
        read=20.0,                     # 读取超时
        pool=3.0                       # 连接池等待超时
    ),
    http2=true                        # 启用 http/2
)

十五、与异步框架深度集成

15.1 在 fastapi 中使用

from fastapi import fastapi, depends
from httpx import asyncclient
app = fastapi()
async def get_async_client():
    async with asyncclient(base_url="https://api.example.com") as client:
        yield client
@app.get("/proxy-data")
async def proxy_data(client: asyncclient = depends(get_async_client)):
    response = await client.get("/remote-data")
    return response.json()

15.2 集成 celery 异步任务

from celery import celery
from httpx import asyncclient
app = celery("tasks", broker="pyamqp://guest@localhost//")
@app.task
def sync_http_request():
    with httpx.client() as client:
        return client.get("https://api.example.com/data").json()
@app.task
async def async_http_request():
    async with asyncclient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

十六、安全最佳实践

16.1 证书固定

# 使用指纹验证证书
client = httpx.client(
    verify=true,
    limits=httpx.limits(max_keepalive_connections=5),
    cert=("/path/client.crt", "/path/client.key"),
    # 证书指纹校验
    transport=httpx.httptransport(
        verify=httpx.sslconfig(
            cert_reqs="cert_required",
            ca_certs="/path/ca.pem",
            fingerprint="sha256:..."
        )
    )
)

16.2 敏感数据防护

from pydantic import secretstr
class secureclient:
    def __init__(self, api_key: secretstr):
        self.client = httpx.client(
            headers={"authorization": f"bearer {api_key.get_secret_value()}"},
            timeout=30.0
        )
    def safe_request(self):
        try:
            return self.client.get("https://secure-api.example.com")
        except httpx.requesterror:
            # 记录错误但不暴露密钥
            log.error("api请求失败")
# 使用
secure_client = secureclient(api_key=secretstr("s3cr3t"))

十七、实战案例:分布式爬虫

import httpx
import asyncio
from bs4 import beautifulsoup
from urllib.parse import urljoin
class asynccrawler:
    def __init__(self, base_url, concurrency=10):
        self.base_url = base_url
        self.seen_urls = set()
        self.semaphore = asyncio.semaphore(concurrency)
        self.client = httpx.asyncclient(timeout=10.0)
    async def crawl(self, path="/"):
        url = urljoin(self.base_url, path)
        if url in self.seen_urls:
            return
        self.seen_urls.add(url)
        async with self.semaphore:
            try:
                response = await self.client.get(url)
                if response.status_code == 200:
                    await self.parse(response)
            except httpx.requesterror as e:
                print(f"请求失败: {url} - {str(e)}")
    async def parse(self, response):
        soup = beautifulsoup(response.text, "html.parser")
        # 提取数据
        print(f"解析页面: {response.url}")
        # 提取链接继续爬取
        for link in soup.find_all("a", href=true):
            await self.crawl(link["href"])
    async def run(self):
        await self.crawl()
        await self.client.aclose()
# 启动爬虫
async def main():
    crawler = asynccrawler("https://example.com")
    await crawler.run()
asyncio.run(main())

十八、扩展学习资源

18.1 官方文档

https 官方文档

httpx github 仓库

总结

通过本指南的深度扩展,您已经掌握了:

高级调试技巧:包括日志配置和精细化错误处理企业级认证方案:jwt自动刷新和aws签名实现流式处理最佳实践:大文件分块上传和实时流处理自定义扩展能力:中间件开发和传输层定制性能调优策略:连接池配置和性能分析工具使用框架集成模式:与fastapi、celery等框架的深度整合安全防护方案:证书固定和敏感数据处理完整实战案例:分布式异步爬虫的实现

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com