当前位置: 代码网 > it编程>前端脚本>Python > Python实现多智能体协作的实战教程

Python实现多智能体协作的实战教程

2026年04月24日 Python 我要评论
一个 agent 解决不了的问题,就组一个队。本文带你用 python 从零搭建多智能体协作系统,实现"ai 产品分析师 + 代码工程师 + 测试专家 + 文档撰写员 + 质检官"

一个 agent 解决不了的问题,就组一个队。本文带你用 python 从零搭建多智能体协作系统,实现"ai 产品分析师 + 代码工程师 + 测试专家 + 文档撰写员 + 质检官"全自动流水线。

为什么一个 agent 不够用?

用过 gpt 的人都有过这种经历:

你:帮我完整分析一个竞品,写一份技术方案,再给出完整代码实现,附上测试用例,最后写成报告……

gpt:好的!(给了一份残缺不全、越写越错的东西)

单个 agent 的问题在于:

  • 上下文有限:长任务超出 context window 就开始"失忆"
  • 角色模糊:既要分析又要写代码,什么都做但什么都不精
  • 无法并行:顺序执行,慢

多智能体协作的思路:像管理一个团队,每个 agent 只做一件专业的事,通过协作完成复杂任务。

一、多智能体架构设计

本文搭建的多智能体系统结构如下:

用户输入任务
     ↓
  boss agent(任务规划 + 调度)
     ↓
  ┌──────────────────────────────────┐
  │  任务队列(task queue)           │
  └──────────────────────────────────┘
     ↓              ↓              ↓
研究员 agent   工程师 agent   质检 agent
  (搜索/分析)   (写代码/方案)  (测试/评审)
     ↓              ↓
文档 agent    测试 agent
  (写报告)     (单元测试)
     ↓
最终输出(报告 / 代码 / 测试)

五位团队成员:

agent 角色职责核心能力
boss agent任务拆解 + 调度 + 汇总规划、协调、决策
研究员 agent信息搜集 + 竞品分析搜索、总结、报告
工程师 agent代码实现 + 方案设计编程、架构、调试
测试 agent单元测试 + bug 检测测试、验证、覆盖率
文档 agent技术文档 + readme写作、整理、规范

二、agent 基类封装

# agents/base.py
from abc import abc, abstractmethod
from dataclasses import dataclass, field
from typing import any, optional
from enum import enum
import asyncio
import time
import uuid
import json

class taskstatus(enum):
    pending   = "pending"
    running   = "running"
    done      = "done"
    failed    = "failed"
    cancelled = "cancelled"


@dataclass
class task:
    """一个可执行的任务单元"""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    description: str = ""
    assignee: str = ""          # 分配给哪个 agent
    depends_on: list[str] = field(default_factory=list)  # 依赖的任务 id
    status: taskstatus = taskstatus.pending
    result: any = none
    error: str = ""
    created_at: float = field(default_factory=time.time)
    started_at: float = 0.0
    finished_at: float = 0.0
    retries: int = 0
    max_retries: int = 3

    @property
    def elapsed_ms(self) -> float:
        if self.finished_at and self.started_at:
            return (self.finished_at - self.started_at) * 1000
        return 0.0

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "assignee": self.assignee,
            "status": self.status.value,
            "elapsed_ms": round(self.elapsed_ms, 1),
            "result_preview": str(self.result)[:200] if self.result else none
        }


@dataclass
class agentmemory:
    """agent 的短期工作记忆"""
    context: dict = field(default_factory=dict)    # 键值对上下文
    history: list[dict] = field(default_factory=list)  # 对话历史

    def remember(self, key: str, value: any):
        self.context[key] = value

    def recall(self, key: str, default=none) -> any:
        return self.context.get(key, default)

    def add_message(self, role: str, content: str):
        self.history.append({"role": role, "content": content})

    def clear_history(self):
        self.history.clear()


