当前位置: 代码网 > 服务器>软件设计>开源 > 开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

2024年08月01日 开源 我要评论
使用FastAPI提高AI应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

一、前言

    使用 fastapi 可以帮助我们更简单高效地部署 ai 交互业务。fastapi 提供了快速构建 api 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    fastapi 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,fastapi 还提供了容器化部署能力,开发者可以轻松打包 ai 模型为 docker 镜像,实现跨环境的部署和扩展。

    总之,使用 fastapi 可以大大提高 ai 应用程序的开发效率和用户体验,为 ai 模型的部署和交互提供全方位的支持。

    langchain基础入门:开源模型应用落地-fastapi-助力模型交互-websocket篇(一),本篇学习如何集成langchain进行模型交互,并使用工具获取实时信息


二、术语

2.1.fastapi

    fastapi 是一个用于构建 api 的现代、快速(高性能)的 python web 框架。它是基于标准 python 类型注释的 asgi (asynchronous server gateway interface) 框架。

fastapi 具有以下主要特点:

  1. 快速: fastapi 使用 asgi 服务器和 starlette 框架,在性能测试中表现出色。它可以与 uvicorn 一起使用,提供非常高的性能。

  2. 简单: fastapi 利用 python 类型注释,使 api 定义变得简单且直观。开发人员只需要定义输入和输出模型,fastapi 会自动生成 api 文档。

  3. 现代: fastapi 支持 openapi 标准,可以自动生成 api 文档和交互式文档。它还支持 json schema 和数据验证。

  4. 全功能: fastapi 提供了路由、依赖注入、数据验证、安全性、测试等功能,是一个功能齐全的 web 框架。

  5. 可扩展: fastapi 被设计为可扩展的。开发人员可以轻松地集成其他库和组件,如数据库、身份验证等。

2.2.websocket

    是一种计算机通信协议,它提供了在单个 tcp 连接上进行全双工通信的机制。它是 html5 一个重要的组成部分。

websocket 协议主要有以下特点:

  1. 全双工通信:websocket 允许客户端和服务器之间进行双向实时通信,即数据可以同时在两个方向上流动。这与传统的 http 请求-响应模型不同,http 中数据只能单向流动。

  2. 持久性连接:websocket 连接是一种持久性的连接,一旦建立就会一直保持,直到客户端或服务器主动关闭连接。这与 http 的连接是短暂的不同。

  3. 低开销:相比 http 请求-响应模型,websocket 在建立连接时需要较少的数据交换,因此网络开销较小。

  4. 实时性:由于 websocket 连接是持久性的,且数据可以双向流动,因此 websocket 非常适用于需要实时、低延迟数据交互的应用场景,如聊天应用、实时游戏、股票行情等。

2.3.tool

    tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(code interpreter)和知识检索(knowledge retrieval)等都属于其工具。

2.4.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加google search的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results

3.2. 注册google search api账号

1. 输入注册信息

可以使用google账号登录,但仍要执行下面的认证操作

2. 需要认证邮箱

3. 需要认证手机

4. 认证成功

3.3. 生成google search api的key


四、技术实现

4.1. google search小试

# -*- coding: utf-8 -*-
import os

from langchain_community.utilities.serpapi import serpapiwrapper

