一个 agent 解决不了的问题,就组一个队。本文带你用 python 从零搭建多智能体协作系统,实现"ai 产品分析师 + 代码工程师 + 测试专家 + 文档撰写员 + 质检官"全自动流水线。
为什么一个 agent 不够用?
用过 gpt 的人都有过这种经历:
你:帮我完整分析一个竞品,写一份技术方案,再给出完整代码实现,附上测试用例,最后写成报告……
gpt:好的!(给了一份残缺不全、越写越错的东西)
单个 agent 的问题在于:
- 上下文有限:长任务超出 context window 就开始"失忆"
- 角色模糊:既要分析又要写代码,什么都做但什么都不精
- 无法并行:顺序执行,慢
多智能体协作的思路:像管理一个团队,每个 agent 只做一件专业的事,通过协作完成复杂任务。
一、多智能体架构设计
本文搭建的多智能体系统结构如下:
用户输入任务
↓
boss agent(任务规划 + 调度)
↓
┌──────────────────────────────────┐
│ 任务队列(task queue) │
└──────────────────────────────────┘
↓ ↓ ↓
研究员 agent 工程师 agent 质检 agent
(搜索/分析) (写代码/方案) (测试/评审)
↓ ↓
文档 agent 测试 agent
(写报告) (单元测试)
↓
最终输出(报告 / 代码 / 测试)
五位团队成员:
| agent 角色 | 职责 | 核心能力 |
|---|---|---|
| boss agent | 任务拆解 + 调度 + 汇总 | 规划、协调、决策 |
| 研究员 agent | 信息搜集 + 竞品分析 | 搜索、总结、报告 |
| 工程师 agent | 代码实现 + 方案设计 | 编程、架构、调试 |
| 测试 agent | 单元测试 + bug 检测 | 测试、验证、覆盖率 |
| 文档 agent | 技术文档 + readme | 写作、整理、规范 |
二、agent 基类封装
# agents/base.py
from abc import abc, abstractmethod
from dataclasses import dataclass, field
from typing import any, optional
from enum import enum
import asyncio
import time
import uuid
import json
class taskstatus(enum):
pending = "pending"
running = "running"
done = "done"
failed = "failed"
cancelled = "cancelled"
@dataclass
class task:
"""一个可执行的任务单元"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
name: str = ""
description: str = ""
assignee: str = "" # 分配给哪个 agent
depends_on: list[str] = field(default_factory=list) # 依赖的任务 id
status: taskstatus = taskstatus.pending
result: any = none
error: str = ""
created_at: float = field(default_factory=time.time)
started_at: float = 0.0
finished_at: float = 0.0
retries: int = 0
max_retries: int = 3
@property
def elapsed_ms(self) -> float:
if self.finished_at and self.started_at:
return (self.finished_at - self.started_at) * 1000
return 0.0
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"assignee": self.assignee,
"status": self.status.value,
"elapsed_ms": round(self.elapsed_ms, 1),
"result_preview": str(self.result)[:200] if self.result else none
}
@dataclass
class agentmemory:
"""agent 的短期工作记忆"""
context: dict = field(default_factory=dict) # 键值对上下文
history: list[dict] = field(default_factory=list) # 对话历史
def remember(self, key: str, value: any):
self.context[key] = value
def recall(self, key: str, default=none) -> any:
return self.context.get(key, default)
def add_message(self, role: str, content: str):
self.history.append({"role": role, "content": content})
def clear_history(self):
self.history.clear()
class baseagent(abc):
"""
所有 agent 的抽象基类
每个 agent 有:角色、工具、记忆、llm 后端
"""
def __init__(
self,
name: str,
role: str,
model: str = "gpt-4o",
api_key: str = none,
base_url: str = "https://api.openai.com/v1",
temperature: float = 0.7,
max_tokens: int = 4096,
verbose: bool = true
):
self.name = name
self.role = role
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
self.verbose = verbose
self.memory = agentmemory()
# 初始化 llm 客户端
from openai import asyncopenai
self.client = asyncopenai(
api_key=api_key,
base_url=base_url
)
self._tools: dict[str, callable] = {}
self._register_tools()
def _register_tools(self):
"""子类重写,注册本 agent 可用的工具"""
pass
def register_tool(self, name: str, func: callable):
self._tools[name] = func
def log(self, msg: str):
if self.verbose:
print(f"[{self.name}] {msg}")
async def think(self, prompt: str, system_override: str = none) -> str:
"""调用 llm 进行思考,返回文本结果"""
system = system_override or f"""你是 {self.name},{self.role}。
请认真完成分配给你的任务,输出要专业、详细、有条理。"""
messages = [
{"role": "system", "content": system},
*self.memory.history[-6:], # 保留最近 6 条历史(节省 token)
{"role": "user", "content": prompt}
]
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=self.max_tokens
)
reply = response.choices[0].message.content
self.memory.add_message("assistant", reply)
return reply
@abstractmethod
async def execute(self, task: task, shared_context: dict) -> any:
"""执行任务,子类必须实现"""
pass
三、五大专业 agent 实现
3.1 研究员 agent(搜索 + 分析)
# agents/researcher.py
from .base import baseagent, task
import httpx
class researcheragent(baseagent):
"""研究员:负责信息搜集、竞品分析、数据整理"""
def __init__(self, **kwargs):
super().__init__(
name="研究员小研",
role="资深产品研究员,擅长竞品分析、市场调研和信息整理",
temperature=0.3, # 研究任务需要更准确,降低随机性
**kwargs
)
def _register_tools(self):
self.register_tool("web_search", self._web_search)
self.register_tool("summarize_url", self._summarize_url)
async def _web_search(self, query: str) -> str:
"""模拟网页搜索(实际接入 serper / tavily / duckduckgo)"""
# 实际使用时替换为真实 api
async with httpx.asyncclient() as client:
# 示例:接入 serper api
# response = await client.post(
# "https://google.serper.dev/search",
# headers={"x-api-key": serper_api_key},
# json={"q": query}
# )
# return str(response.json()["organic"][:3])
return f"搜索结果({query}):[模拟数据,请替换为真实搜索 api]"
async def _summarize_url(self, url: str) -> str:
"""抓取并总结网页内容"""
async with httpx.asyncclient(timeout=10) as client:
try:
response = await client.get(url, follow_redirects=true)
# 简化处理,实际应该用 beautifulsoup 解析
text = response.text[:3000]
return await self.think(f"请总结以下网页内容的核心信息:\n{text}")
except exception as e:
return f"抓取失败: {e}"
async def execute(self, task: task, shared_context: dict) -> str:
self.log(f"开始执行研究任务:{task.name}")
task.started_at = __import__("time").time()
# 1. 搜集信息
search_result = await self._web_search(task.description)
self.memory.remember("search_result", search_result)
# 2. 深度分析
analysis_prompt = f"""
你是专业研究员。请根据以下信息,对「{task.name}」进行深度分析:
搜索结果:
{search_result}
已有上下文:
{json.dumps(shared_context.get("background", {}), ensure_ascii=false, indent=2)}
请输出:
1. **核心发现**(3-5 条)
2. **数据支撑**(关键数字和事实)
3. **竞品对比**(表格形式)
4. **关键洞察**(对后续工作的启示)
"""
result = await self.think(analysis_prompt)
self.log(f"研究完成,输出 {len(result)} 字")
return result
3.2 工程师 agent(写代码 + 方案)
# agents/engineer.py
from .base import baseagent, task
import subprocess, tempfile, os
class engineeragent(baseagent):
"""工程师:负责技术方案设计和代码实现"""
def __init__(self, **kwargs):
super().__init__(
name="工程师小码",
role="资深 python 全栈工程师,擅长架构设计、代码实现和性能优化",
temperature=0.2, # 代码任务要更确定,温度调低
**kwargs
)
def _register_tools(self):
self.register_tool("run_code", self._run_code)
self.register_tool("check_syntax", self._check_syntax)
async def _run_code(self, code: str, timeout: int = 10) -> str:
"""在沙箱中执行 python 代码"""
with tempfile.namedtemporaryfile(mode="w", suffix=".py", delete=false) as f:
f.write(code)
f.flush()
try:
result = subprocess.run(
["python", f.name],
capture_output=true,
text=true,
timeout=timeout
)
output = result.stdout or result.stderr
return f"执行结果:\n{output[:2000]}"
except subprocess.timeoutexpired:
return "⚠️ 执行超时"
except exception as e:
return f"❌ 执行失败:{e}"
finally:
os.unlink(f.name)
async def _check_syntax(self, code: str) -> str:
"""检查 python 代码语法"""
try:
import ast
ast.parse(code)
return "✅ 语法检查通过"
except syntaxerror as e:
return f"❌ 语法错误:{e}"
def _extract_code(self, text: str) -> str:
"""从 llm 输出中提取代码块"""
import re
pattern = r"```(?:python)?\n(.*?)```"
matches = re.findall(pattern, text, re.dotall)
return matches[0].strip() if matches else text
async def execute(self, task: task, shared_context: dict) -> dict:
self.log(f"开始开发:{task.name}")
task.started_at = __import__("time").time()
research = shared_context.get("research_result", "")
# 1. 方案设计
design_prompt = f"""
任务:{task.description}
参考研究结论:
{research[:1500] if research else "无"}
请先输出:
## 技术方案
1. 架构选型(为什么这么选)
2. 核心模块拆解
3. 关键技术点
4. 预期接口设计
然后输出完整的 python 实现代码。
"""
design_output = await self.think(design_prompt)
# 2. 提取并验证代码
code = self._extract_code(design_output)
syntax_check = await self._check_syntax(code)
# 3. 如果语法有问题,自动修复
if "❌" in syntax_check:
fix_prompt = f"以下代码有语法错误,请修复:\n{syntax_check}\n\n```python\n[code]\n```"
fixed = await self.think(fix_prompt)
code = self._extract_code(fixed)
# 4. 运行测试
run_result = await self._run_code(code)
self.log(f"代码执行:{run_result[:100]}")
return {
"design": design_output,
"code": code,
"syntax_check": syntax_check,
"run_result": run_result
}
3.3 测试 agent + 文档 agent
# agents/tester.py
class testeragent(baseagent):
"""测试专家:生成单元测试,检测 bug"""
def __init__(self, **kwargs):
super().__init__(
name="测试专家小测",
role="资深 qa 工程师,擅长编写 pytest 单元测试和发现潜在 bug",
temperature=0.2,
**kwargs
)
async def execute(self, task: task, shared_context: dict) -> dict:
self.log("开始生成测试...")
code = shared_context.get("engineer_result", {}).get("code", "")
test_prompt = f"""
请为以下 python 代码编写全面的 pytest 测试套件:
```python
[code]
要求:
- 覆盖所有公开函数/方法
- 包含:正常用例、边界条件、异常用例
- 使用 pytest.mark.parametrize 参数化测试
- 添加 mock(如果涉及外部依赖)
- 目标覆盖率:> 80%
输出完整的可运行测试文件。
test_code = await self.think(test_prompt)
# 分析测试覆盖情况
analysis_prompt = f"分析以上测试的覆盖情况,列出可能遗漏的测试用例:"
coverage_analysis = await self.think(analysis_prompt)
return {
"test_code": test_code,
"coverage_analysis": coverage_analysis
}agents/documenter.py
class documenteragent(baseagent):
'文档撰写员:生成技术文档和 readme'
def __init__(self, **kwargs):
super().__init__(
name="文档小写",
role="技术文档专家,擅长撰写清晰易读的 readme、api 文档和技术博客",
temperature=0.6, # 文档写作可以稍微有创意
**kwargs
)
async def execute(self, task: task, shared_context: dict) -> str:
self.log("开始撰写文档...")
engineer_result = shared_context.get("engineer_result", {})
tester_result = shared_context.get("tester_result", {})
research_result = shared_context.get("research_result", "")
doc_prompt = f"""请为以下项目生成完整的 readme.md:
【项目背景】
{research_result[:500] if research_result else task.description}
【技术方案】
{engineer_result.get(“design”, “”)[:800]}
【代码实现】
{engineer_result.get("code", "")[:1000]}
【测试情况】
{tester_result.get(“coverage_analysis”, “”)[:300]}readme 需包含:
- 项目简介(一句话)
- 功能特性(带 emoji 的列表)
- 快速开始(安装 + 使用示例)
- api 参考(关键函数说明)
- 项目结构
- 贡献指南
- license
风格:专业但不枯燥,对新手友好。
四、任务调度器:boss agent
agents/boss.py
import asyncio
from dataclasses import dataclass, field
from .base import baseagent, task, taskstatus
class bossagent(baseagent):
"""
boss agent:整体任务规划 + 调度 + 结果汇总
相当于项目经理,负责把大任务拆成小任务,分配给合适的 agent
"""
def __init__(self, team: dict[str, baseagent], **kwargs):
super().__init__(
name="boss 总监",
role="资深项目总监,擅长任务规划、团队协调和结果整合",
temperature=0.5,
**kwargs
)
self.team = team # {agent_name: agent_instance}
self.task_queue: list[task] = [] # 任务队列
self.shared_context: dict = {} # 团队共享上下文
async def plan(self, user_request: str) -> list[task]:
"""根据用户需求,拆解任务并分配给团队成员"""
team_info = "\n".join([
f"- {name}: {agent.role}"
for name, agent in self.team.items()
])
plan_prompt = f"""
用户需求:{user_request}
你的团队成员:
{team_info}
请把这个需求拆解成 3-6 个可执行的子任务,输出 json 格式:
[
{{
"name": "任务名称",
"description": "详细描述(给 agent 的明确指令)",
"assignee": "agent 名称(researcher/engineer/tester/documenter)",
"depends_on": [] // 依赖的其他任务名称
}}
]
注意:
1. 每个任务职责单一,可独立执行
2. 正确设置依赖关系(如写代码必须等研究完成)
3. 任务描述要具体,让 agent 知道该做什么
"""
plan_output = await self.think(plan_prompt)
# 提取 json 任务列表
import re, json
match = re.search(r"\[.*?\]", plan_output, re.dotall)
if match:
try:
tasks_data = json.loads(match.group())
return [task(**t) for t in tasks_data]
except:
pass
# fallback:默认任务流
return self._default_task_flow(user_request)
def _default_task_flow(self, request: str) -> list[task]:
return [
task(name="需求调研", description=f"调研:{request}", assignee="researcher"),
task(name="代码实现", description=f"实现:{request}", assignee="engineer", depends_on=["需求调研"]),
task(name="单元测试", description="为以上代码生成测试", assignee="tester", depends_on=["代码实现"]),
task(name="文档生成", description="生成完整文档", assignee="documenter", depends_on=["代码实现", "单元测试"]),
]
async def _execute_task(self, task: task) -> none:
"""执行单个任务"""
agent_map = {
"researcher": self.team.get("researcher"),
"engineer": self.team.get("engineer"),
"tester": self.team.get("tester"),
"documenter": self.team.get("documenter"),
}
agent = agent_map.get(task.assignee)
if not agent:
task.status = taskstatus.failed
task.error = f"找不到 agent:{task.assignee}"
return
task.status = taskstatus.running
self.log(f"▶ 分派任务「{task.name}」给 {agent.name}")
try:
result = await agent.execute(task, self.shared_context)
task.result = result
task.status = taskstatus.done
task.finished_at = __import__("time").time()
# 把结果存入共享上下文
self.shared_context[f"{task.assignee}_result"] = result
self.log(f"✅ 任务「{task.name}」完成({task.elapsed_ms:.0f}ms)")
except exception as e:
task.status = taskstatus.failed
task.error = str(e)
self.log(f"❌ 任务「{task.name}」失败:{e}")
async def run(self, user_request: str) -> dict:
"""主流程:规划 → 调度 → 执行 → 汇总"""
print(f"\n{'='*60}")
print(f"🎯 用户需求:{user_request}")
print(f"{'='*60}\n")
# 1. 任务规划
self.log("开始任务规划...")
tasks = await self.plan(user_request)
self.shared_context["user_request"] = user_request
self.task_queue = tasks
print(f"📋 任务计划({len(tasks)} 个子任务):")
for i, t in enumerate(tasks, 1):
deps = f"(依赖:{', '.join(t.depends_on)})" if t.depends_on else ""
print(f" {i}. [{t.assignee}] {t.name} {deps}")
print()
# 2. 拓扑排序 + 执行(支持并行)
completed = set()
while len(completed) < len(tasks):
# 找出当前可执行的任务(依赖已完成)
ready = [
t for t in tasks
if t.status == taskstatus.pending
and all(dep in completed for dep in t.depends_on)
]
if not ready:
# 检查是否死锁
pending = [t for t in tasks if t.status == taskstatus.pending]
if pending:
self.log("⚠️ 检测到任务依赖死锁,强制执行剩余任务")
ready = pending[:1]
else:
break
# 并行执行所有可执行任务
await asyncio.gather(*[self._execute_task(t) for t in ready])
completed.update(t.name for t in ready if t.status == taskstatus.done)
# 3. 汇总结果
return await self._summarize()
async def _summarize(self) -> dict:
"""汇总所有 agent 的工作成果"""
self.log("汇总所有成果...")
results = {
name: ctx
for name, ctx in self.shared_context.items()
if name.endswith("_result")
}
summary_prompt = f"""
团队已完成以下工作,请综合汇总成一份清晰的最终报告:
{__import__("json").dumps(
{k: str(v)[:500] for k, v in results.items()},
ensure_ascii=false, indent=2
)}
输出格式:
## 项目总结
- 完成情况
- 核心交付物
- 关键发现
- 建议后续行动
"""
summary = await self.think(summary_prompt)
return {
"summary": summary,
"details": results,
"tasks": [t.to_dict() for t in self.task_queue]
}五、实战:自动生成竞品分析报告
# main.py
import asyncio
import os
from agents.boss import bossagent
from agents.researcher import researcheragent
from agents.engineer import engineeragent
from agents.tester import testeragent
from agents.documenter import documenteragent
api_key = os.getenv("openai_api_key")
base_url = os.getenv("base_url", "https://api.openai.com/v1")
def build_team() -> dict:
"""组建 ai 工作团队"""
shared_kwargs = dict(
api_key=api_key,
base_url=base_url,
model="gpt-4o",
verbose=true
)
return {
"researcher": researcheragent(**shared_kwargs),
"engineer": engineeragent(**shared_kwargs),
"tester": testeragent(**shared_kwargs),
"documenter": documenteragent(**shared_kwargs),
}
async def run_competitive_analysis():
"""实战场景:自动生成竞品分析报告"""
team = build_team()
boss = bossagent(team=team, api_key=api_key, base_url=base_url, model="gpt-4o")
result = await boss.run(
"分析 rag(检索增强生成)领域的主流工具:langchain、llamaindex、dify,"
"对比它们的特点、优劣势和适用场景,给出选型建议,并用 python 写一个简单的对比评测脚本。"
)
# 输出结果
print("\n" + "="*60)
print("🎉 最终报告")
print("="*60)
print(result["summary"])
# 保存到文件
import json
with open("output/competitive_analysis.json", "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=false, indent=2)
print("\n✅ 完整结果已保存到 output/competitive_analysis.json")
async def run_code_generation():
"""实战场景:自动写代码 + 测试 + 文档"""
team = build_team()
boss = bossagent(team=team, api_key=api_key, base_url=base_url, model="gpt-4o")
result = await boss.run(
"用 python 实现一个高性能的内存缓存库,支持:lru/lfu/ttl 三种淘汰策略、"
"线程安全、最大容量限制、序列化存储,提供装饰器 api。"
)
print(result["summary"])
if __name__ == "__main__":
asyncio.run(run_competitive_analysis())
六、agent 通信与状态共享
# 实现一个简单的消息总线,让 agent 之间可以通信
import asyncio
from collections import defaultdict
class messagebus:
"""
agent 消息总线(发布-订阅模式)
允许 agent 在执行过程中互相通知
"""
def __init__(self):
self._subscribers: dict[str, list[asyncio.queue]] = defaultdict(list)
def subscribe(self, topic: str) -> asyncio.queue:
"""订阅某个话题"""
q = asyncio.queue()
self._subscribers[topic].append(q)
return q
async def publish(self, topic: str, message: dict):
"""发布消息到某个话题"""
for q in self._subscribers[topic]:
await q.put(message)
async def request_help(
self,
from_agent: str,
to_agent: str,
question: str
) -> str:
"""agent 向另一个 agent 请求帮助"""
await self.publish(f"help_request_{to_agent}", {
"from": from_agent,
"question": question
})
# 等待响应(超时 30 秒)
response_queue = self.subscribe(f"help_response_{from_agent}")
try:
response = await asyncio.wait_for(response_queue.get(), timeout=30)
return response.get("answer", "")
except asyncio.timeouterror:
return f"❌ {to_agent} 未在超时内响应"
# 在 agent 中使用消息总线
class collaborativeagent(baseagent):
def __init__(self, bus: messagebus, **kwargs):
super().__init__(**kwargs)
self.bus = bus
async def ask_colleague(self, agent_name: str, question: str) -> str:
"""向同事请教问题"""
self.log(f"向 {agent_name} 求助:{question[:50]}...")
answer = await self.bus.request_help(self.name, agent_name, question)
return answer
七、错误恢复与重试机制
import functools
def with_retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
"""任务执行失败自动重试的装饰器(指数退避)"""
def decorator(func):
@functools.wraps(func)
async def wrapper(self, task: task, *args, **kwargs):
last_error = none
for attempt in range(max_retries):
try:
return await func(self, task, *args, **kwargs)
except exception as e:
last_error = e
wait = delay * (backoff ** attempt)
self.log(f"⚠️ 第 {attempt+1} 次失败({e}),{wait:.1f}s 后重试...")
await asyncio.sleep(wait)
task.retries += 1
task.status = taskstatus.failed
task.error = str(last_error)
raise last_error
return wrapper
return decorator
# 在 agent 中使用
class robustengineeragent(engineeragent):
@with_retry(max_retries=3, delay=2.0)
async def execute(self, task: task, shared_context: dict) -> dict:
return await super().execute(task, shared_context)
八、性能优化:并行执行
# 使用信号量控制并发,避免 api 限流
import asyncio
from contextlib import asynccontextmanager
class concurrentbossagent(bossagent):
"""支持并行执行的 boss agent"""
def __init__(self, *args, max_concurrent: int = 3, **kwargs):
super().__init__(*args, **kwargs)
self.semaphore = asyncio.semaphore(max_concurrent) # 最多 3 个并发
async def _execute_task_with_limit(self, task: task) -> none:
async with self.semaphore: # 拿到信号量才执行
await self._execute_task(task)
async def run(self, user_request: str) -> dict:
"""并行执行不依赖彼此的任务"""
tasks = await self.plan(user_request)
# 按依赖关系分层
layers = self._topological_sort(tasks)
for layer in layers:
# 同一层的任务没有依赖,可以并行
print(f"⚡ 并行执行 {len(layer)} 个任务:{[t.name for t in layer]}")
await asyncio.gather(*[
self._execute_task_with_limit(t) for t in layer
])
return await self._summarize()
def _topological_sort(self, tasks: list[task]) -> list[list[task]]:
"""拓扑排序,返回可并行的任务层"""
task_map = {t.name: t for t in tasks}
layers = []
remaining = set(t.name for t in tasks)
while remaining:
# 找出当前没有未完成依赖的任务
layer = [
task_map[name] for name in remaining
if all(dep not in remaining for dep in task_map[name].depends_on)
]
if not layer:
break # 避免死循环
layers.append(layer)
remaining -= {t.name for t in layer}
return layers
九、web 监控面板(gradio)
# monitor.py
import gradio as gr
import asyncio
import json
from main import build_team, bossagent
task_log = []
def run_agents(user_request: str):
"""在 gradio 中运行多智能体系统"""
team = build_team()
boss = bossagent(team=team, api_key=os.getenv("openai_api_key"))
result = asyncio.run(boss.run(user_request))
task_log.extend(result["tasks"])
# 格式化任务执行结果
task_table = [[
t["id"], t["name"], t["assignee"],
t["status"], f"{t['elapsed_ms']:.0f}ms"
] for t in result["tasks"]]
return (
result["summary"],
task_table,
json.dumps(result["details"], ensure_ascii=false, indent=2)
)
with gr.blocks(title="🤖 多智能体监控面板") as demo:
gr.markdown("# 🤖 python 多智能体协作系统")
gr.markdown("输入任务,5 个 ai agent 自动分工完成,实时查看进度")
with gr.row():
user_input = gr.textbox(
label="📝 输入任务",
placeholder="例如:分析 langchain vs llamaindex,给出技术选型建议...",
lines=3
)
run_btn = gr.button("🚀 启动团队", variant="primary", size="lg")
summary_output = gr.markdown(label="📋 最终汇总")
with gr.accordion("📊 任务执行详情", open=false):
task_table = gr.dataframe(
headers=["任务id", "任务名", "负责agent", "状态", "耗时"],
label="任务执行记录"
)
with gr.accordion("🔍 原始输出", open=false):
raw_output = gr.json(label="详细结果")
gr.examples(
examples=[
["分析 fastapi vs django vs flask,给出 2026 年 python api 框架选型建议"],
["用 python 实现一个支持并发的下载器,带进度条和断点续传"],
["调研向量数据库 milvus/weaviate/chroma 的优劣势,并写一个性能对比脚本"],
],
inputs=user_input
)
run_btn.click(
fn=run_agents,
inputs=user_input,
outputs=[summary_output, task_table, raw_output]
)
demo.launch(server_name="0.0.0.0", server_port=7860)
十、效果对比:单 agent vs 多 agent
| 维度 | 单 agent | 多 agent 协作 |
|---|---|---|
| 输出质量 | 中等,容易混乱 | 高,专业分工 |
| 复杂任务 | 容易遗漏、错乱 | 拆解清晰,覆盖全面 |
| 执行速度 | 顺序,慢 | 并行,快 2-5x |
| 可扩展性 | 差,难以增加能力 | 好,加 agent 即可 |
| 可调试性 | 难,输出混在一起 | 易,每个 agent 独立可观测 |
| 适用场景 | 简单单步任务 | 复杂多步骤任务 |
总结
本文搭建的多智能体系统核心要点:
- baseagent 基类:统一的 llm 调用 + 工具注册 + 记忆管理
- 5 个专业 agent:研究员/工程师/测试/文档/boss 各司其职
- boss agent:任务规划 + 拓扑调度 + 结果汇总
- 并行执行:信号量控制,最大化利用 api 并发
- 错误恢复:指数退避重试,提高任务成功率
- 消息总线:agent 之间实时通信
- gradio 面板:可视化监控执行过程
以上就是python实现多智能体协作的实战教程的详细内容,更多关于python多智能体的资料请关注代码网其它相关文章!
发表评论