class baseagent(abc):
    """
    所有 agent 的抽象基类
    每个 agent 有:角色、工具、记忆、llm 后端
    """
    def __init__(
        self,
        name: str,
        role: str,
        model: str = "gpt-4o",
        api_key: str = none,
        base_url: str = "https://api.openai.com/v1",
        temperature: float = 0.7,
        max_tokens: int = 4096,
        verbose: bool = true
    ):
        self.name = name
        self.role = role
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.verbose = verbose
        self.memory = agentmemory()

        # 初始化 llm 客户端
        from openai import asyncopenai
        self.client = asyncopenai(
            api_key=api_key,
            base_url=base_url
        )

        self._tools: dict[str, callable] = {}
        self._register_tools()

    def _register_tools(self):
        """子类重写,注册本 agent 可用的工具"""
        pass

    def register_tool(self, name: str, func: callable):
        self._tools[name] = func

    def log(self, msg: str):
        if self.verbose:
            print(f"[{self.name}] {msg}")

    async def think(self, prompt: str, system_override: str = none) -> str:
        """调用 llm 进行思考,返回文本结果"""
        system = system_override or f"""你是 {self.name},{self.role}。
请认真完成分配给你的任务,输出要专业、详细、有条理。"""

        messages = [
            {"role": "system", "content": system},
            *self.memory.history[-6:],   # 保留最近 6 条历史(节省 token)
            {"role": "user", "content": prompt}
        ]

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
        reply = response.choices[0].message.content
        self.memory.add_message("assistant", reply)
        return reply

    @abstractmethod
    async def execute(self, task: task, shared_context: dict) -> any:
        """执行任务,子类必须实现"""
        pass

三、五大专业 agent 实现

3.1 研究员 agent(搜索 + 分析)

# agents/researcher.py
from .base import baseagent, task
import httpx

class researcheragent(baseagent):
    """研究员:负责信息搜集、竞品分析、数据整理"""

    def __init__(self, **kwargs):
        super().__init__(
            name="研究员小研",
            role="资深产品研究员,擅长竞品分析、市场调研和信息整理",
            temperature=0.3,   # 研究任务需要更准确,降低随机性
            **kwargs
        )

    def _register_tools(self):
        self.register_tool("web_search", self._web_search)
        self.register_tool("summarize_url", self._summarize_url)

    async def _web_search(self, query: str) -> str:
        """模拟网页搜索(实际接入 serper / tavily / duckduckgo)"""
        # 实际使用时替换为真实 api
        async with httpx.asyncclient() as client:
            # 示例:接入 serper api
            # response = await client.post(
            #     "https://google.serper.dev/search",
            #     headers={"x-api-key": serper_api_key},
            #     json={"q": query}
            # )
            # return str(response.json()["organic"][:3])
            return f"搜索结果({query}):[模拟数据,请替换为真实搜索 api]"

    async def _summarize_url(self, url: str) -> str:
        """抓取并总结网页内容"""
        async with httpx.asyncclient(timeout=10) as client:
            try:
                response = await client.get(url, follow_redirects=true)
                # 简化处理,实际应该用 beautifulsoup 解析
                text = response.text[:3000]
                return await self.think(f"请总结以下网页内容的核心信息:\n{text}")
            except exception as e:
                return f"抓取失败: {e}"

    async def execute(self, task: task, shared_context: dict) -> str:
        self.log(f"开始执行研究任务:{task.name}")
        task.started_at = __import__("time").time()

        # 1. 搜集信息
        search_result = await self._web_search(task.description)
        self.memory.remember("search_result", search_result)

        # 2. 深度分析
        analysis_prompt = f"""
你是专业研究员。请根据以下信息,对「{task.name}」进行深度分析:

搜索结果:
{search_result}

已有上下文:
{json.dumps(shared_context.get("background", {}), ensure_ascii=false, indent=2)}

请输出:
1. **核心发现**(3-5 条)
2. **数据支撑**(关键数字和事实)
3. **竞品对比**(表格形式)
4. **关键洞察**(对后续工作的启示)
"""
        result = await self.think(analysis_prompt)
        self.log(f"研究完成,输出 {len(result)} 字")
        return result

3.2 工程师 agent(写代码 + 方案)

# agents/engineer.py
from .base import baseagent, task
import subprocess, tempfile, os

