一、前言
使用 fastapi 可以帮助我们更简单高效地部署 ai 交互业务。fastapi 提供了快速构建 api 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。
fastapi 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,fastapi 还提供了容器化部署能力,开发者可以轻松打包 ai 模型为 docker 镜像,实现跨环境的部署和扩展。
总之,使用 fastapi 可以大大提高 ai 应用程序的开发效率和用户体验,为 ai 模型的部署和交互提供全方位的支持。
本篇在开源模型应用落地-fastapi-助力模型交互-websocket篇(五)基础上,学习如何集成tool获取实时数据,并以流式方式返回
二、术语
2.1.tool
tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(code interpreter)和知识检索(knowledge retrieval)等都属于其工具。
2.2.langchain预置的tools
https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools
基本这些工具能满足大部分需求,具体使用参见:
2.3.langchain支持流式输出的方法
stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。astream:异步的流式传输,用于异步处理需求的情况。astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。
2.4.langchainhub
是 langchain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(prompt)、链、代理等。
它受 hugging face hub 启发,促进社区交流与协作,推动 langchain 生态发展。当前,它在新架构中被置于 langsmith 里,主要聚焦于 prompt。
2.5.asyncio
是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。
三、前置条件
3.1. 创建虚拟环境&安装依赖
增加google search以及langchainhub的依赖包
conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub
3.2. 注册google search api账号
参见:开源模型应用落地-fastapi-助力模型交互-websocket篇(五)
3.3. 生成google search api的key
四、技术实现
4.1. 使用tool&流式输出
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, agentexecutor
from langchain_community.utilities.serpapi import serpapiwrapper
from langchain_core.prompts import systemmessageprompttemplate, humanmessageprompttemplate, chatprompttemplate
from langchain_core.tools import tool
from langchain_openai import chatopenai
os.environ["openai_api_key"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的open ai key
os.environ["serpapi_api_key"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
llm = chatopenai(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
@tool
def search(query:str):
"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
serp = serpapiwrapper()
result = serp.run(query)
print("实时搜索结果:", result)
return result
tools = [search]
template='''
respond to the human as helpfully and accurately as possible. you have access to the following tools:
{tools}
use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
valid "action" values: "final answer" or {tool_names}
provide only one action per $json_blob, as shown:
```
{{
"action": $tool_name,
"action_input": $input
}}
```
follow this format:
question: input question to answer
thought: consider previous and subsequent steps
action:
```
$json_blob
```
observation: action result
... (repeat thought/action/observation n times)
thought: i know what to respond
action:
```
{{
"action": "final answer",
"action_input": "final response to human"
}}
begin! reminder to always respond with a valid json blob of a single action. use tools if necessary. respond directly if appropriate. format is action:```$json_blob```then observation
'''
system_message_prompt = systemmessageprompttemplate.from_template(template)
human_template='''
{input}
{agent_scratchpad}
(reminder to respond in a json blob no matter what)
'''
human_message_prompt = humanmessageprompttemplate.from_template(human_template)
prompt = chatprompttemplate.from_messages([system_message_prompt, human_message_prompt])
print(prompt)
agent = create_structured_chat_agent(
llm, tools, prompt
)
agent_executor = agentexecutor(agent=agent, tools=tools, verbose=true, handle_parsing_errors=true)
async def chat(params):
events = agent_executor.astream_events(params,version="v2")
async for event in events:
type = event['event']
if 'on_chat_model_stream' == type:
data = event['data']
chunk = data['chunk']
content = chunk.content
if content and len(content) > 0:
print(content)
asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:

说明:
流式输出的数据结构为:
{'event': 'on_chat_model_stream', 'data': {'chunk': aimessagechunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'chatopenai', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nobservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': aimessagechunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'chatopenai', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nobservation']}}
4.2. 通过langchainhub使用公共prompt
在4.1使用tool&流式输出的代码基础上进行调整
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, agentexecutor
from langchain_community.utilities.serpapi import serpapiwrapper
from langchain_core.tools import tool
from langchain_openai import chatopenai
os.environ["openai_api_key"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的open ai key
os.environ["serpapi_api_key"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
from langchain import hub
llm = chatopenai(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
@tool
def search(query:str):
"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
serp = serpapiwrapper()
result = serp.run(query)
print("实时搜索结果:", result)
return result
tools = [search]
prompt = hub.pull("hwchase17/structured-chat-agent")
print(prompt)
agent = create_structured_chat_agent(
llm, tools, prompt
)
agent_executor = agentexecutor(agent=agent, tools=tools, verbose=true, handle_parsing_errors=true)
async def chat(params):
events = agent_executor.astream_events(params,version="v2")
async for event in events:
type = event['event']
if 'on_chat_model_stream' == type:
data = event['data']
chunk = data['chunk']
content = chunk.content
if content and len(content) > 0:
print(content)
asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:

4.3. 整合代码
在开源模型应用落地-fastapi-助力模型交互-websocket篇(五)的代码基础上进行调整
import uvicorn
import os
from typing import annotated
from fastapi import (
depends,
fastapi,
websocket,
websocketexception,
websocketdisconnect,
status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, agentexecutor
from langchain_community.utilities import serpapiwrapper
from langchain_core.tools import tool
from langchain_openai import chatopenai
os.environ["openai_api_key"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的open ai key
os.environ["serpapi_api_key"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
class connectionmanager:
def __init__(self):
self.active_connections: list[websocket] = []
async def connect(self, websocket: websocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: websocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: websocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = connectionmanager()
app = fastapi()
async def authenticate(
websocket: websocket,
userid: str,
secret: str,
):
if userid is none or secret is none:
raise websocketexception(code=status.ws_1008_policy_violation)
print(f'userid: {userid},secret: {secret}')
if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
return 'pass'
else:
return 'fail'
@tool
def search(query:str):
"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
serp = serpapiwrapper()
result = serp.run(query)
print("实时搜索结果:", result)
return result
def get_prompt():
prompt = hub.pull("hwchase17/structured-chat-agent")
return prompt
async def chat(query):
global llm,tools
agent = create_structured_chat_agent(
llm, tools, get_prompt()
)
agent_executor = agentexecutor(agent=agent, tools=tools, verbose=true, handle_parsing_errors=true)
events = agent_executor.astream_events({"input": query}, version="v1")
async for event in events:
type = event['event']
if 'on_chat_model_stream' == type:
data = event['data']
chunk = data['chunk']
content = chunk.content
if content and len(content) > 0:
print(content)
yield content
@app.websocket("/ws")
async def websocket_endpoint(*,websocket: websocket,userid: str,permission: annotated[str, depends(authenticate)],):
await manager.connect(websocket)
try:
while true:
text = await websocket.receive_text()
if 'fail' == permission:
await manager.send_personal_message(
f"authentication failed", websocket
)
else:
if text is not none and len(text) > 0:
async for msg in chat(text):
await manager.send_personal_message(msg, websocket)
except websocketdisconnect:
manager.disconnect(websocket)
print(f"client #{userid} left the chat")
await manager.broadcast(f"client #{userid} left the chat")
if __name__ == '__main__':
tools = [search]
llm = chatopenai(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
uvicorn.run(app, host='0.0.0.0',port=7777)
客户端:
<!doctype html>
<html>
<head>
<title>chat</title>
</head>
<body>
<h1>websocket chat</h1>
<form action="" onsubmit="sendmessage(event)">
<label>userid: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
<label>secret: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
<br/>
<button onclick="connect(event)">connect</button>
<hr>
<label>message: <input type="text" id="messagetext" autocomplete="off"/></label>
<button>send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var userid = document.getelementbyid("userid")
var secret = document.getelementbyid("secret")
ws = new websocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
ws.onmessage = function(event) {
var messages = document.getelementbyid('messages')
var message = document.createelement('li')
var content = document.createtextnode(event.data)
message.appendchild(content)
messages.appendchild(message)
};
event.preventdefault()
}
function sendmessage(event) {
var input = document.getelementbyid("messagetext")
ws.send(input.value)
input.value = ''
event.preventdefault()
}
</script>
</body>
</html>
调用结果:
用户输入:你好
不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:
```
action:
```
{
"action": "final answer",
"action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```
ps:
1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "final answer",要根据实际情况过滤。
2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

发表评论