项目简介
这是一个基于 FastAPI 和 WebSocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。
技术栈
后端:FastAPI (Python)
前端:HTML、JavaScript、CSS
通信:WebSocket
认证:简单的 token 认证
项目结构
├── main.py # 后端主程序
└── templates/ # 前端模板目录
    └── chat.html # 聊天页面
详细代码实现
1. 后端实现 (main.py)
import json
from typing import Dict, List, Optional

from fastapi import Depends, FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
# Application instance. The ./templates directory is additionally served as
# static files under the /templates URL prefix.
app = FastAPI()
app.mount("/templates", StaticFiles(directory="templates"), name="templates")
class ConnectionManager:
    """Track live WebSocket connections and queue messages for offline users.

    All state lives in two in-memory dictionaries on the instance, so it is
    lost when the process exits.
    """

    def __init__(self):
        # user name -> socket currently registered for that user
        self.active_connections: Dict[str, "WebSocket"] = {}
        # user name -> messages stored while the user had no registered socket
        self.offline_messages: Dict[str, List[str]] = {}

    async def connect(self, user: str, websocket: "WebSocket"):
        """Accept the handshake and register ``websocket`` as ``user``'s socket.

        A previously registered socket for the same user is replaced.
        """
        await websocket.accept()
        self.active_connections[user] = websocket

    async def disconnect(self, user: str, websocket: "WebSocket"):
        """Unregister ``user`` if ``websocket`` is still their registered socket.

        The identity check matters when a user reconnects: the old socket's
        handler calls this after the new socket has already been registered,
        and must not evict the new connection.
        """
        if self.active_connections.get(user) is websocket:
            del self.active_connections[user]

    async def send_personal_message(self, message: str, to_user: str):
        """Deliver ``message`` to ``to_user``, or queue it if they are offline.

        Returns:
            ``{"success": True}`` when the message was sent to a registered
            socket, otherwise
            ``{"success": False, "reason": "offline", "stored": True}`` after
            the message has been appended to the user's offline queue.
        """
        recipient = self.active_connections.get(to_user)
        if recipient is not None:
            await recipient.send_text(message)
            return {"success": True}

        # Store the message until the recipient connects.
        self.offline_messages.setdefault(to_user, []).append(message)
        return {"success": False, "reason": "offline", "stored": True}

    async def get_offline_messages(self, user: str):
        """Return and clear the messages queued for ``user`` (``[]`` if none)."""
        return self.offline_messages.pop(user, [])


# Backward-compatible alias for code that still uses the all-lowercase name.
connectionmanager = ConnectionManager
# Token authentication
async def get_cookie_or_token(token: Optional[str] = None):
    """Dependency that requires a non-empty ``token`` value.

    Args:
        token: Token supplied by the client; ``None`` or ``""`` is rejected.

    Returns:
        The token unchanged.

    Raises:
        WebSocketException: With close code 1008 (policy violation) when the
            token is missing. This dependency is used by a WebSocket route,
            where an HTTP error response is not the right rejection mechanism.
    """
    if not token:
        # Imported lazily so the success path has no extra import cost.
        from fastapi import WebSocketException, status

        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION, reason="未授权"
        )
    return token
