一、http/https 通信
1. 客户端示例(requests 库)
import requests # http get response = requests.get('http://httpbin.org/get') print(response.text) # https post response = requests.post( 'https://httpbin.org/post', data={'key': 'value'}, verify=true # 验证 ssl 证书(默认) ) print(response.json())
2. 服务端示例(flask)
from flask import flask, request app = flask(__name__) @app.route('/api', methods=['get', 'post']) def handle_request(): if request.method == 'get': return {'message': 'get received'} elif request.method == 'post': return {'data': request.json} if __name__ == '__main__': app.run(ssl_context='adhoc') # 启用 https
二、udp 通信
1. 服务端
import socket server = socket.socket(socket.af_inet, socket.sock_dgram) server.bind(('0.0.0.0', 9999)) while true: data, addr = server.recvfrom(1024) print(f"received from {addr}: {data.decode()}") server.sendto(b'udp response', addr)
2. 客户端
import socket client = socket.socket(socket.af_inet, socket.sock_dgram) client.sendto(b'hello udp', ('localhost', 9999)) response, addr = client.recvfrom(1024) print(f"received: {response.decode()}")
三、websocket 通信
需要安装库:pip install websockets
1. 服务端
import asyncio import websockets async def handler(websocket): async for message in websocket: print(f"received: {message}") await websocket.send(f"echo: {message}") async def main(): async with websockets.serve(handler, "localhost", 8765): await asyncio.future() # 永久运行 asyncio.run(main())
2. 客户端
import asyncio import websockets async def client(): async with websockets.connect("ws://localhost:8765") as ws: await ws.send("hello websocket!") response = await ws.recv() print(f"received: {response}") asyncio.run(client())
四、server-sent events (sse)
需要安装库:pip install sseclient-py
1. 服务端(flask 实现)
from flask import flask, response app = flask(__name__) @app.route('/stream') def stream(): def event_stream(): for i in range(5): yield f"data: message {i}\n\n" return response(event_stream(), mimetype="text/event-stream") if __name__ == '__main__': app.run()
2. 客户端
import requests from sseclient import sseclient url = 'http://localhost:5000/stream' response = requests.get(url, stream=true) client = sseclient(response) for event in client.events(): print(f"received event: {event.data}")
关键点说明
- http/https:最常用的请求-响应模型
- udp:无连接协议,适合实时性要求高的场景
- websocket:全双工通信协议,适合实时双向通信
- sse:服务器到客户端的单向推送技术
- 安全建议:
- https 使用 requests 的
verify=true
验证证书 - websocket 使用
wss://
安全协议 - 生产环境应使用正式 ssl 证书
- https 使用 requests 的
根据具体需求选择协议:
- 简单数据请求:http/https
- 实时游戏/视频:udp
- 聊天应用:websocket
- 实时通知:sse
建议根据实际场景配合使用异步框架(如 aiohttp、fastapi)以获得更好的性能。
到此这篇关于python实现常见网络通信的示例详解的文章就介绍到这了,更多相关python网络通信内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论