当前位置: 代码网 > it编程>前端脚本>Python > Python基于FastAPI和WebSocket实现实时聊天应用

Python基于FastAPI和WebSocket实现实时聊天应用

2025年04月30日 Python 我要评论
项目简介这是一个基于 fastapi 和 websocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。技术栈后端:fastapi (python)前端:html、javascript、

项目简介

这是一个基于 fastapi 和 websocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。

技术栈

后端:fastapi (python)

前端:html、javascript、css

通信:websocket

认证:简单的 token 认证

项目结构

├── main.py              # 后端主程序
└── templates/           # 前端模板目录
    └── chat.html       # 聊天页面

详细代码实现

1. 后端实现 (main.py)

from fastapi import fastapi, websocket, depends, websocketdisconnect
from typing import dict
import json
from fastapi.responses import htmlresponse, fileresponse
from fastapi.staticfiles import staticfiles

app = fastapi()
app.mount("/templates", staticfiles(directory="templates"), name="templates")

class connectionmanager:
    def __init__(self):
        self.active_connections: dict[str, websocket] = {}
        self.offline_messages: dict[str, list] = {}  # 离线消息存储

    async def connect(self, user: str, websocket: websocket):
        await websocket.accept()
        self.active_connections[user] = websocket

    async def disconnect(self, user: str, websocket: websocket):
        if user in self.active_connections:
            del self.active_connections[user]

    async def send_personal_message(self, message: str, to_user: str):
        if to_user in self.active_connections:
            await self.active_connections[to_user].send_text(message)
            return {"success": true}
        else:
            # 存储离线消息
            if to_user not in self.offline_messages:
                self.offline_messages[to_user] = []
            self.offline_messages[to_user].append(message)
            return {"success": false, "reason": "offline", "stored": true}

    async def get_offline_messages(self, user: str):
        if user in self.offline_messages:
            messages = self.offline_messages[user]
            del self.offline_messages[user]  # 取出后删除
            return messages
        return []

# token 认证
async def get_cookie_or_token(token: str = none):
    if not token:
        from fastapi import httpexception
        raise httpexception(status_code=401, detail="未授权")
    return token

# 初始化连接管理器
ws_manager = connectionmanager()

@app.websocket("/ws/{user}")
async def websocket_one_to_one(
        websocket: websocket,
        user: str,
        cookie_or_token: str = depends(get_cookie_or_token)
):
    await ws_manager.connect(user, websocket)
    
    # 发送离线消息
    offline_msgs = await ws_manager.get_offline_messages(user)
    for msg in offline_msgs:
        await websocket.send_text(msg)
    
    try:
        while true:
            data = await websocket.receive_text()
            message_data = json.loads(data)
            
            # 防止自己给自己发消息
            if message_data["to_user"] == user:
                error_msg = {
                    "type": "error",
                    "content": "不能发送消息给自己"
                }
                await websocket.send_text(json.dumps(error_msg))
                continue
                
            response_message = {
                "from": user,
                "content": message_data["content"],
                "timestamp": message_data.get("timestamp"),
                "type": "message"
            }
            
            # 发送消息
            result = await ws_manager.send_personal_message(
                message=json.dumps(response_message),
                to_user=message_data["to_user"]
            )
            
            # 处理离线消息
            if not result["success"]:
                if result.get("stored"):
                    success_msg = {
                        "type": "info",
                        "content": f"消息已保存,将在用户 {message_data['to_user']} 上线时送达"
                    }
                    await websocket.send_text(json.dumps(success_msg))
    except websocketdisconnect:
        await ws_manager.disconnect(user, websocket)
    except exception as e:
        print(f"websocket 错误: {e}")
        await ws_manager.disconnect(user, websocket)

@app.get("/", response_class=fileresponse)
async def root():
    return "templates/chat.html"

2. 前端实现 (templates/chat.html)

<!doctype html>
<html>
<head>
    <title>websocket chat</title>
    <style>
        body {
            font-family: arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        #loginsection {
            text-align: center;
            margin-top: 100px;
        }
        #chatsection {
            display: none;
        }
        #messages {
            list-style-type: none;
            padding: 0;
            height: 400px;
            overflow-y: auto;
            border: 1px solid #ccc;
            margin-bottom: 20px;
            padding: 10px;
        }
        .message {
            margin: 10px 0;
            padding: 10px;
            border-radius: 5px;
            background-color: #f0f0f0;
        }
        .input-group {
            margin-bottom: 10px;
        }
        input[type="text"] {
            padding: 8px;
            margin-right: 10px;
            width: 200px;
        }
        button {
            padding: 8px 15px;
            background-color: #4caf50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        button:hover {
            background-color: #45a049;
        }
        #logoutbtn {
            background-color: #f44336;
        }
        #logoutbtn:hover {
            background-color: #da190b;
        }
        .error {
            color: red;
            margin: 10px 0;
        }
        .info {
            color: blue;
            margin: 10px 0;
        }
    </style>
