1. flask
服务器端代码 (使用 flask)
from flask import flask, jsonify, request
from flask_compress import compress
import logging
app = flask(__name__)
compress(app) # 启用 gzip 压缩
# 配置日志
logging.basicconfig(level=logging.info)
logger = logging.getlogger(__name__)
@app.route('/data', methods=['get'])
def get_data():
try:
# 处理请求参数
count = int(request.args.get('count', 100))
# 返回一些示例 json 数据
data = {
'message': 'hello, this is compressed data!',
'numbers': list(range(count))
}
return jsonify(data)
except exception as e:
logger.error(f"error occurred: {e}")
return jsonify({'error': 'internal server error'}), 500
@app.errorhandler(404)
def page_not_found(e):
return jsonify({'error': 'not found'}), 404
if __name__ == '__main__':
app.run(debug=true)客户端代码 (接收并解压 gzip 响应)
import requests
import gzip
import json
from io import bytesio
def fetch_data(url):
try:
# 发送请求到服务器端
response = requests.get(url)
# 检查响应头,确认数据是否被 gzip 压缩
if response.headers.get('content-encoding') == 'gzip':
# 使用 gzip 解压响应内容
compressed_content = bytesio(response.content)
with gzip.gzipfile(fileobj=compressed_content, mode='rb') as f:
decompressed_data = f.read()
# 解码解压后的数据
data = json.loads(decompressed_data.decode('utf-8'))
return data
else:
return response.json()
except requests.requestexception as e:
print(f"http request failed: {e}")
return none
except json.jsondecodeerror as e:
print(f"failed to decode json: {e}")
return none
except exception as e:
print(f"an error occurred: {e}")
return none
if __name__ == '__main__':
url = 'http://127.0.0.1:5000/data?count=50'
data = fetch_data(url)
if data:
print(data)
else:
print("failed to fetch data.")2. fastapi
服务器端代码 (使用 fastapi)
pip install fastapi uvicorn fastapi-compress
from fastapi import fastapi, request, httpexception
from fastapi.responses import jsonresponse
from fastapi_compress import compress
import logging
app = fastapi()
compressor = compress()
compressor.init_app(app)
# 配置日志
logging.basicconfig(level=logging.info)
logger = logging.getlogger(__name__)
@app.get("/data")
async def get_data(count: int = 100):
try:
# 返回一些示例 json 数据
data = {
'message': 'hello, this is compressed data!',
'numbers': list(range(count))
}
return jsonresponse(content=data)
except exception as e:
logger.error(f"error occurred: {e}")
raise httpexception(status_code=500, detail="internal server error")
@app.exception_handler(404)
async def not_found_handler(request: request, exc: httpexception):
return jsonresponse(status_code=404, content={'error': 'not found'})
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")客户端代码 (接收并解压 gzip 响应)
import requests
import gzip
import json
from io import bytesio
def fetch_data(url):
try:
# 发送请求到服务器端
response = requests.get(url)
# 检查响应头,确认数据是否被 gzip 压缩
if response.headers.get('content-encoding') == 'gzip':
# 使用 gzip 解压响应内容
compressed_content = bytesio(response.content)
with gzip.gzipfile(fileobj=compressed_content, mode='rb') as f:
decompressed_data = f.read()
# 解码解压后的数据
data = json.loads(decompressed_data.decode('utf-8'))
return data
else:
return response.json()
except requests.requestexception as e:
print(f"http request failed: {e}")
return none
except json.jsondecodeerror as e:
print(f"failed to decode json: {e}")
return none
except exception as e:
print(f"an error occurred: {e}")
return none
if __name__ == '__main__':
url = 'http://127.0.0.1:8000/data?count=50'
data = fetch_data(url)
if data:
print(data)
else:
print("failed to fetch data.")到此这篇关于python启用gzip实现压缩响应体的文章就介绍到这了,更多相关python gzip压缩响应体内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论