1. Flask
服务器端代码 (使用 Flask)
"""Flask demo server: serves JSON from /data with response compression enabled."""
import logging

from flask import Flask, jsonify, request
from flask_compress import Compress

app = Flask(__name__)
Compress(app)  # enable gzip compression of responses

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.route('/data', methods=['get'])
def get_data():
    """Return a sample JSON payload containing ``count`` consecutive integers.

    Query parameters:
        count: number of integers to return (defaults to 100).

    Returns:
        The JSON payload on success; a JSON error body with status 400 when
        ``count`` is not an integer, or with status 500 on any other failure.
    """
    # Validate the request parameter separately so that bad client input is
    # reported as a client error (400) rather than a server error (500).
    try:
        count = int(request.args.get('count', 100))
    except (TypeError, ValueError):
        return jsonify({'error': 'count must be an integer'}), 400

    try:
        # Return some sample JSON data
        data = {
            'message': 'hello, this is compressed data!',
            'numbers': list(range(count)),
        }
        return jsonify(data)
    except Exception as e:
        # Top-level boundary for the handler: log with traceback, hide details
        # from the client.
        logger.exception("error occurred: %s", e)
        return jsonify({'error': 'internal server error'}), 500


@app.errorhandler(404)
def page_not_found(e):
    """Return a JSON body instead of the default HTML page for unknown routes."""
    return jsonify({'error': 'not found'}), 404


if __name__ == '__main__':
    # debug=True is for local development only; do not use it in production.
    app.run(debug=True)
客户端代码 (接收并解压 gzip 响应)
"""Client that fetches JSON from the server and decompresses a gzip response body itself."""
import gzip
import json
from io import BytesIO
from typing import Any, Optional

import requests

# Seconds to wait for the server before giving up; without a timeout
# requests.get can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 10


def fetch_data(url: str) -> Optional[Any]:
    """Fetch ``url`` and return the decoded JSON payload.

    The body is read from the raw, undecoded stream so that the gzip
    decompression below really operates on compressed bytes.

    Args:
        url: Address of the JSON endpoint.

    Returns:
        The parsed JSON value, or ``None`` if the request, the decompression
        or the JSON decoding failed (the reason is printed).
    """
    try:
        # Send the request to the server. Only gzip is advertised because that
        # is the only encoding this function knows how to undo. stream=True
        # keeps requests from reading (and transparently decoding) the body.
        with requests.get(
            url,
            headers={'Accept-Encoding': 'gzip'},
            stream=True,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as response:
            # Treat 4xx/5xx answers as failures instead of returning their
            # error body as if it were data.
            response.raise_for_status()
            # decode_content=False yields the bytes exactly as sent on the wire.
            raw_body = response.raw.read(decode_content=False)
            content_encoding = response.headers.get('content-encoding', '')

        # Check the response header to see whether the data is gzip-compressed
        if content_encoding.strip().lower() == 'gzip':
            # Decompress the response content with gzip
            with gzip.GzipFile(fileobj=BytesIO(raw_body), mode='rb') as f:
                body = f.read()
        else:
            body = raw_body

        # Decode the (decompressed) bytes and parse the JSON document
        return json.loads(body.decode('utf-8'))
    except requests.RequestException as e:
        print(f"http request failed: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"failed to decode json: {e}")
        return None
    except Exception as e:
        print(f"an error occurred: {e}")
        return None


def main() -> None:
    """Fetch the sample endpoint and print the result."""
    url = 'http://127.0.0.1:5000/data?count=50'
    data = fetch_data(url)
    # Compare against None: an empty JSON object or list is still a success.
    if data is not None:
        print(data)
    else:
        print("failed to fetch data.")


if __name__ == '__main__':
    main()
2. FastAPI
服务器端代码 (使用 FastAPI)
# Install dependencies first:
#     pip install fastapi uvicorn
"""FastAPI demo server: serves JSON from /data with gzip compression enabled."""
import logging

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse

app = FastAPI()
# GZipMiddleware ships with FastAPI, so no extra compression package is
# needed. Responses smaller than minimum_size bytes are sent uncompressed.
app.add_middleware(GZipMiddleware, minimum_size=500)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.get("/data")
async def get_data(count: int = 100):
    """Return a sample JSON payload containing ``count`` consecutive integers.

    Args:
        count: number of integers to return, taken from the query string.

    Raises:
        HTTPException: with status 500 if building the response fails.
    """
    try:
        # Return some sample JSON data
        data = {
            'message': 'hello, this is compressed data!',
            'numbers': list(range(count)),
        }
        return JSONResponse(content=data)
    except Exception as e:
        # Top-level boundary for the handler: log with traceback, hide details
        # from the client, and keep the original error as the cause.
        logger.exception("error occurred: %s", e)
        raise HTTPException(status_code=500, detail="internal server error") from e


@app.exception_handler(404)
async def not_found_handler(request: Request, exc: HTTPException):
    """Return a fixed JSON body for 404 responses."""
    return JSONResponse(status_code=404, content={'error': 'not found'})


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")
客户端代码 (接收并解压 gzip 响应)
"""Client that fetches JSON from the server and decompresses a gzip response body itself."""
import gzip
import json
from io import BytesIO
from typing import Any, Optional

import requests

# Seconds to wait for the server before giving up; without a timeout
# requests.get can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 10


def fetch_data(url: str) -> Optional[Any]:
    """Fetch ``url`` and return the decoded JSON payload.

    The body is read from the raw, undecoded stream so that the gzip
    decompression below really operates on compressed bytes.

    Args:
        url: Address of the JSON endpoint.

    Returns:
        The parsed JSON value, or ``None`` if the request, the decompression
        or the JSON decoding failed (the reason is printed).
    """
    try:
        # Send the request to the server. Only gzip is advertised because that
        # is the only encoding this function knows how to undo. stream=True
        # keeps requests from reading (and transparently decoding) the body.
        with requests.get(
            url,
            headers={'Accept-Encoding': 'gzip'},
            stream=True,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as response:
            # Treat 4xx/5xx answers as failures instead of returning their
            # error body as if it were data.
            response.raise_for_status()
            # decode_content=False yields the bytes exactly as sent on the wire.
            raw_body = response.raw.read(decode_content=False)
            content_encoding = response.headers.get('content-encoding', '')

        # Check the response header to see whether the data is gzip-compressed
        if content_encoding.strip().lower() == 'gzip':
            # Decompress the response content with gzip
            with gzip.GzipFile(fileobj=BytesIO(raw_body), mode='rb') as f:
                body = f.read()
        else:
            body = raw_body

        # Decode the (decompressed) bytes and parse the JSON document
        return json.loads(body.decode('utf-8'))
    except requests.RequestException as e:
        print(f"http request failed: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"failed to decode json: {e}")
        return None
    except Exception as e:
        print(f"an error occurred: {e}")
        return None


def main() -> None:
    """Fetch the sample endpoint and print the result."""
    url = 'http://127.0.0.1:8000/data?count=50'
    data = fetch_data(url)
    # Compare against None: an empty JSON object or list is still a success.
    if data is not None:
        print(data)
    else:
        print("failed to fetch data.")


if __name__ == '__main__':
    main()
到此这篇关于 Python 启用 gzip 实现压缩响应体的文章就介绍到这了,更多相关 Python gzip 压缩响应体内容请搜索代码网以前的文章或继续浏览下面的相关文章。希望大家以后多多支持代码网!
发表评论