</head>
<body>
    <!-- 登录部分 -->
    <div id="loginsection">
        <h1>websocket 聊天</h1>
        <div class="input-group">
            <input type="text" id="loginusername" placeholder="输入用户名" autocomplete="off"/>
            <button onclick="login()">登录</button>
        </div>
        <div id="loginerror" class="error"></div>
    </div>

    <!-- 聊天部分 -->
    <div id="chatsection">
        <h1>websocket 聊天</h1>
        <div id="currentuser"></div>
        <form action="" onsubmit="sendmessage(event)">
            <div class="input-group">
                <input type="text" id="messagetext" placeholder="输入消息" autocomplete="off"/>
                <input type="text" id="username" placeholder="接收者用户名" autocomplete="off"/>
                <button type="submit">发送</button>
            </div>
        </form>
        <button id="logoutbtn" onclick="logout()">退出</button>
        <ul id='messages'></ul>
    </div>

    <script>
        let ws = null;

        function checklogin() {
            const token = localstorage.getitem('token');
            if (token) {
                document.getelementbyid('loginsection').style.display = 'none';
                document.getelementbyid('chatsection').style.display = 'block';
                document.getelementbyid('currentuser').textcontent = `当前用户: ${token}`;
                connectwebsocket();
            } else {
                document.getelementbyid('loginsection').style.display = 'block';
                document.getelementbyid('chatsection').style.display = 'none';
            }
        }

        function login() {
            const username = document.getelementbyid('loginusername').value.trim();
            if (!username) {
                document.getelementbyid('loginerror').textcontent = '请输入用户名';
                return;
            }
            localstorage.setitem('token', username);
            checklogin();
        }

        function logout() {
            if (ws && ws.readystate === websocket.open) {
                ws.close();
            }
            localstorage.removeitem('token');
            document.getelementbyid('messages').innerhtml = '';
            document.getelementbyid('messagetext').value = '';
            document.getelementbyid('username').value = '';
            document.getelementbyid('loginsection').style.display = 'block';
            document.getelementbyid('chatsection').style.display = 'none';
            document.getelementbyid('loginusername').value = '';
            document.getelementbyid('currentuser').textcontent = '';
        }

        function connectwebsocket() {
            const token = localstorage.getitem('token');
            ws = new websocket(`ws://localhost:8000/ws/${token}?token=${token}`);
            
            ws.onmessage = function(event) {
                const messages = document.getelementbyid('messages');
                const message = document.createelement('li');
                message.classname = 'message';
                
                const data = json.parse(event.data);
                
                if (data.type === 'error') {
                    message.style.color = 'red';
                    message.textcontent = data.content;
                } else if (data.type === 'info') {
                    message.style.color = 'blue';
                    message.textcontent = data.content;
                } else {
                    message.textcontent = `${data.from}: ${data.content}`;
                }
                
                messages.appendchild(message);
                messages.scrolltop = messages.scrollheight;
            };

            ws.onerror = function(error) {
                console.error('websocket 错误:', error);
            };

            ws.onclose = function() {
                console.log('websocket 连接已关闭');
            };
        }

        function sendmessage(event) {
            event.preventdefault();
            
            const messageinput = document.getelementbyid("messagetext");
            const usernameinput = document.getelementbyid("username");
            
            if (!messageinput.value.trim() || !usernameinput.value.trim()) {
                return;
            }
            
            const messagedata = {
                content: messageinput.value,
                to_user: usernameinput.value,
                timestamp: new date().toisostring()
            };

            ws.send(json.stringify(messagedata));
            
            const messages = document.getelementbyid('messages');
            const message = document.createelement('li');
            message.classname = 'message';
            message.style.backgroundcolor = '#e3f2fd';
            message.textcontent = `我: ${messageinput.value}`;
            messages.appendchild(message);
            messages.scrolltop = messages.scrollheight;
            
            messageinput.value = '';
        }

        checklogin();
    </script>
</body>
</html>

功能特点

1.用户管理

  • 简单的用户名登录系统
  • 用户在线状态管理
  • 用户会话保持

2.消息功能

  • 实时一对一聊天
  • 离线消息存储
  • 上线自动接收离线消息
  • 防止自己给自己发消息

3.界面特性

  • 响应式设计
  • 消息实时显示
  • 错误信息提示
  • 消息状态反馈

4.技术特性

  • websocket 实时通信
  • 状态管理
  • 错误处理
  • 会话管理

如何运行

1.安装依赖

pip install fastapi uvicorn

2.启动服务器

uvicorn main:app --reload

3.访问应用

  • 打开浏览器访问 http://localhost:8000
  • 输入用户名登录
  • 开始聊天

使用流程

1.登录

  • 输入用户名
  • 点击登录按钮

2.发送消息

  • 输入接收者用户名
  • 输入消息内容
  • 点击发送

3.接收消息

  • 实时接收其他用户发送的消息
  • 上线时接收离线消息

4.退出

  • 点击退出按钮
  • 清除登录状态

效果图

以上就是python基于fastapi和websocket实现实时聊天应用的详细内容,更多关于python fastapi websocket实时聊天的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com