当前位置: 代码网 > it编程>前端脚本>Python > Python Requests实现API请求重试与超时配置全解析

Python Requests实现API请求重试与超时配置全解析

2025年11月28日 Python 我要评论
​在电商物流追踪、金融数据监控等场景中,api请求的稳定性直接决定系统可靠性。当顺丰api因网络抖动返回503错误,或因跨地域调用出现10秒延迟时,如何确保程序不崩溃且数据不丢失?本文通过真实案例拆解

​在电商物流追踪、金融数据监控等场景中,api请求的稳定性直接决定系统可靠性。当顺丰api因网络抖动返回503错误,或因跨地域调用出现10秒延迟时,如何确保程序不崩溃且数据不丢失?本文通过真实案例拆解,用requests库实现"防抖动+抗异常"的健壮请求方案。

一、血泪教训:那些年踩过的api坑

某跨境电商系统在"黑色星期五"大促期间突发故障:调用顺丰国际件接口时,30%的请求因超时失败,导致2000+包裹状态同步延迟。事后分析发现三大元凶:

  • 固定超时陷阱:设置timeout=5导致所有跨洋请求必然超时(实际平均响应时间8秒)
  • 暴力重试雪崩:简单for循环重试5次,瞬间产生10倍请求量击垮顺丰网关
  • 代理池污染:使用失效代理ip发起请求,触发顺丰反爬机制封禁整个ip段

这些场景揭示核心问题:api请求需要"有智慧的等待"和"有策略的坚持"

二、超时配置:给请求装上"安全阀"

1. 连接超时 vs 读取超时

import requests

try:
    # 连接超时3秒(tcp握手阶段)
    # 读取超时10秒(服务器处理阶段)
    response = requests.get(
        'https://api.sf-express.com/track',
        params={'trackingnumber': 'sf123456789'},
        timeout=(3, 10)  # 元组形式分别设置
    )
    print(response.json())
except requests.exceptions.connecttimeout:
    print("连接服务器失败,请检查网络")
except requests.exceptions.readtimeout:
    print("服务器处理超时,请稍后重试")

关键决策点

  • 国内api调用:timeout=(2, 5)(连接2秒,读取5秒)
  • 跨境api调用:timeout=(5, 15)(考虑国际链路延迟)
  • 文件上传场景:需增加write_timeout参数(需httpx等库支持)

2. 动态超时策略

某物流监控系统采用分级超时机制:

def get_dynamic_timeout(retry_count):
    base_timeout = 3  # 基础超时
    if retry_count > 0:
        return min(base_timeout * (2 ** retry_count), 30)  # 指数退避,最大30秒
    return base_timeout

# 使用示例
for i in range(3):
    try:
        timeout = get_dynamic_timeout(i)
        response = requests.get(url, timeout=timeout)
        break
    except exception as e:
        print(f"第{i+1}次尝试失败,超时时间调整为{timeout}秒")

三、重试机制:让请求学会"坚持"

1. 指数退避重试(推荐方案)

from requests.adapters import httpadapter
from urllib3.util.retry import retry

def create_retry_session(retries=3, backoff_factor=1, status_forcelist=(500, 502, 503, 504)):
    session = requests.session()
    retry = retry(
        total=retries,
        read=true,  # 允许读取超时重试
        connect=true,  # 允许连接超时重试
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=["get", "post"]  # 支持post请求重试
    )
    adapter = httpadapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# 使用示例
session = create_retry_session()
response = session.get('https://api.sf-express.com/track', params={'trackingnumber': 'sf123456789'})

参数深解:

  • backoff_factor=1:第1次重试等待1秒,第2次2秒,第3次4秒
  • status_forcelist:仅对5xx服务器错误和429限流错误重试
  • allowed_methods:默认不重试post请求,需显式声明

2. 熔断机制实现(避免雪崩)

from collections import deque
import time

class circuitbreaker:
    def __init__(self, max_failures=3, reset_timeout=60):
        self.failures = deque(maxlen=max_failures)
        self.reset_timeout = reset_timeout

    def is_open(self):
        if len(self.failures) < self.failures.maxlen:
            return false
        # 如果最近max_failures次请求都失败,且最后一次失败在reset_timeout秒内
        return (time.time() - self.failures[-1]) < self.reset_timeout

    def record_failure(self):
        self.failures.append(time.time())

    def record_success(self):
        self.failures.clear()

# 结合重试使用
breaker = circuitbreaker(max_failures=3, reset_timeout=30)

def safe_request():
    if breaker.is_open():
        raise exception("service unavailable, circuit breaker open")
    
    try:
        response = create_retry_session().get(url)
        if response.status_code == 200:
            breaker.record_success()
            return response
        else:
            breaker.record_failure()
            raise exception("api request failed")
    except exception as e:
        breaker.record_failure()
        raise e

四、代理配置:突破封禁的"隐身术"

1. 代理池实战方案

import random
from requests.adapters import httpadapter

