当前位置: 代码网 > it编程>前端脚本>Python > Python接口自动化中Token过期的9种解决方案

Python接口自动化中Token过期的9种解决方案

2026年01月15日 Python 我要评论
为什么token管理是接口自动化的痛点?在进行python接口自动化测试时,相信大家都遇到过这样的困扰:❌ 每次跑用例前手动更新 token❌ 测试中途 token 过期,用例批量失败❌ 多环境(de

为什么token管理是接口自动化的痛点?

在进行python接口自动化测试时,相信大家都遇到过这样的困扰:

❌ 每次跑用例前手动更新 token

❌ 测试中途 token 过期,用例批量失败

❌ 多环境(dev/test/prod)token 管理混乱

❌ 登录接口变化,所有用例都要改

token过期导致测试用例批量失败,已经成为接口自动化测试中最常见的"拦路虎"。

别担心,今天就来彻底解决这个难题。我们将会从“为什么token需要自动刷新”、“token 刷新的常见场景”以及“token过期的9种解决方案”三方面进行详述。

为什么token需要自动刷新?

在python接口自动化测试中,token自动刷新不是一个可有可无的"锦上添花"功能,而是确保测试稳定性和可靠性的核心机制。

例如:大多数基于token的认证机制(如jwt、oauth2)都会为token设置一个有效期(例如30分钟、2小时等)。一旦token过期,接口就会返回401 unauthorized错误,导致测试用例失败。这会导致测试的连续性和稳定性,影响测试效率,也会影响自动化测试模拟用户真实行为(如用户会在token过期后通过刷新token或重新登录来继续操作)。

token 刷新的常见场景

token 刷新的常见场景,如下图:

我们要做的,就是让程序能自动判断并处理这四种情况。

token过期的9种解决方案

解决方案1

解决方案1:简单粗暴型 - 每次请求前重新登录

核心思路:

每次发送请求前都先调用登录接口获取新token

代码实现:

def get_fresh_token():
    """每次请求前获取新token"""
    login_data = {"username": "testuser", "password": "testpass"}
    resp = requests.post(f"{base_url}/login", json=login_data)
    return resp.json()["data"]["access_token"]
def test_api_with_fresh_token():
    token = get_fresh_token()
    headers = {"authorization": f"bearer {token}"}
    response = requests.get(f"{base_url}/api/data", headers=headers)
    return response.json()

优缺点分析:

优点:实现简单,保证每次都是新token;

缺点:效率极低,增加额外请求,频繁登录可能触发风控。

解决方案2

解决方案2:前置获取型 - 用例执行前统一获取

核心思路:

在测试套件执行前一次性获取token,所有用例共享。

代码实现:

import pytest
class testbase:
    token = none
    @classmethod
    def setup_class(cls):
        """测试类执行前获取token"""
        cls.token = cls.get_token_once()
    @classmethod
    def get_token_once(cls):
        login_data = {"username": "testuser", "password": "testpass"}
        resp = requests.post(f"{base_url}/login", json=login_data)
        return resp.json()["data"]["access_token"]
    def test_user_info(self):
        headers = {"authorization": f"bearer {self.token}"}
        response = requests.get(f"{base_url}/user/info", headers=headers)
        assert response.status_code == 200

优缺点分析:

优点:减少登录次数,提高效率;

缺点:长时间运行测试时token仍会过期。

解决方案3

解决方案3:智能检测型 - 请求失败时重新登录

核心思路:

捕获401错误,自动重新登录并重试请求

代码实现:

class smarttokenclient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = none
    def login(self):
        """登录获取token"""
        login_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(f"{self.base_url}/login", json=login_data)
        self.token = resp.json()["data"]["access_token"]
        return self.token
    def request_with_retry(self, method, endpoint, **kwargs):
        """带重试机制的请求"""
        if not self.token:
            self.login()
        headers = kwargs.get('headers', {})
        headers['authorization'] = f'bearer {self.token}'
        kwargs['headers'] = headers
        response = requests.request(method, f"{self.base_url}{endpoint}", **kwargs)
        # 检测到token过期,重新登录并重试
        if response.status_code == 401:
            print("检测到token过期,重新登录...")
            self.login()
            kwargs['headers']['authorization'] = f'bearer {self.token}'
            response = requests.request(method, f"{self.base_url}{endpoint}", **kwargs)
        return response
# 使用示例
client = smarttokenclient(base_url, "testuser", "testpass")
response = client.request_with_retry("get", "/api/users")

优缺点分析

优点:自动处理过期情况,用户体验好;

缺点:第一次401错误无法避免,需要重试。

解决方案4

解决方案4:主动刷新型 - 定期刷新token

核心思路:

记录token获取时间,在过期前主动刷新

代码实现:

import time
class proactivetokenclient:
    def __init__(self, base_url, username, password, token_ttl=1800):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token_ttl = token_ttl  # token有效期,默认30分钟
        self.token = none
        self.token_time = 0
    def get_token(self):
        """获取token,如果快过期则主动刷新"""
        current_time = time.time()
        # 没有token或token即将过期(提前5分钟刷新)
        if not self.token or (current_time - self.token_time) > (self.token_ttl - 300):
            self.refresh_token()
        return self.token
    def refresh_token(self):
        """刷新token"""
        login_data = {
            "username": self.username, 
            "password": self.password
        }
        resp = requests.post(f"{self.base_url}/login", json=login_data)
        self.token = resp.json()["data"]["access_token"]
        self.token_time = time.time()
        print(f"token已刷新: {self.token[:20]}...")
    def get(self, endpoint, **kwargs):
        """get请求"""
        token = self.get_token()
        headers = kwargs.get('headers', {})
        headers['authorization'] = f'bearer {token}'
        kwargs['headers'] = headers
        return requests.get(f"{self.base_url}{endpoint}", **kwargs)

优缺点分析:

优点:避免401错误,用户体验最佳;

缺点:需要知道token准确有效期,实现稍复杂。

解决方案5

解决方案5:双token轮换型 - access token + refresh token

核心思路:

使用refresh token静默刷新access token,用户无感知

代码实现:

class dualtokenclient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.access_token = none
        self.refresh_token = none
        self.setup_tokens()
    def setup_tokens(self):
        """初始获取双token"""
        login_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(f"{self.base_url}/login", json=login_data)
        token_data = resp.json()["data"]
        self.access_token = token_data["access_token"]
        self.refresh_token = token_data["refresh_token"]
    def refresh_access_token(self):
        """使用refresh token刷新access token"""
        refresh_data = {
            "refresh_token": self.refresh_token
        }
        try:
            resp = requests.post(f"{self.base_url}/refresh", json=refresh_data)
            if resp.status_code == 200:
                token_data = resp.json()["data"]
                self.access_token = token_data["access_token"]
                # 有些系统会返回新的refresh_token
                if "refresh_token" in token_data:
                    self.refresh_token = token_data["refresh_token"]
                return true
        except exception as e:
            print(f"刷新token失败: {e}")
        # 刷新失败,重新登录
        self.setup_tokens()
        return false
    def request(self, method, endpoint, max_retry=1, **kwargs):
        """支持token自动刷新的请求"""
        for attempt in range(max_retry + 1):
            headers = kwargs.get('headers', {})
            headers['authorization'] = f'bearer {self.access_token}'
            kwargs['headers'] = headers
            response = requests.request(method, f"{self.base_url}{endpoint}", **kwargs)
            if response.status_code != 401:
                return response
            if attempt < max_retry:
                print("access token过期,尝试刷新...")
                if self.refresh_access_token():
                    continue
        return response

优缺点分析:

优点:用户体验极佳,安全性高;

缺点:需要服务端支持双token机制。

解决方案6

解决方案6:装饰器模式 - 无侵入式token管理

核心思路:

使用装饰器自动为请求方法添加token管理功能。

代码实现​​​​​​​

def auto_token_refresh(func):
    """自动token刷新装饰器"""
    def wrapper(self, *args, **kwargs):
        # 确保有有效token
        self.ensure_valid_token()
        # 执行原函数
        result = func(self, *args, **kwargs)
        # 如果返回401,刷新token后重试
        if hasattr(result, 'status_code') and result.status_code == 401:
            print("检测到401错误,自动刷新token并重试...")
            self.refresh_token()
            result = func(self, *args, **kwargs)
        return result
    return wrapper
class decoratortokenclient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = none
    def login(self):
        """登录获取token"""
        login_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(f"{self.base_url}/login", json=login_data)
        self.token = resp.json()["data"]["access_token"]
        return self.token
    def refresh_token(self):
        """刷新token"""
        return self.login()
    def ensure_valid_token(self):
        """确保有有效的token"""
        if not self.token:
            self.login()
    @auto_token_refresh
    def get_users(self):
        """获取用户列表"""
        headers = {"authorization": f"bearer {self.token}"}
        return requests.get(f"{self.base_url}/users", headers=headers)
    @auto_token_refresh  
    def create_order(self, order_data):
        """创建订单"""
        headers = {"authorization": f"bearer {self.token}"}
        return requests.post(f"{self.base_url}/orders", json=order_data, headers=headers)
# 使用示例
client = decoratortokenclient(base_url, "testuser", "testpass")
response = client.get_users()  # 自动处理token

优缺点分析:

优点:代码简洁,无侵入性,易于维护;

缺点:需要为每个方法添加装饰器。

解决方案7

解决方案7:会话保持型 - 使用requests.session。

核心思路:

利用session对象保持登录状态,自动处理cookie。

代码实现:​​​​​​​

class sessiontokenclient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.session = requests.session()
        self.is_logged_in = false
    def login(self):
        """登录并保持会话"""
        login_data = {
            "username": self.username,
            "password": self.password
        }
        response = self.session.post(f"{self.base_url}/login", json=login_data)
        if response.status_code == 200:
            self.is_logged_in = true
            # 如果是token认证,可能需要手动设置header
            token = response.json()["data"]["access_token"]
            self.session.headers.update({"authorization": f"bearer {token}"})
        return response
    def ensure_login(self):
        """确保处于登录状态"""
        if not self.is_logged_in:
            self.login()
    def get(self, endpoint, **kwargs):
        """get请求"""
        self.ensure_login()
        response = self.session.get(f"{self.base_url}{endpoint}", **kwargs)
        # 检查是否因会话过期而失败
        if response.status_code == 401:
            print("会话过期,重新登录...")
            self.login()
            response = self.session.get(f"{self.base_url}{endpoint}", **kwargs)
        return response
    def post(self, endpoint, **kwargs):
        """post请求"""
        self.ensure_login()
        response = self.session.post(f"{self.base_url}{endpoint}", **kwargs)
        if response.status_code == 401:
            print("会话过期,重新登录...")
            self.login()
            response = self.session.post(f"{self.base_url}{endpoint}", **kwargs)
        return response
# 使用示例
client = sessiontokenclient(base_url, "testuser", "testpass")
response = client.get("/api/users")  # 自动处理会话

优缺点分析:

优点:自动处理cookie,适合基于会话的系统;

缺点:不适合纯token认证的系统。

解决方案8

解决方案8:工厂模式 - 多环境token管理

核心思路:

使用工厂模式管理不同环境的token,支持动态切换。

代码实现:​​​​​​​

from abc import abc, abstractmethod
class tokenmanager(abc):
    """token管理器抽象类"""
    @abstractmethod
    def get_token(self) -> str:
        pass
    @abstractmethod
    def refresh_token(self) -> bool:
        pass
class devtokenmanager(tokenmanager):
    """开发环境token管理器"""
    def get_token(self) -> str:
        # 开发环境可能返回固定token或mock token
        return "dev_mock_token"
    def refresh_token(self) -> bool:
        return true
class testtokenmanager(tokenmanager):
    """测试环境token管理器"""
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = none
    def get_token(self) -> str:
        if not self.token:
            self.refresh_token()
        return self.token
    def refresh_token(self) -> bool:
        login_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(f"{self.base_url}/login", json=login_data)
        self.token = resp.json()["data"]["access_token"]
        return true
class prodtokenmanager(tokenmanager):
    """生产环境token管理器 - 更严格的安全控制"""
    def get_token(self) -> str:
        # 生产环境可能需要更复杂的认证流程
        raise exception("生产环境请使用正式的认证流程")
    def refresh_token(self) -> bool:
        raise exception("生产环境请使用正式的认证流程")
class tokenmanagerfactory:
    """token管理器工厂"""
    @staticmethod
    def create_manager(env: str, **kwargs) -> tokenmanager:
        if env == "dev":
            return devtokenmanager()
        elif env == "test":
            return testtokenmanager(kwargs['base_url'], kwargs['username'], kwargs['password'])
        elif env == "prod":
            return prodtokenmanager()
        else:
            raise valueerror(f"不支持的环境: {env}")
# 使用示例
token_manager = tokenmanagerfactory.create_manager(
    "test", 
    base_url="https://test.api.com",
    username="testuser", 
    password="testpass"
)
token = token_manager.get_token()
headers = {"authorization": f"bearer {token}"}
response = requests.get("https://test.api.com/api/data", headers=headers)

优缺点分析:

优点:支持多环境,扩展性强,符合开闭原则;

缺点:实现相对复杂,过度设计对于简单项目。

解决方案9

解决方案9:终极方案 - 封装支持自动刷新的authclient。

核心思路:

设计一个 authhttpclient 类,具备以下能力:

  • 自动登录获取 token;
  • 请求时自动携带 token;
  • 检测 401 错误,自动刷新;
  • 刷新失败则重新登录;
  • token 持久化(文件或内存);
  • 线程安全,支持并发。

代码实现:​​​​​​​

import requests
import json
import os
import time
from typing import optional, dict, any
# =======================
# ?? 认证客户端(支持自动刷新)
# =======================
class authhttpclient:
    def __init__(
        self,
        base_url: str,
        login_url: str = "/auth/login",
        refresh_url: str = "/auth/refresh",
        username: str = "",
        password: str = "",
        token_file: str = "cache/token.json"
    ):
        self.base_url = base_url.rstrip("/")
        self.login_url = login_url
        self.refresh_url = refresh_url
        self.username = username
        self.password = password
        self.token_file = token_file
        self.session = requests.session()
        # 确保缓存目录存在
        os.makedirs(os.path.dirname(token_file), exist_ok=true)
    def _save_token(self, token: str, refresh_token: str, expires_in: int):
        """保存 token 到本地文件"""
        data = {
            "access_token": token,
            "refresh_token": refresh_token,
            "expires_at": time.time() + expires_in - 60,  # 提前60秒过期
            "username": self.username
        }
        with open(self.token_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
    def _load_token(self) -> optional[dict[str, any]]:
        """从文件加载 token"""
        if not os.path.exists(self.token_file):
            return none
        try:
            with open(self.token_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except:
            return none
    def _is_token_expired(self) -> bool:
        """检查 token 是否过期"""
        token_data = self._load_token()
        if not token_data:
            return true
        return time.time() >= token_data.get("expires_at", 0)
    def login(self) -> bool:
        """登录并获取 token"""
        try:
            resp = self.session.post(
                f"{self.base_url}{self.login_url}",
                json={"username": self.username, "password": self.password}
            )
            if resp.status_code == 200:
                data = resp.json()
                token = data["data"]["access_token"]
                refresh_token = data["data"]["refresh_token"]
                expires_in = data["data"].get("expires_in", 1800)  # 默认30分钟
                self._save_token(token, refresh_token, expires_in)
                self.session.headers.update({"authorization": f"bearer {token}"})
                print(f"? 登录成功,token 已保存")
                return true
            else:
                print(f"? 登录失败: {resp.text}")
                return false
        except exception as e:
            print(f"? 登录异常: {e}")
            return false
    def refresh_token(self) -> bool:
        """刷新 token"""
        token_data = self._load_token()
        if not token_data:
            return false
        try:
            resp = self.session.post(
                f"{self.base_url}{self.refresh_url}",
                json={"refresh_token": token_data["refresh_token"]}
            )
            if resp.status_code == 200:
                data = resp.json()
                new_token = data["data"]["access_token"]
                new_refresh = data["data"]["refresh_token"]
                expires_in = data["data"].get("expires_in", 1800)
                self._save_token(new_token, new_refresh, expires_in)
                self.session.headers.update({"authorization": f"bearer {new_token}"})
                print(f"?? token 刷新成功")
                return true
            else:
                print(f"? token 刷新失败: {resp.text}")
                return false
        except exception as e:
            print(f"? 刷新异常: {e}")
            return false
    def ensure_auth(self) -> bool:
        """确保当前有有效 token(自动登录或刷新)"""
        if self._is_token_expired():
            print("?? token 已过期,尝试刷新...")
            if not self.refresh_token():
                print("?? 刷新失败,正在重新登录...")
                return self.login()
        else:
            # 加载有效 token
            token_data = self._load_token()
            self.session.headers.update({"authorization": f"bearer {token_data['access_token']}"})
            print("?? 使用现有 token")
        return true
    def request(self, method: str, url: str, **kwargs) -> requests.response:
        """重写 request,自动处理认证"""
        full_url = self.base_url + ("" if url.startswith("/") else "/") + url
        # 确保认证有效
        if not self.ensure_auth():
            raise exception("认证失败,无法继续请求")
        try:
            resp = self.session.request(method, full_url, **kwargs)
            # 如果返回 401,可能是刷新失败,尝试重新登录
            if resp.status_code == 401:
                print("?? 收到 401,尝试重新登录...")
                self.login()
                # 重新发送请求
                resp = self.session.request(method, full_url, **kwargs)
            return resp
        except exception as e:
            print(f"? 请求异常: {e}")
            raise
    # 便捷方法
    def get(self, url, **kwargs):
        return self.request("get", url, **kwargs)
    def post(self, url, **kwargs):
        return self.request("post", url, **kwargs)
    def put(self, url, **kwargs):
        return self.request("put", url, **kwargs)
    def delete(self, url, **kwargs):
        return self.request("delete", url, **kwargs)

优缺点分析:

这个方案融合了前面所有方案的优点,提供了最完善、最健壮的token自动刷新机制。​

以上就是python接口自动化中token过期的9种解决方案的详细内容,更多关于python接口自动化token过期的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com