class engineeragent(baseagent):
    """工程师:负责技术方案设计和代码实现"""

    def __init__(self, **kwargs):
        super().__init__(
            name="工程师小码",
            role="资深 python 全栈工程师,擅长架构设计、代码实现和性能优化",
            temperature=0.2,   # 代码任务要更确定,温度调低
            **kwargs
        )

    def _register_tools(self):
        self.register_tool("run_code", self._run_code)
        self.register_tool("check_syntax", self._check_syntax)

    async def _run_code(self, code: str, timeout: int = 10) -> str:
        """在沙箱中执行 python 代码"""
        with tempfile.namedtemporaryfile(mode="w", suffix=".py", delete=false) as f:
            f.write(code)
            f.flush()
            try:
                result = subprocess.run(
                    ["python", f.name],
                    capture_output=true,
                    text=true,
                    timeout=timeout
                )
                output = result.stdout or result.stderr
                return f"执行结果:\n{output[:2000]}"
            except subprocess.timeoutexpired:
                return "⚠️ 执行超时"
            except exception as e:
                return f"❌ 执行失败:{e}"
            finally:
                os.unlink(f.name)

    async def _check_syntax(self, code: str) -> str:
        """检查 python 代码语法"""
        try:
            import ast
            ast.parse(code)
            return "✅ 语法检查通过"
        except syntaxerror as e:
            return f"❌ 语法错误:{e}"

    def _extract_code(self, text: str) -> str:
        """从 llm 输出中提取代码块"""
        import re
        pattern = r"```(?:python)?\n(.*?)```"
        matches = re.findall(pattern, text, re.dotall)
        return matches[0].strip() if matches else text

    async def execute(self, task: task, shared_context: dict) -> dict:
        self.log(f"开始开发:{task.name}")
        task.started_at = __import__("time").time()

        research = shared_context.get("research_result", "")

        # 1. 方案设计
        design_prompt = f"""
任务:{task.description}

参考研究结论:
{research[:1500] if research else "无"}

请先输出:
## 技术方案
1. 架构选型(为什么这么选)
2. 核心模块拆解
3. 关键技术点
4. 预期接口设计

然后输出完整的 python 实现代码。
"""
        design_output = await self.think(design_prompt)

        # 2. 提取并验证代码
        code = self._extract_code(design_output)
        syntax_check = await self._check_syntax(code)

        # 3. 如果语法有问题,自动修复
        if "❌" in syntax_check:
            fix_prompt = f"以下代码有语法错误,请修复:\n{syntax_check}\n\n```python\n[code]\n```"
            fixed = await self.think(fix_prompt)
            code = self._extract_code(fixed)

        # 4. 运行测试
        run_result = await self._run_code(code)
        self.log(f"代码执行:{run_result[:100]}")

        return {
            "design": design_output,
            "code": code,
            "syntax_check": syntax_check,
            "run_result": run_result
        }

3.3 测试 agent + 文档 agent

# agents/tester.py
class testeragent(baseagent):
    """测试专家:生成单元测试,检测 bug"""

    def __init__(self, **kwargs):
        super().__init__(
            name="测试专家小测",
            role="资深 qa 工程师,擅长编写 pytest 单元测试和发现潜在 bug",
            temperature=0.2,
            **kwargs
        )

    async def execute(self, task: task, shared_context: dict) -> dict:
        self.log("开始生成测试...")
        code = shared_context.get("engineer_result", {}).get("code", "")

        test_prompt = f"""
请为以下 python 代码编写全面的 pytest 测试套件:

```python
[code]

要求:

  1. 覆盖所有公开函数/方法
  2. 包含:正常用例、边界条件、异常用例
  3. 使用 pytest.mark.parametrize 参数化测试
  4. 添加 mock(如果涉及外部依赖)
  5. 目标覆盖率:> 80%

输出完整的可运行测试文件。

test_code = await self.think(test_prompt)
    # 分析测试覆盖情况
    analysis_prompt = f"分析以上测试的覆盖情况,列出可能遗漏的测试用例:"
    coverage_analysis = await self.think(analysis_prompt)
    return {
        "test_code": test_code,
        "coverage_analysis": coverage_analysis
    }

agents/documenter.py

class documenteragent(baseagent):
'文档撰写员:生成技术文档和 readme'
def __init__(self, **kwargs):
    super().__init__(
        name="文档小写",
        role="技术文档专家,擅长撰写清晰易读的 readme、api 文档和技术博客",
        temperature=0.6,   # 文档写作可以稍微有创意
        **kwargs
    )
async def execute(self, task: task, shared_context: dict) -> str:
    self.log("开始撰写文档...")
    engineer_result = shared_context.get("engineer_result", {})
    tester_result = shared_context.get("tester_result", {})
    research_result = shared_context.get("research_result", "")
    doc_prompt = f"""