class proxypool:
    def __init__(self):
        self.proxies = [
            {"http": "http://1.1.1.1:8080", "https": "http://1.1.1.1:8080"},
            {"http": "http://2.2.2.2:8080", "https": "http://2.2.2.2:8080"},
            # 更多代理...
        ]
        self.failed_proxies = set()

    def get_proxy(self):
        available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
        if not available_proxies:
            raise exception("no available proxies")
        return random.choice(available_proxies)

    def mark_failed(self, proxy):
        self.failed_proxies.add(proxy)

# 使用示例
proxy_pool = proxypool()
session = requests.session()

for _ in range(3):  # 尝试3个不同代理
    try:
        proxy = proxy_pool.get_proxy()
        response = session.get(
            'https://api.sf-express.com/track',
            proxies=proxy,
            timeout=(3, 10)
        )
        if response.status_code == 200:
            print("success with proxy:", proxy)
            break
        else:
            proxy_pool.mark_failed(proxy)
    except exception:
        proxy_pool.mark_failed(proxy)
else:
    print("all proxies failed")

2. tor代理配置(高匿名场景)

import requests

def make_tor_request(url):
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=(5, 15))
        print("tor出口ip:", response.json()['origin'])
        return response
    except exception as e:
        print("tor请求失败:", e)

# 需提前安装tor服务并启动
# sudo apt install tor  # ubuntu系统
# sudo service tor start

五、完整实战案例

import requests
from requests.adapters import httpadapter
from urllib3.util.retry import retry
import logging
from datetime import datetime

# 日志配置
logging.basicconfig(
    level=logging.info,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class sfexpresstracker:
    def __init__(self, app_key, app_secret):
        self.app_key = app_key
        self.app_secret = app_secret
        self.session = self._create_session()

    def _create_session(self):
        """创建带重试和超时的会话"""
        retry_strategy = retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["get", "post"]
        )
        adapter = httpadapter(max_retries=retry_strategy)
        session = requests.session()
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _generate_sign(self, params):
        """生成api签名(简化版)"""
        import hashlib
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        raw_str = self.app_secret + ''.join([f"{k}{v}" for k, v in sorted_params]) + self.app_secret
        return hashlib.md5(raw_str.encode()).hexdigest().upper()

    def query_track(self, tracking_number):
        """查询物流轨迹"""
        url = "https://bsp-ois.sf-express.com/bsp-ois/express/service/querytrack"
        params = {
            "appkey": self.app_key,
            "tracknumber": tracking_number,
            "timestamp": str(int(datetime.now().timestamp()))
        }
        params["sign"] = self._generate_sign(params)

        try:
            response = self.session.get(
                url,
                params=params,
                timeout=(3, 10)  # 连接3秒,读取10秒
            )
            response.raise_for_status()
            data = response.json()
            
            if data.get('success'):
                return data['data']['tracks']
            else:
                logging.warning(f"api返回错误: {data.get('errormsg', '未知错误')}")
                return none
                
        except requests.exceptions.requestexception as e:
            logging.error(f"请求失败: {str(e)}")
            return none

# 使用示例
if __name__ == "__main__":
    tracker = sfexpresstracker(app_key="your_app_key", app_secret="your_app_secret")
    tracks = tracker.query_track("sf123456789")
    if tracks:
        for step in tracks:
            print(f"{step['accepttime']} {step['acceptaddress']} - {step['remark']}")

六、常见问题q&a

q1:被网站封ip怎么办?

a:立即启用备用代理池,建议使用住宅代理(如站大爷ip代理),配合每请求更换ip策略。对于大规模爬取,可采用tor网络或ip轮换中间件。

q2:如何选择重试次数?

a:遵循"3次黄金法则":

  • 首次请求
  • 指数退避重试2次(总计3次)
  • 超过3次仍失败应触发熔断或人工干预

q3:代理请求速度慢怎么解决?

a:

  • 测试代理延迟:curl --socks5 127.0.0.1:9050 https://httpbin.org/ip
  • 使用代理评分机制,淘汰高延迟代理
  • 对代理请求添加user-agent和常规请求头

q4:如何记录重试日志?

a:扩展retry类实现自定义日志:

from urllib3.util.retry import retry
import logging

class loggingretry(retry):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = logging.getlogger(__name__)

    def new(self, **kw):
        self.logger.debug(f"creating new retry adapter with params: {kw}")
        return super().new(**kw)

    def increment(self, method, url, response=none, error=none, **kwargs):
        self.logger.warning(f"retry attempt {self.total - self._remaining + 1} for {method} {url}")
        return super().increment(method, url, response, error, **kwargs)

q5:post请求重试需要注意什么?

a:

  • 确保请求是幂等的(如使用唯一请求id)
  • 在重试前检查响应是否已部分处理
  • 考虑使用idempotency-key请求头(如stripe api要求)

通过合理组合超时配置、智能重试和代理策略,可构建出应对各种异常场景的健壮api请求系统。实际开发中建议结合prometheus监控重试率、失败率等指标,持续优化请求策略。

以上就是python requests实现api请求重试与超时配置全解析的详细内容,更多关于python requests请求重试与超时配置的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com