在电商物流追踪、金融数据监控等场景中,api请求的稳定性直接决定系统可靠性。当顺丰api因网络抖动返回503错误,或因跨地域调用出现10秒延迟时,如何确保程序不崩溃且数据不丢失?本文通过真实案例拆解,用requests库实现"防抖动+抗异常"的健壮请求方案。
一、血泪教训:那些年踩过的api坑
某跨境电商系统在"黑色星期五"大促期间突发故障:调用顺丰国际件接口时,30%的请求因超时失败,导致2000+包裹状态同步延迟。事后分析发现三大元凶:
- 固定超时陷阱:设置
timeout=5导致所有跨洋请求必然超时(实际平均响应时间8秒) - 暴力重试雪崩:简单for循环重试5次,瞬间产生10倍请求量击垮顺丰网关
- 代理池污染:使用失效代理ip发起请求,触发顺丰反爬机制封禁整个ip段
这些场景揭示核心问题:api请求需要"有智慧的等待"和"有策略的坚持" 。
二、超时配置:给请求装上"安全阀"
1. 连接超时 vs 读取超时
import requests
try:
# 连接超时3秒(tcp握手阶段)
# 读取超时10秒(服务器处理阶段)
response = requests.get(
'https://api.sf-express.com/track',
params={'trackingnumber': 'sf123456789'},
timeout=(3, 10) # 元组形式分别设置
)
print(response.json())
except requests.exceptions.connecttimeout:
print("连接服务器失败,请检查网络")
except requests.exceptions.readtimeout:
print("服务器处理超时,请稍后重试")
关键决策点:
- 国内api调用:
timeout=(2, 5)(连接2秒,读取5秒) - 跨境api调用:
timeout=(5, 15)(考虑国际链路延迟) - 文件上传场景:需增加
write_timeout参数(需httpx等库支持)
2. 动态超时策略
某物流监控系统采用分级超时机制:
def get_dynamic_timeout(retry_count):
base_timeout = 3 # 基础超时
if retry_count > 0:
return min(base_timeout * (2 ** retry_count), 30) # 指数退避,最大30秒
return base_timeout
# 使用示例
for i in range(3):
try:
timeout = get_dynamic_timeout(i)
response = requests.get(url, timeout=timeout)
break
except exception as e:
print(f"第{i+1}次尝试失败,超时时间调整为{timeout}秒")
三、重试机制:让请求学会"坚持"
1. 指数退避重试(推荐方案)
from requests.adapters import httpadapter
from urllib3.util.retry import retry
def create_retry_session(retries=3, backoff_factor=1, status_forcelist=(500, 502, 503, 504)):
session = requests.session()
retry = retry(
total=retries,
read=true, # 允许读取超时重试
connect=true, # 允许连接超时重试
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
allowed_methods=["get", "post"] # 支持post请求重试
)
adapter = httpadapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 使用示例
session = create_retry_session()
response = session.get('https://api.sf-express.com/track', params={'trackingnumber': 'sf123456789'})
参数深解:
backoff_factor=1:第1次重试等待1秒,第2次2秒,第3次4秒status_forcelist:仅对5xx服务器错误和429限流错误重试allowed_methods:默认不重试post请求,需显式声明
2. 熔断机制实现(避免雪崩)
from collections import deque
import time
class circuitbreaker:
def __init__(self, max_failures=3, reset_timeout=60):
self.failures = deque(maxlen=max_failures)
self.reset_timeout = reset_timeout
def is_open(self):
if len(self.failures) < self.failures.maxlen:
return false
# 如果最近max_failures次请求都失败,且最后一次失败在reset_timeout秒内
return (time.time() - self.failures[-1]) < self.reset_timeout
def record_failure(self):
self.failures.append(time.time())
def record_success(self):
self.failures.clear()
# 结合重试使用
breaker = circuitbreaker(max_failures=3, reset_timeout=30)
def safe_request():
if breaker.is_open():
raise exception("service unavailable, circuit breaker open")
try:
response = create_retry_session().get(url)
if response.status_code == 200:
breaker.record_success()
return response
else:
breaker.record_failure()
raise exception("api request failed")
except exception as e:
breaker.record_failure()
raise e
四、代理配置:突破封禁的"隐身术"
1. 代理池实战方案
import random
from requests.adapters import httpadapter
class proxypool:
def __init__(self):
self.proxies = [
{"http": "http://1.1.1.1:8080", "https": "http://1.1.1.1:8080"},
{"http": "http://2.2.2.2:8080", "https": "http://2.2.2.2:8080"},
# 更多代理...
]
self.failed_proxies = set()
def get_proxy(self):
available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
if not available_proxies:
raise exception("no available proxies")
return random.choice(available_proxies)
def mark_failed(self, proxy):
self.failed_proxies.add(proxy)
# 使用示例
proxy_pool = proxypool()
session = requests.session()
for _ in range(3): # 尝试3个不同代理
try:
proxy = proxy_pool.get_proxy()
response = session.get(
'https://api.sf-express.com/track',
proxies=proxy,
timeout=(3, 10)
)
if response.status_code == 200:
print("success with proxy:", proxy)
break
else:
proxy_pool.mark_failed(proxy)
except exception:
proxy_pool.mark_failed(proxy)
else:
print("all proxies failed")
2. tor代理配置(高匿名场景)
import requests
def make_tor_request(url):
proxies = {
'http': 'socks5h://127.0.0.1:9050',
'https': 'socks5h://127.0.0.1:9050'
}
try:
response = requests.get(url, proxies=proxies, timeout=(5, 15))
print("tor出口ip:", response.json()['origin'])
return response
except exception as e:
print("tor请求失败:", e)
# 需提前安装tor服务并启动
# sudo apt install tor # ubuntu系统
# sudo service tor start
五、完整实战案例
import requests
from requests.adapters import httpadapter
from urllib3.util.retry import retry
import logging
from datetime import datetime
# 日志配置
logging.basicconfig(
level=logging.info,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class sfexpresstracker:
def __init__(self, app_key, app_secret):
self.app_key = app_key
self.app_secret = app_secret
self.session = self._create_session()
def _create_session(self):
"""创建带重试和超时的会话"""
retry_strategy = retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["get", "post"]
)
adapter = httpadapter(max_retries=retry_strategy)
session = requests.session()
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def _generate_sign(self, params):
"""生成api签名(简化版)"""
import hashlib
sorted_params = sorted(params.items(), key=lambda x: x[0])
raw_str = self.app_secret + ''.join([f"{k}{v}" for k, v in sorted_params]) + self.app_secret
return hashlib.md5(raw_str.encode()).hexdigest().upper()
def query_track(self, tracking_number):
"""查询物流轨迹"""
url = "https://bsp-ois.sf-express.com/bsp-ois/express/service/querytrack"
params = {
"appkey": self.app_key,
"tracknumber": tracking_number,
"timestamp": str(int(datetime.now().timestamp()))
}
params["sign"] = self._generate_sign(params)
try:
response = self.session.get(
url,
params=params,
timeout=(3, 10) # 连接3秒,读取10秒
)
response.raise_for_status()
data = response.json()
if data.get('success'):
return data['data']['tracks']
else:
logging.warning(f"api返回错误: {data.get('errormsg', '未知错误')}")
return none
except requests.exceptions.requestexception as e:
logging.error(f"请求失败: {str(e)}")
return none
# 使用示例
if __name__ == "__main__":
tracker = sfexpresstracker(app_key="your_app_key", app_secret="your_app_secret")
tracks = tracker.query_track("sf123456789")
if tracks:
for step in tracks:
print(f"{step['accepttime']} {step['acceptaddress']} - {step['remark']}")
六、常见问题q&a
q1:被网站封ip怎么办?
a:立即启用备用代理池,建议使用住宅代理(如站大爷ip代理),配合每请求更换ip策略。对于大规模爬取,可采用tor网络或ip轮换中间件。
q2:如何选择重试次数?
a:遵循"3次黄金法则":
- 首次请求
- 指数退避重试2次(总计3次)
- 超过3次仍失败应触发熔断或人工干预
q3:代理请求速度慢怎么解决?
a:
- 测试代理延迟:
curl --socks5 127.0.0.1:9050 https://httpbin.org/ip - 使用代理评分机制,淘汰高延迟代理
- 对代理请求添加
user-agent和常规请求头
q4:如何记录重试日志?
a:扩展retry类实现自定义日志:
from urllib3.util.retry import retry
import logging
class loggingretry(retry):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getlogger(__name__)
def new(self, **kw):
self.logger.debug(f"creating new retry adapter with params: {kw}")
return super().new(**kw)
def increment(self, method, url, response=none, error=none, **kwargs):
self.logger.warning(f"retry attempt {self.total - self._remaining + 1} for {method} {url}")
return super().increment(method, url, response, error, **kwargs)
q5:post请求重试需要注意什么?
a:
- 确保请求是幂等的(如使用唯一请求id)
- 在重试前检查响应是否已部分处理
- 考虑使用
idempotency-key请求头(如stripe api要求)
通过合理组合超时配置、智能重试和代理策略,可构建出应对各种异常场景的健壮api请求系统。实际开发中建议结合prometheus监控重试率、失败率等指标,持续优化请求策略。
以上就是python requests实现api请求重试与超时配置全解析的详细内容,更多关于python requests请求重试与超时配置的资料请关注代码网其它相关文章!
发表评论