项目简介
这是一个基于 fastapi 和 websocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。
技术栈
后端:fastapi (python)
前端:html、javascript、css
通信:websocket
认证:简单的 token 认证
项目结构
├── main.py # 后端主程序
└── templates/ # 前端模板目录
└── chat.html # 聊天页面
详细代码实现
1. 后端实现 (main.py)
from fastapi import fastapi, websocket, depends, websocketdisconnect from typing import dict import json from fastapi.responses import htmlresponse, fileresponse from fastapi.staticfiles import staticfiles app = fastapi() app.mount("/templates", staticfiles(directory="templates"), name="templates") class connectionmanager: def __init__(self): self.active_connections: dict[str, websocket] = {} self.offline_messages: dict[str, list] = {} # 离线消息存储 async def connect(self, user: str, websocket: websocket): await websocket.accept() self.active_connections[user] = websocket async def disconnect(self, user: str, websocket: websocket): if user in self.active_connections: del self.active_connections[user] async def send_personal_message(self, message: str, to_user: str): if to_user in self.active_connections: await self.active_connections[to_user].send_text(message) return {"success": true} else: # 存储离线消息 if to_user not in self.offline_messages: self.offline_messages[to_user] = [] self.offline_messages[to_user].append(message) return {"success": false, "reason": "offline", "stored": true} async def get_offline_messages(self, user: str): if user in self.offline_messages: messages = self.offline_messages[user] del self.offline_messages[user] # 取出后删除 return messages return [] # token 认证 async def get_cookie_or_token(token: str = none): if not token: from fastapi import httpexception raise httpexception(status_code=401, detail="未授权") return token # 初始化连接管理器 ws_manager = connectionmanager() @app.websocket("/ws/{user}") async def websocket_one_to_one( websocket: websocket, user: str, cookie_or_token: str = depends(get_cookie_or_token) ): await ws_manager.connect(user, websocket) # 发送离线消息 offline_msgs = await ws_manager.get_offline_messages(user) for msg in offline_msgs: await websocket.send_text(msg) try: while true: data = await websocket.receive_text() message_data = json.loads(data) # 防止自己给自己发消息 if message_data["to_user"] == user: error_msg = { "type": "error", "content": "不能发送消息给自己" } await websocket.send_text(json.dumps(error_msg)) continue response_message = { "from": user, "content": message_data["content"], "timestamp": message_data.get("timestamp"), "type": "message" } # 发送消息 result = await ws_manager.send_personal_message( message=json.dumps(response_message), to_user=message_data["to_user"] ) # 处理离线消息 if not result["success"]: if result.get("stored"): success_msg = { "type": "info", "content": f"消息已保存,将在用户 {message_data['to_user']} 上线时送达" } await websocket.send_text(json.dumps(success_msg)) except websocketdisconnect: await ws_manager.disconnect(user, websocket) except exception as e: print(f"websocket 错误: {e}") await ws_manager.disconnect(user, websocket) @app.get("/", response_class=fileresponse) async def root(): return "templates/chat.html"
2. 前端实现 (templates/chat.html)
<!doctype html> <html> <head> <title>websocket chat</title> <style> body { font-family: arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; } #loginsection { text-align: center; margin-top: 100px; } #chatsection { display: none; } #messages { list-style-type: none; padding: 0; height: 400px; overflow-y: auto; border: 1px solid #ccc; margin-bottom: 20px; padding: 10px; } .message { margin: 10px 0; padding: 10px; border-radius: 5px; background-color: #f0f0f0; } .input-group { margin-bottom: 10px; } input[type="text"] { padding: 8px; margin-right: 10px; width: 200px; } button { padding: 8px 15px; background-color: #4caf50; color: white; border: none; border-radius: 4px; cursor: pointer; } button:hover { background-color: #45a049; } #logoutbtn { background-color: #f44336; } #logoutbtn:hover { background-color: #da190b; } .error { color: red; margin: 10px 0; } .info { color: blue; margin: 10px 0; } </style> </head> <body> <!-- 登录部分 --> <div id="loginsection"> <h1>websocket 聊天</h1> <div class="input-group"> <input type="text" id="loginusername" placeholder="输入用户名" autocomplete="off"/> <button onclick="login()">登录</button> </div> <div id="loginerror" class="error"></div> </div> <!-- 聊天部分 --> <div id="chatsection"> <h1>websocket 聊天</h1> <div id="currentuser"></div> <form action="" onsubmit="sendmessage(event)"> <div class="input-group"> <input type="text" id="messagetext" placeholder="输入消息" autocomplete="off"/> <input type="text" id="username" placeholder="接收者用户名" autocomplete="off"/> <button type="submit">发送</button> </div> </form> <button id="logoutbtn" onclick="logout()">退出</button> <ul id='messages'></ul> </div> <script> let ws = null; function checklogin() { const token = localstorage.getitem('token'); if (token) { document.getelementbyid('loginsection').style.display = 'none'; document.getelementbyid('chatsection').style.display = 'block'; document.getelementbyid('currentuser').textcontent = `当前用户: ${token}`; connectwebsocket(); } else { document.getelementbyid('loginsection').style.display = 'block'; document.getelementbyid('chatsection').style.display = 'none'; } } function login() { const username = document.getelementbyid('loginusername').value.trim(); if (!username) { document.getelementbyid('loginerror').textcontent = '请输入用户名'; return; } localstorage.setitem('token', username); checklogin(); } function logout() { if (ws && ws.readystate === websocket.open) { ws.close(); } localstorage.removeitem('token'); document.getelementbyid('messages').innerhtml = ''; document.getelementbyid('messagetext').value = ''; document.getelementbyid('username').value = ''; document.getelementbyid('loginsection').style.display = 'block'; document.getelementbyid('chatsection').style.display = 'none'; document.getelementbyid('loginusername').value = ''; document.getelementbyid('currentuser').textcontent = ''; } function connectwebsocket() { const token = localstorage.getitem('token'); ws = new websocket(`ws://localhost:8000/ws/${token}?token=${token}`); ws.onmessage = function(event) { const messages = document.getelementbyid('messages'); const message = document.createelement('li'); message.classname = 'message'; const data = json.parse(event.data); if (data.type === 'error') { message.style.color = 'red'; message.textcontent = data.content; } else if (data.type === 'info') { message.style.color = 'blue'; message.textcontent = data.content; } else { message.textcontent = `${data.from}: ${data.content}`; } messages.appendchild(message); messages.scrolltop = messages.scrollheight; }; ws.onerror = function(error) { console.error('websocket 错误:', error); }; ws.onclose = function() { console.log('websocket 连接已关闭'); }; } function sendmessage(event) { event.preventdefault(); const messageinput = document.getelementbyid("messagetext"); const usernameinput = document.getelementbyid("username"); if (!messageinput.value.trim() || !usernameinput.value.trim()) { return; } const messagedata = { content: messageinput.value, to_user: usernameinput.value, timestamp: new date().toisostring() }; ws.send(json.stringify(messagedata)); const messages = document.getelementbyid('messages'); const message = document.createelement('li'); message.classname = 'message'; message.style.backgroundcolor = '#e3f2fd'; message.textcontent = `我: ${messageinput.value}`; messages.appendchild(message); messages.scrolltop = messages.scrollheight; messageinput.value = ''; } checklogin(); </script> </body> </html>
功能特点
1.用户管理
- 简单的用户名登录系统
- 用户在线状态管理
- 用户会话保持
2.消息功能
- 实时一对一聊天
- 离线消息存储
- 上线自动接收离线消息
- 防止自己给自己发消息
3.界面特性
- 响应式设计
- 消息实时显示
- 错误信息提示
- 消息状态反馈
4.技术特性
- websocket 实时通信
- 状态管理
- 错误处理
- 会话管理
如何运行
1.安装依赖
pip install fastapi uvicorn
2.启动服务器
uvicorn main:app --reload
3.访问应用
- 打开浏览器访问 http://localhost:8000
- 输入用户名登录
- 开始聊天
使用流程
1.登录
- 输入用户名
- 点击登录按钮
2.发送消息
- 输入接收者用户名
- 输入消息内容
- 点击发送
3.接收消息
- 实时接收其他用户发送的消息
- 上线时接收离线消息
4.退出
- 点击退出按钮
- 清除登录状态
效果图
以上就是python基于fastapi和websocket实现实时聊天应用的详细内容,更多关于python fastapi websocket实时聊天的资料请关注代码网其它相关文章!
发表评论