请为以下项目生成完整的 readme.md:

【项目背景】
{research_result[:500] if research_result else task.description}
【技术方案】
{engineer_result.get(“design”, “”)[:800]}
【代码实现】
{engineer_result.get("code", "")[:1000]}
【测试情况】
{tester_result.get(“coverage_analysis”, “”)[:300]}

readme 需包含:

  • 项目简介(一句话)
  • 功能特性(带 emoji 的列表)
  • 快速开始(安装 + 使用示例)
  • api 参考(关键函数说明)
  • 项目结构
  • 贡献指南
  • license

风格:专业但不枯燥,对新手友好。

四、任务调度器:boss agent

agents/boss.py

import asyncio
from dataclasses import dataclass, field
from .base import baseagent, task, taskstatus
class bossagent(baseagent):
    """
    boss agent:整体任务规划 + 调度 + 结果汇总
    相当于项目经理,负责把大任务拆成小任务,分配给合适的 agent
    """
    def __init__(self, team: dict[str, baseagent], **kwargs):
        super().__init__(
            name="boss 总监",
            role="资深项目总监,擅长任务规划、团队协调和结果整合",
            temperature=0.5,
            **kwargs
        )
        self.team = team                  # {agent_name: agent_instance}
        self.task_queue: list[task] = []  # 任务队列
        self.shared_context: dict = {}    # 团队共享上下文
    async def plan(self, user_request: str) -> list[task]:
        """根据用户需求,拆解任务并分配给团队成员"""
        team_info = "\n".join([
            f"- {name}: {agent.role}"
            for name, agent in self.team.items()
        ])
        plan_prompt = f"""
用户需求:{user_request}
你的团队成员:
{team_info}
请把这个需求拆解成 3-6 个可执行的子任务,输出 json 格式:
[
  {{
    "name": "任务名称",
    "description": "详细描述(给 agent 的明确指令)",
    "assignee": "agent 名称(researcher/engineer/tester/documenter)",
    "depends_on": []   // 依赖的其他任务名称
  }}
]
注意:
1. 每个任务职责单一,可独立执行
2. 正确设置依赖关系(如写代码必须等研究完成)
3. 任务描述要具体,让 agent 知道该做什么
"""
        plan_output = await self.think(plan_prompt)
        # 提取 json 任务列表
        import re, json
        match = re.search(r"\[.*?\]", plan_output, re.dotall)
        if match:
            try:
                tasks_data = json.loads(match.group())
                return [task(**t) for t in tasks_data]
            except:
                pass
        # fallback:默认任务流
        return self._default_task_flow(user_request)
    def _default_task_flow(self, request: str) -> list[task]:
        return [
            task(name="需求调研", description=f"调研:{request}", assignee="researcher"),
            task(name="代码实现", description=f"实现:{request}", assignee="engineer", depends_on=["需求调研"]),
            task(name="单元测试", description="为以上代码生成测试", assignee="tester", depends_on=["代码实现"]),
            task(name="文档生成", description="生成完整文档", assignee="documenter", depends_on=["代码实现", "单元测试"]),
        ]
    async def _execute_task(self, task: task) -> none:
        """执行单个任务"""
        agent_map = {
            "researcher":  self.team.get("researcher"),
            "engineer":    self.team.get("engineer"),
            "tester":      self.team.get("tester"),
            "documenter":  self.team.get("documenter"),
        }
        agent = agent_map.get(task.assignee)
        if not agent:
            task.status = taskstatus.failed
            task.error = f"找不到 agent:{task.assignee}"
            return
        task.status = taskstatus.running
        self.log(f"▶ 分派任务「{task.name}」给 {agent.name}")
        try:
            result = await agent.execute(task, self.shared_context)
            task.result = result
            task.status = taskstatus.done
            task.finished_at = __import__("time").time()
            # 把结果存入共享上下文
            self.shared_context[f"{task.assignee}_result"] = result
            self.log(f"✅ 任务「{task.name}」完成({task.elapsed_ms:.0f}ms)")
        except exception as e:
            task.status = taskstatus.failed
            task.error = str(e)
            self.log(f"❌ 任务「{task.name}」失败:{e}")
    async def run(self, user_request: str) -> dict:
        """主流程:规划 → 调度 → 执行 → 汇总"""
        print(f"\n{'='*60}")
        print(f"🎯 用户需求:{user_request}")
        print(f"{'='*60}\n")
        # 1. 任务规划
        self.log("开始任务规划...")
        tasks = await self.plan(user_request)
        self.shared_context["user_request"] = user_request
        self.task_queue = tasks
        print(f"📋 任务计划({len(tasks)} 个子任务):")
        for i, t in enumerate(tasks, 1):
            deps = f"(依赖:{', '.join(t.depends_on)})" if t.depends_on else ""
            print(f"  {i}. [{t.assignee}] {t.name} {deps}")
        print()
        # 2. 拓扑排序 + 执行(支持并行)
        completed = set()
        while len(completed) < len(tasks):
            # 找出当前可执行的任务(依赖已完成)
            ready = [
                t for t in tasks
                if t.status == taskstatus.pending
                and all(dep in completed for dep in t.depends_on)
            ]
            if not ready:
                # 检查是否死锁
                pending = [t for t in tasks if t.status == taskstatus.pending]
                if pending:
                    self.log("⚠️ 检测到任务依赖死锁,强制执行剩余任务")
                    ready = pending[:1]
                else:
                    break
            # 并行执行所有可执行任务
            await asyncio.gather(*[self._execute_task(t) for t in ready])
            completed.update(t.name for t in ready if t.status == taskstatus.done)
        # 3. 汇总结果
        return await self._summarize()
    async def _summarize(self) -> dict:
        """汇总所有 agent 的工作成果"""
        self.log("汇总所有成果...")
        results = {
            name: ctx
            for name, ctx in self.shared_context.items()
            if name.endswith("_result")
        }
        summary_prompt = f"""
团队已完成以下工作,请综合汇总成一份清晰的最终报告:
{__import__("json").dumps(
    {k: str(v)[:500] for k, v in results.items()},
    ensure_ascii=false, indent=2
)}
输出格式:
## 项目总结
- 完成情况
- 核心交付物
- 关键发现
- 建议后续行动
"""
        summary = await self.think(summary_prompt)
        return {
            "summary": summary,
            "details": results,
            "tasks": [t.to_dict() for t in self.task_queue]
        }