# Initialize the connection manager
ws_manager = connectionmanager()
@app.websocket("/ws/{user}")
async def websocket_one_to_one(
    websocket: WebSocket,
    user: str,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    """Handle one user's chat connection: relay messages to other users.

    Each incoming frame is expected to be a JSON object with ``to_user`` and
    ``content`` keys and an optional ``timestamp``. The message is forwarded
    to the recipient, or queued when the recipient is offline, in which case
    the sender receives an ``info`` notice.
    """
    # NOTE(review): the token is only checked for presence; it is never
    # compared with `user`, so any client can connect under any user name.
    await ws_manager.connect(user, websocket)
    try:
        # Deliver messages that were queued while this user was offline.
        for msg in await ws_manager.get_offline_messages(user):
            await websocket.send_text(msg)

        while True:
            data = await websocket.receive_text()

            # Reject malformed frames without dropping the connection.
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None
            if (
                not isinstance(message_data, dict)
                or not isinstance(message_data.get("to_user"), str)
                or "content" not in message_data
            ):
                invalid_msg = {
                    "type": "error",
                    "content": "消息格式无效"
                }
                await websocket.send_text(json.dumps(invalid_msg))
                continue

            to_user = message_data["to_user"]

            # Do not allow users to message themselves.
            if to_user == user:
                error_msg = {
                    "type": "error",
                    "content": "不能发送消息给自己"
                }
                await websocket.send_text(json.dumps(error_msg))
                continue

            response_message = {
                "from": user,
                "content": message_data["content"],
                "timestamp": message_data.get("timestamp"),
                "type": "message"
            }

            result = await ws_manager.send_personal_message(
                message=json.dumps(response_message),
                to_user=to_user
            )

            # Tell the sender when the message was queued instead of delivered.
            if not result["success"] and result.get("stored"):
                success_msg = {
                    "type": "info",
                    "content": f"消息已保存,将在用户 {to_user} 上线时送达"
                }
                await websocket.send_text(json.dumps(success_msg))
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in `finally`.
        pass
    except Exception as e:
        print(f"websocket 错误: {e}")
    finally:
        # Always unregister, including when offline delivery itself failed.
        await ws_manager.disconnect(user, websocket)
@app.get("/", response_class=FileResponse)
async def root():
    """Serve the chat page.

    The returned string is a file path relative to the working directory;
    ``response_class=FileResponse`` makes FastAPI send that file's contents.
    """
    return "templates/chat.html"
2. 前端实现 (templates/chat.html)
<!doctype html>
<html>
<head>
<title>websocket chat</title>
<style>
body {
font-family: arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
#loginsection {
text-align: center;
margin-top: 100px;
}
#chatsection {
display: none;
}
#messages {
list-style-type: none;
padding: 0;
height: 400px;
overflow-y: auto;
border: 1px solid #ccc;
margin-bottom: 20px;
padding: 10px;
}
.message {
margin: 10px 0;
padding: 10px;
border-radius: 5px;
background-color: #f0f0f0;
}
.input-group {
margin-bottom: 10px;
}
input[type="text"] {
padding: 8px;
margin-right: 10px;
width: 200px;
}
button {
padding: 8px 15px;
background-color: #4caf50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background-color: #45a049;
}
#logoutbtn {
background-color: #f44336;
}
#logoutbtn:hover {
background-color: #da190b;
}
.error {
color: red;
margin: 10px 0;
}
.info {
color: blue;
margin: 10px 0;
}
</style>
</head>
<body>
<!-- 登录部分 -->
<div id="loginsection">
<h1>websocket 聊天</h1>
<div class="input-group">
<input type="text" id="loginusername" placeholder="输入用户名" autocomplete="off"/>
<button onclick="login()">登录</button>
</div>
<div id="loginerror" class="error"></div>
</div>
<!-- 聊天部分 -->
<div id="chatsection">
<h1>websocket 聊天</h1>
<div id="currentuser"></div>
<form action="" onsubmit="sendmessage(event)">
<div class="input-group">
<input type="text" id="messagetext" placeholder="输入消息" autocomplete="off"/>
<input type="text" id="username" placeholder="接收者用户名" autocomplete="off"/>
<button type="submit">发送</button>
</div>
</form>
<button id="logoutbtn" onclick="logout()">退出</button>
<ul id='messages'></ul>
</div>
<script>
// Current WebSocket connection; stays null until connectwebsocket() assigns it.
let ws = null;
/**
 * Show the chat view when a token is stored in localStorage, otherwise show
 * the login view. When a token exists this also opens the WebSocket.
 */
function checklogin() {
    const token = localStorage.getItem('token');
    const loggedIn = Boolean(token);

    document.getElementById('loginsection').style.display = loggedIn ? 'none' : 'block';
    document.getElementById('chatsection').style.display = loggedIn ? 'block' : 'none';

    if (loggedIn) {
        document.getElementById('currentuser').textContent = `当前用户: ${token}`;
        connectwebsocket();
    }
}
/**
 * Read the user name from the login form, store it as the token and switch
 * to the chat view. An empty name shows a validation error instead.
 */
function login() {
    const username = document.getElementById('loginusername').value.trim();
    const errorBox = document.getElementById('loginerror');

    if (!username) {
        errorBox.textContent = '请输入用户名';
        return;
    }

    // Clear any earlier validation error so it is not shown again after logout.
    errorBox.textContent = '';
    localStorage.setItem('token', username);
    checklogin();
}
/**
 * Close the connection, forget the stored token, reset the form fields and
 * message list, and return to the login view.
 */