os.environ["serpapi_api_key"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
serp = serpapiwrapper()
result = serp.run("广州的实时气温如何?")
print("实时搜索结果:", result)

调用结果:

4.2. 非流式输出

    本章代码将开源模型应用落地-fastapi-助力模型交互-websocket篇(三)基础上进行拓展

服务端:

import uvicorn
import os

from typing import annotated
from fastapi import (
    depends,
    fastapi,
    websocket,
    websocketexception,
    websocketdisconnect,
    status,
)
from langchain.agents import create_structured_chat_agent, agentexecutor
from langchain_community.utilities import serpapiwrapper

from langchain_core.prompts import chatprompttemplate, systemmessageprompttemplate, humanmessageprompttemplate
from langchain_core.tools import tool
from langchain_openai import chatopenai

os.environ["openai_api_key"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'    #你的open ai key
os.environ["serpapi_api_key"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


class connectionmanager:
    def __init__(self):
        self.active_connections: list[websocket] = []

    async def connect(self, websocket: websocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: websocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: websocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = connectionmanager()

app = fastapi()

async def authenticate(
    websocket: websocket,
    userid: str,
    secret: str,
):
    if userid is none or secret is none:
        raise websocketexception(code=status.ws_1008_policy_violation)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = serpapiwrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


def get_prompt():
    template='''
    respond to the human as helpfully and accurately as possible. you have access to the following tools:
    
    {tools}
    
    use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
    
    valid "action" values: "final answer" or {tool_names}
    
    provide only one action per $json_blob, as shown:
    
    ```
    
    {{
    
      "action": $tool_name,
    
      "action_input": $input
    
    }}
    
    ```
    
    follow this format:
    
    question: input question to answer
    
    thought: consider previous and subsequent steps
    
    action:
    
    ```
    
    $json_blob
    
    ```
    
    observation: action result
    
    ... (repeat thought/action/observation n times)
    
    thought: i know what to respond
    
    action:
    
    ```
    
    {{
    
      "action": "final answer",
    
      "action_input": "final response to human"
    
    }}
    
    begin! reminder to always respond with a valid json blob of a single action. use tools if necessary. respond directly if appropriate. format is action:```$json_blob```then observation
    '''
    system_message_prompt = systemmessageprompttemplate.from_template(template)
    human_template='''
    {input}
    
    {agent_scratchpad}
    
     (reminder to respond in a json blob no matter what)
    '''
    human_message_prompt = humanmessageprompttemplate.from_template(human_template)
    prompt = chatprompttemplate.from_messages([system_message_prompt, human_message_prompt])

    return prompt

async def chat(query):
    global llm,tools
    agent = create_structured_chat_agent(
        llm, tools, get_prompt()
    )

    agent_executor = agentexecutor(agent=agent, tools=tools, verbose=true, handle_parsing_errors=true)

    result = agent_executor.invoke({"input": query})
    print(result['output'])
    yield result['output']

@app.websocket("/ws")
async def websocket_endpoint(*,websocket: websocket,userid: str,permission: annotated[str, depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while true:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not none and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except websocketdisconnect:
        manager.disconnect(websocket)
        print(f"client #{userid} left the chat")
        await manager.broadcast(f"client #{userid} left the chat")

if __name__ == '__main__':
    tools = [search]
    llm = chatopenai(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
    uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!doctype html>
<html>
    <head>
        <title>chat</title>
    </head>
    <body>
        <h1>websocket chat</h1>
        <form action="" onsubmit="sendmessage(event)">
            <label>userid: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>secret: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">connect</button>
            <hr>
            <label>message: <input type="text" id="messagetext" autocomplete="off"/></label>
            <button>send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getelementbyid("userid")
                var secret = document.getelementbyid("secret")
                ws = new websocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getelementbyid('messages')
                    var message = document.createelement('li')
                    var content = document.createtextnode(event.data)
                    message.appendchild(content)
                    messages.appendchild(message)
                };
                event.preventdefault()
            }
            function sendmessage(event) {
                var input = document.getelementbyid("messagetext")
                ws.send(input.value)
                input.value = ''
                event.preventdefault()
            }
        </script>
    </body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:你好!有什么我可以帮忙的吗?

用户输入:广州现在天气如何?

需要调用工具

模型输出:the current weather in guangzhou is partly cloudy with a temperature of 95°f, 66% chance of precipitation, 58% humidity, and wind speed of 16 mph. this information was last updated on monday at 1:00 pm.

ps:

1. 在ai交互中,langchain框架并不是必须引入,此处引用仅用于简化openai的交互流程。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

3. 目前还遗留两个问题,一是如何实现流式输出,二是如何更好维护prompt模版,篇幅有限,下回分解


五、附带说明

5.1. 如何避免模型用英文回复

在提示词模版加入:remember to answer in chinese.  暗示模型一定要以中文进行回复。

修改后的提示语为:

respond to the human as helpfully and accurately as possible. you have access to the following tools:
    
    {tools}
    
    use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
    
    valid "action" values: "final answer" or {tool_names}
    
    provide only one action per $json_blob, as shown:
    
    ```
    
    {{
    
      "action": $tool_name,
    
      "action_input": $input
    
    }}
    
    ```
    
    follow this format:
    
    question: input question to answer
    
    thought: consider previous and subsequent steps
    
    action:
    
    ```
    
    $json_blob
    
    ```
    
    observation: action result
    
    ... (repeat thought/action/observation n times)
    
    thought: i know what to respond
    
    action:
    
    ```
    
    {{
    
      "action": "final answer",
    
      "action_input": "final response to human"
    
    }}
    
    begin! reminder to always respond with a valid json blob of a single action. use tools if necessary. respond directly if appropriate. remember to answer in chinese.format is action:```$json_blob```then observation

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com