json web token (jwt) 是一种基于 json 格式的轻量级的安全令牌,通常用于身份验证和信息交换。python 的 pyjwt 是一个流行的库,用于处理 jwt。本文将从零开始,带你逐步学习 pyjwt 的基本用法、原理以及进阶功能。
什么是 jwt
jwt 的组成
jwt 是由三部分组成的字符串:
- header(头部):声明类型和签名算法。
- payload(载荷):存储数据(通常是用户信息或声明)。
- signature(签名):对 header 和 payload 进行签名,确保数据未被篡改。
它们通过 . 分隔,整体形式为:
header.payload.signature
例如:
eyjhbgcioijiuzi1niisinr5cci6ikpxvcj9.eyj1c2vyx2lkijoxmjmsim5hbwuioijkb2uilcjyb2xlijoiywrtaw4ifq.4zuhl8hfxx8e8oomnqbyt3mhnfoyqh27-8r1zjxv9fo
jwt 的工作流程
- 生成令牌:用户登录时,服务器验证用户身份,并生成 jwt。
- 携带令牌:jwt 通常通过 http header 的
authorization字段传递。 - 验证令牌:服务器收到请求后,验证 jwt 的签名是否有效,确认信息是否未被篡改。
安装 pyjwt
在开始使用 pyjwt 之前,先安装它:
pip install pyjwt
生成 jwt
创建简单的 jwt
下面是如何使用 pyjwt 生成一个简单的 jwt:
import jwt
import datetime
# 定义密钥和算法
secret_key = "your_secret_key"
algorithm = "hs256"
# 生成 jwt
payload = {
"user_id": 123,
"name": "john doe",
"role": "admin",
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1) # 设置过期时间
}
token = jwt.encode(payload, secret_key, algorithm=algorithm)
print("generated jwt:", token)
代码说明:
payload是载荷,其中exp是过期时间。jwt.encode用于生成 jwt。
解码 jwt
要解码生成的 jwt 并查看数据:
decoded_payload = jwt.decode(token, secret_key, algorithms=[algorithm])
print("decoded payload:", decoded_payload)
检查过期时间
如果令牌过期,会抛出 jwt.expiredsignatureerror 异常:
try:
decoded_payload = jwt.decode(token, secret_key, algorithms=[algorithm])
print("decoded payload:", decoded_payload)
except jwt.expiredsignatureerror:
print("token has expired!")
jwt 的签名和验证
pyjwt 支持多种算法,如:
- 对称算法:
hs256,hs384,hs512 - 非对称算法:
rs256,es256
对称加密
对称加密使用单一密钥签名和验证:
# 对称加密签名 token = jwt.encode(payload, secret_key, algorithm="hs256") # 验证签名 decoded_payload = jwt.decode(token, secret_key, algorithms=["hs256"])
非对称加密
非对称加密使用公钥和私钥。示例:
生成密钥(假设已有 rsa 密钥对)。
签名和验证:
private_key = open("private_key.pem").read() public_key = open("public_key.pem").read() # 使用私钥签名 token = jwt.encode(payload, private_key, algorithm="rs256") # 使用公钥验证 decoded_payload = jwt.decode(token, public_key, algorithms=["rs256"])
高级功能
自定义声明
除了标准声明(如 exp、iat),你可以添加自定义字段:
payload = {
"user_id": 123,
"role": "editor",
"permissions": ["read", "write"],
"iat": datetime.datetime.utcnow(), # 签发时间
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=1) # 过期时间
}
token = jwt.encode(payload, secret_key, algorithm="hs256")
设置过期时间
exp 是 jwt 的标准声明,用于指定过期时间。
payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
}
令牌刷新
令牌刷新可以延长用户登录会话时间:
# 原令牌
token = jwt.encode(payload, secret_key, algorithm="hs256")
# 解码并刷新
decoded = jwt.decode(token, secret_key, algorithms=["hs256"], options={"verify_exp": false})
decoded["exp"] = datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
# 生成新令牌
refreshed_token = jwt.encode(decoded, secret_key, algorithm="hs256")
常见错误及解决方法
token 已过期
错误信息:jwt.expiredsignatureerror
解决方法:在 jwt.decode 时捕获异常。
try:
decoded_payload = jwt.decode(token, secret_key, algorithms=[algorithm])
except jwt.expiredsignatureerror:
print("token has expired!")
签名验证失败
错误信息:jwt.invalidsignatureerror
解决方法:确保签名密钥正确,并使用相同的算法。
实战:jwt 身份验证示例
以下是一个基于 fastapi的示例,实现用户登录和令牌验证:
from fastapi import fastapi, depends, httpexception
from fastapi.security import oauth2passwordbearer, oauth2passwordrequestform
import jwt
import datetime
from typing import dict, optional
# ==========================
# 配置和常量
# ==========================
secret_key = "your_secret_key" # jwt 密钥
algorithm = "hs256" # jwt 算法
token_expire_hours = 1 # 令牌有效时间(小时)
# oauth2passwordbearer 定义,用于获取用户的令牌
oauth2_scheme = oauth2passwordbearer(tokenurl="token")
# ==========================
# 用户服务类
# ==========================
class userservice:
"""
用户服务类:负责用户身份验证和管理
"""
def __init__(self):
# 模拟数据库:用户名和密码
self.users_db = {
"admin": "password123",
"user": "mypassword"
}
def authenticate(self, username: str, password: str) -> bool:
"""
验证用户名和密码是否匹配
:param username: 用户名
:param password: 密码
:return: 验证是否成功
"""
return self.users_db.get(username) == password
# ==========================
# jwt 工具类
# ==========================
class jwthandler:
"""
jwt 工具类:负责生成和验证 jwt
"""
@staticmethod
def create_token(username: str, expire_hours: int = token_expire_hours) -> str:
"""
生成 jwt
:param username: 用户名
:param expire_hours: 令牌有效时间
:return: 生成的 jwt 字符串
"""
payload = {
"sub": username, # 声明主体
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=expire_hours) # 过期时间
}
return jwt.encode(payload, secret_key, algorithm=algorithm)
@staticmethod
def verify_token(token: str) -> optional[dict]:
"""
验证 jwt 并解码
:param token: jwt 字符串
:return: 解码后的 payload,如果无效则返回 none
"""
try:
return jwt.decode(token, secret_key, algorithms=[algorithm])
except jwt.expiredsignatureerror:
raise httpexception(status_code=401, detail="令牌已过期")
except jwt.invalidtokenerror:
raise httpexception(status_code=401, detail="令牌无效")
# ==========================
# 依赖注入类
# ==========================
class dependencies:
"""
依赖注入类,用于解耦依赖
"""
def __init__(self, user_service: userservice, jwt_handler: jwthandler):
self.user_service = user_service
self.jwt_handler = jwt_handler
def authenticate_user(self, form_data: oauth2passwordrequestform) -> str:
"""
验证用户并生成 jwt
:param form_data: 用户提交的表单数据(包含用户名和密码)
:return: jwt
"""
username = form_data.username
password = form_data.password
if not self.user_service.authenticate(username, password):
raise httpexception(status_code=401, detail="用户名或密码错误")
return self.jwt_handler.create_token(username)
def validate_token(self, token: str) -> str:
"""
验证 token 并返回用户名
:param token: jwt
:return: 解码后的用户名
"""
payload = self.jwt_handler.verify_token(token)
username = payload.get("sub")
if not username:
raise httpexception(status_code=401, detail="无效的令牌")
return username
# ==========================
# 创建 fastapi 应用
# ==========================
app = fastapi()
# 实例化服务类
user_service = userservice()
jwt_handler = jwthandler()
dependencies = dependencies(user_service, jwt_handler)
# ==========================
# 路由定义
# ==========================
@app.post("/token")
async def login(form_data: oauth2passwordrequestform = depends()):
"""
登录接口:验证用户并返回 token
:param form_data: fastapi 提供的 oauth2passwordrequestform
:return: 包含 token 的 json
"""
token = dependencies.authenticate_user(form_data)
return {"access_token": token, "token_type": "bearer"}
@app.get("/protected")
async def protected(token: str = depends(oauth2_scheme)):
"""
受保护的接口:需要提供有效 token 才能访问
:param token: oauth2passwordbearer 自动提取的 jwt
:return: 欢迎信息
"""
username = dependencies.validate_token(token)
return {"message": f"欢迎回来,{username}!您的访问已被授权。"}
# ==========================
# 主程序入口(仅调试使用)
# ==========================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
代码说明
- fastapi 的 oauth2passwordbearer
oauth2passwordbearer是 fastapi 提供的工具,用于从请求头中自动提取和验证 bearer 类型的 token。- 通过
depends(oauth2_scheme)可以在路由中轻松获取 token。
- jwt 生成和解码
- 使用
jwt.encode生成包含用户信息的 jwt。 - 使用
jwt.decode验证和解码 jwt,并处理可能的异常,如令牌过期或无效。
- 使用
- 路由说明
/token:用于登录,验证用户名和密码,返回 token。/protected:受保护的接口,只有提供有效 token 的用户才能访问。
- 错误处理
- 如果用户的用户名或密码错误,会返回 401 错误。
- 如果 token 无效或过期,也会返回 401 错误。
测试流程
安装依赖 确保安装了
fastapi和uvicorn:pip install fastapi uvicorn pyjwt
启动服务 运行代码后,访问
http://127.0.0.1:8000/docs打开 fastapi 自动生成的 api 文档页面。测试接口
- 登录接口 使用
/token,提交用户名和密码获取 token。 - 受保护接口 使用
/protected,在请求头中添加authorization: bearer <token>来访问。
- 登录接口 使用
总结
在这篇文章中,我们深入探讨了 json web token(jwt)的基本原理,尤其是如何使用 pyjwt 库来生成、解码和验证令牌。我们首先了解了 jwt 的工作机制,并学习了签名算法及其进阶功能,以确保令牌的安全性和可靠性。此外,我们还讨论了一些常见问题及其解决方法,以便帮助你更好地应对这些在实际应用中可能遇到的挑战。最后,我们通过一个完整的 jwt 身份验证实例,将理论与实践相结合,希望能为你的项目提供有价值的参考。掌握这些内容后,你将能够灵活地在自己的 web 应用中应用 jwt,提升系统的安全性和用户体验。
到此这篇关于pyjwt实现token验证的文章就介绍到这了,更多相关pyjwt token验证内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论