function logout() {
    // Also close a socket that is still connecting, so it cannot come up
    // after the user has logged out.
    if (ws && (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING)) {
        ws.close();
    }
    ws = null;

    localStorage.removeItem('token');

    document.getElementById('messages').innerHTML = '';
    document.getElementById('messagetext').value = '';
    document.getElementById('username').value = '';
    document.getElementById('loginusername').value = '';
    document.getElementById('currentuser').textContent = '';

    document.getElementById('loginsection').style.display = 'block';
    document.getElementById('chatsection').style.display = 'none';
}
/**
 * Open the WebSocket for the stored token and render every incoming frame
 * as a list item in #messages.
 */
function connectwebsocket() {
    const token = localStorage.getItem('token');

    // Connect back to the host that served the page; fall back to the
    // development address when there is no host (e.g. opened from file://).
    const scheme = location.protocol === 'https:' ? 'wss' : 'ws';
    const host = location.host || 'localhost:8000';
    // Encode the name so characters such as "/", "?" or "#" cannot break the URL.
    const user = encodeURIComponent(token);

    ws = new WebSocket(`${scheme}://${host}/ws/${user}?token=${user}`);

    ws.onmessage = function(event) {
        let data;
        try {
            data = JSON.parse(event.data);
        } catch (err) {
            // Skip frames that are not valid JSON instead of throwing.
            console.error('websocket 消息解析失败:', err);
            return;
        }

        const messages = document.getElementById('messages');
        const message = document.createElement('li');
        message.className = 'message';

        if (data.type === 'error') {
            message.style.color = 'red';
            message.textContent = data.content;
        } else if (data.type === 'info') {
            message.style.color = 'blue';
            message.textContent = data.content;
        } else {
            message.textContent = `${data.from}: ${data.content}`;
        }

        messages.appendChild(message);
        // Keep the newest message in view.
        messages.scrollTop = messages.scrollHeight;
    };

    ws.onerror = function(error) {
        console.error('websocket 错误:', error);
    };

    ws.onclose = function() {
        console.log('websocket 连接已关闭');
    };
}
/**
 * Form submit handler: send the typed message to the given recipient and
 * echo it into the local message list.
 */
function sendmessage(event) {
    event.preventDefault();

    const messageinput = document.getElementById("messagetext");
    const usernameinput = document.getElementById("username");
    // Send the trimmed recipient so stray spaces cannot cause a name mismatch.
    const recipient = usernameinput.value.trim();

    if (!messageinput.value.trim() || !recipient) {
        return;
    }

    const messages = document.getElementById('messages');
    const message = document.createElement('li');
    message.className = 'message';

    // ws.send() throws unless the socket is open, so report that instead.
    if (!ws || ws.readyState !== WebSocket.OPEN) {
        message.style.color = 'red';
        message.textContent = '连接未建立,无法发送消息';
        messages.appendChild(message);
        messages.scrollTop = messages.scrollHeight;
        return;
    }

    const messagedata = {
        content: messageinput.value,
        to_user: recipient,
        timestamp: new Date().toISOString()
    };
    ws.send(JSON.stringify(messagedata));

    // Echo the sent message locally; the server does not send it back.
    message.style.backgroundColor = '#e3f2fd';
    message.textContent = `我: ${messageinput.value}`;
    messages.appendChild(message);
    messages.scrollTop = messages.scrollHeight;

    messageinput.value = '';
}
// On page load, pick the login or chat view based on the stored token.
checklogin();
</script>
</body>
</html>
功能特点
1.用户管理
- 简单的用户名登录系统
- 用户在线状态管理
- 用户会话保持
2.消息功能
- 实时一对一聊天
- 离线消息存储
- 上线自动接收离线消息
- 防止自己给自己发消息
3.界面特性
- 响应式设计
- 消息实时显示
- 错误信息提示
- 消息状态反馈
4.技术特性
- WebSocket 实时通信
- 状态管理
- 错误处理
- 会话管理
如何运行
1.安装依赖
pip install fastapi "uvicorn[standard]"
2.启动服务器
uvicorn main:app --reload
3.访问应用
- 打开浏览器访问 http://localhost:8000
- 输入用户名登录
- 开始聊天
使用流程
1.登录
- 输入用户名
- 点击登录按钮
2.发送消息
- 输入接收者用户名
- 输入消息内容
- 点击发送
3.接收消息
- 实时接收其他用户发送的消息
- 上线时接收离线消息
4.退出
- 点击退出按钮
- 清除登录状态
效果图


以上就是 Python 基于 FastAPI 和 WebSocket 实现实时聊天应用的详细内容,更多关于 Python FastAPI WebSocket 实时聊天的资料请关注代码网其它相关文章!
发表评论