五、实战:自动生成竞品分析报告

# main.py
import asyncio
import os
from agents.boss import bossagent
from agents.researcher import researcheragent
from agents.engineer import engineeragent
from agents.tester import testeragent
from agents.documenter import documenteragent

api_key = os.getenv("openai_api_key")
base_url = os.getenv("base_url", "https://api.openai.com/v1")

def build_team() -> dict:
    """组建 ai 工作团队"""
    shared_kwargs = dict(
        api_key=api_key,
        base_url=base_url,
        model="gpt-4o",
        verbose=true
    )
    return {
        "researcher":  researcheragent(**shared_kwargs),
        "engineer":    engineeragent(**shared_kwargs),
        "tester":      testeragent(**shared_kwargs),
        "documenter":  documenteragent(**shared_kwargs),
    }


async def run_competitive_analysis():
    """实战场景:自动生成竞品分析报告"""
    team = build_team()
    boss = bossagent(team=team, api_key=api_key, base_url=base_url, model="gpt-4o")

    result = await boss.run(
        "分析 rag(检索增强生成)领域的主流工具:langchain、llamaindex、dify,"
        "对比它们的特点、优劣势和适用场景,给出选型建议,并用 python 写一个简单的对比评测脚本。"
    )

    # 输出结果
    print("\n" + "="*60)
    print("🎉 最终报告")
    print("="*60)
    print(result["summary"])

    # 保存到文件
    import json
    with open("output/competitive_analysis.json", "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=false, indent=2)
    print("\n✅ 完整结果已保存到 output/competitive_analysis.json")


async def run_code_generation():
    """实战场景:自动写代码 + 测试 + 文档"""
    team = build_team()
    boss = bossagent(team=team, api_key=api_key, base_url=base_url, model="gpt-4o")

    result = await boss.run(
        "用 python 实现一个高性能的内存缓存库,支持:lru/lfu/ttl 三种淘汰策略、"
        "线程安全、最大容量限制、序列化存储,提供装饰器 api。"
    )

    print(result["summary"])


if __name__ == "__main__":
    asyncio.run(run_competitive_analysis())

六、agent 通信与状态共享

# 实现一个简单的消息总线,让 agent 之间可以通信
import asyncio
from collections import defaultdict

class messagebus:
    """
    agent 消息总线(发布-订阅模式)
    允许 agent 在执行过程中互相通知
    """
    def __init__(self):
        self._subscribers: dict[str, list[asyncio.queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.queue:
        """订阅某个话题"""
        q = asyncio.queue()
        self._subscribers[topic].append(q)
        return q

    async def publish(self, topic: str, message: dict):
        """发布消息到某个话题"""
        for q in self._subscribers[topic]:
            await q.put(message)

    async def request_help(
        self,
        from_agent: str,
        to_agent: str,
        question: str
    ) -> str:
        """agent 向另一个 agent 请求帮助"""
        await self.publish(f"help_request_{to_agent}", {
            "from": from_agent,
            "question": question
        })
        # 等待响应(超时 30 秒)
        response_queue = self.subscribe(f"help_response_{from_agent}")
        try:
            response = await asyncio.wait_for(response_queue.get(), timeout=30)
            return response.get("answer", "")
        except asyncio.timeouterror:
            return f"❌ {to_agent} 未在超时内响应"


# 在 agent 中使用消息总线
class collaborativeagent(baseagent):
    def __init__(self, bus: messagebus, **kwargs):
        super().__init__(**kwargs)
        self.bus = bus

    async def ask_colleague(self, agent_name: str, question: str) -> str:
        """向同事请教问题"""
        self.log(f"向 {agent_name} 求助:{question[:50]}...")
        answer = await self.bus.request_help(self.name, agent_name, question)
        return answer

七、错误恢复与重试机制

import functools

def with_retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """任务执行失败自动重试的装饰器(指数退避)"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, task: task, *args, **kwargs):
            last_error = none
            for attempt in range(max_retries):
                try:
                    return await func(self, task, *args, **kwargs)
                except exception as e:
                    last_error = e
                    wait = delay * (backoff ** attempt)
                    self.log(f"⚠️ 第 {attempt+1} 次失败({e}),{wait:.1f}s 后重试...")
                    await asyncio.sleep(wait)
                    task.retries += 1
            task.status = taskstatus.failed
            task.error = str(last_error)
            raise last_error
        return wrapper
    return decorator


# 在 agent 中使用
class robustengineeragent(engineeragent):
    @with_retry(max_retries=3, delay=2.0)
    async def execute(self, task: task, shared_context: dict) -> dict:
        return await super().execute(task, shared_context)

八、性能优化:并行执行

# 使用信号量控制并发,避免 api 限流
import asyncio
from contextlib import asynccontextmanager

class concurrentbossagent(bossagent):
    """支持并行执行的 boss agent"""

    def __init__(self, *args, max_concurrent: int = 3, **kwargs):
        super().__init__(*args, **kwargs)
        self.semaphore = asyncio.semaphore(max_concurrent)  # 最多 3 个并发

    async def _execute_task_with_limit(self, task: task) -> none:
        async with self.semaphore:  # 拿到信号量才执行
            await self._execute_task(task)

    async def run(self, user_request: str) -> dict:
        """并行执行不依赖彼此的任务"""
        tasks = await self.plan(user_request)

        # 按依赖关系分层
        layers = self._topological_sort(tasks)

        for layer in layers:
            # 同一层的任务没有依赖,可以并行
            print(f"⚡ 并行执行 {len(layer)} 个任务:{[t.name for t in layer]}")
            await asyncio.gather(*[
                self._execute_task_with_limit(t) for t in layer
            ])

        return await self._summarize()

    def _topological_sort(self, tasks: list[task]) -> list[list[task]]:
        """拓扑排序,返回可并行的任务层"""
        task_map = {t.name: t for t in tasks}
        layers = []
        remaining = set(t.name for t in tasks)

        while remaining:
            # 找出当前没有未完成依赖的任务
            layer = [
                task_map[name] for name in remaining
                if all(dep not in remaining for dep in task_map[name].depends_on)
            ]
            if not layer:
                break  # 避免死循环
            layers.append(layer)
            remaining -= {t.name for t in layer}

        return layers

九、web 监控面板(gradio)

# monitor.py
import gradio as gr
import asyncio
import json
from main import build_team, bossagent

task_log = []

def run_agents(user_request: str):
    """在 gradio 中运行多智能体系统"""
    team = build_team()
    boss = bossagent(team=team, api_key=os.getenv("openai_api_key"))

    result = asyncio.run(boss.run(user_request))
    task_log.extend(result["tasks"])

    # 格式化任务执行结果
    task_table = [[
        t["id"], t["name"], t["assignee"],
        t["status"], f"{t['elapsed_ms']:.0f}ms"
    ] for t in result["tasks"]]

    return (
        result["summary"],
        task_table,
        json.dumps(result["details"], ensure_ascii=false, indent=2)
    )


with gr.blocks(title="🤖 多智能体监控面板") as demo:
    gr.markdown("# 🤖 python 多智能体协作系统")
    gr.markdown("输入任务,5 个 ai agent 自动分工完成,实时查看进度")

    with gr.row():
        user_input = gr.textbox(
            label="📝 输入任务",
            placeholder="例如:分析 langchain vs llamaindex,给出技术选型建议...",
            lines=3
        )
        run_btn = gr.button("🚀 启动团队", variant="primary", size="lg")

    summary_output = gr.markdown(label="📋 最终汇总")

    with gr.accordion("📊 任务执行详情", open=false):
        task_table = gr.dataframe(
            headers=["任务id", "任务名", "负责agent", "状态", "耗时"],
            label="任务执行记录"
        )

    with gr.accordion("🔍 原始输出", open=false):
        raw_output = gr.json(label="详细结果")

    gr.examples(
        examples=[
            ["分析 fastapi vs django vs flask,给出 2026 年 python api 框架选型建议"],
            ["用 python 实现一个支持并发的下载器,带进度条和断点续传"],
            ["调研向量数据库 milvus/weaviate/chroma 的优劣势,并写一个性能对比脚本"],
        ],
        inputs=user_input
    )

    run_btn.click(
        fn=run_agents,
        inputs=user_input,
        outputs=[summary_output, task_table, raw_output]
    )

demo.launch(server_name="0.0.0.0", server_port=7860)

十、效果对比:单 agent vs 多 agent

维度单 agent多 agent 协作
输出质量中等,容易混乱高,专业分工
复杂任务容易遗漏、错乱拆解清晰,覆盖全面
执行速度顺序,慢并行,快 2-5x
可扩展性差,难以增加能力好,加 agent 即可
可调试性难,输出混在一起易,每个 agent 独立可观测
适用场景简单单步任务复杂多步骤任务

总结

本文搭建的多智能体系统核心要点:

  • baseagent 基类:统一的 llm 调用 + 工具注册 + 记忆管理
  • 5 个专业 agent:研究员/工程师/测试/文档/boss 各司其职
  • boss agent:任务规划 + 拓扑调度 + 结果汇总
  • 并行执行:信号量控制,最大化利用 api 并发
  • 错误恢复:指数退避重试,提高任务成功率
  • 消息总线:agent 之间实时通信
  • gradio 面板:可视化监控执行过程

以上就是python实现多智能体协作的实战教程的详细内容,更多关于python多智能体的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com