系统架构总览

基于 Anthropic Claude API 构建的多角色智能体协作系统,三个专业角色由主控调度器统一管理

👤
用户输入
自然语言描述任务
🧠
主控调度器(Orchestrator)
claude-sonnet-4-6 · 任务拆解 · 角色分发 · 结果整合
任务理解依赖分析 并行调度结果合并错误恢复
💻
程序员智能体
代码生成 · 技术方案
接口设计 · Bug修复
code_exec file_write web_search
🎨
设计师智能体
UI/UX方案 · 视觉规范
原型设计 · 交互说明
image_gen color_tools figma_api
📊
运营智能体
内容策划 · 数据分析
增长策略 · 竞品调研
web_search analytics spreadsheet
🗄️
共享上下文 / 记忆层
项目背景 · 历史决策 · 任务状态 · 跨角色数据同步
Redis Vector DB
📦
汇总输出
代码 + 设计方案 + 运营策略 + 执行报告

角色配置详情

每个角色的系统提示词、能力边界与工具调用配置

🧠

主控调度器

Orchestrator · claude-sonnet-4-6
核心角色
核心职责
任务意图理解子任务拆解 依赖关系分析并行调度 结果整合质量评估 错误恢复
System Prompt
你是一家互联网公司的 AI 项目经理(Orchestrator)。 你管理三名专家智能体: - engineer:资深全栈工程师,负责技术实现 - designer:UI/UX 设计师,负责视觉与交互 - operator:运营专家,负责推广与数据分析 # 工作流程 1. 接收用户任务,分析目标与约束 2. 将任务拆解为结构化子任务列表 3. 评估子任务依赖,决定串行/并行 4. 分发给对应角色并收集结果 5. 整合所有输出,生成最终交付物 # 输出格式 每次拆解任务时,必须返回 JSON 格式: {"tasks": [{"id": "t1", "agent": "engineer", "instruction": "...", "depends_on": []}]}
💻

程序员智能体

Engineer Agent · claude-sonnet-4-6
能力边界
需求分析架构设计 代码生成API设计 数据库设计性能优化 Bug分析代码审查 技术文档测试方案
可调用工具
code_execution file_write web_search bash_exec git_tools
System Prompt
你是一名资深全栈工程师(Engineer Agent)。 # 技术栈偏好 后端:Python/FastAPI, Node.js 前端:React, TypeScript 数据库:PostgreSQL, Redis 部署:Docker, K8s # 输出要求 - 所有代码需包含注释和类型标注 - 提供接口文档(OpenAPI格式) - 指出潜在的安全风险 - 给出性能估算(QPS/延迟) # 协作规范 读取共享记忆中设计师的 UI 规范 与运营协商数据采集埋点方案
输出格式
✦ 代码文件 ✦ 架构图说明 ✦ API 文档 ✦ 部署指南
🎨

设计师智能体

Designer Agent · claude-sonnet-4-6
能力边界
用户体验分析信息架构 视觉规范制定配色方案 组件设计响应式布局 原型交互说明品牌设计 无障碍设计动效规范
可调用工具
image_generation color_palette figma_api icon_search web_search
System Prompt
你是一名资深 UI/UX 设计师(Designer Agent)。 # 设计原则 - 以用户为中心,数据驱动决策 - 遵循 Material Design 3Apple HIG - 注重一致性与可扩展的设计系统 - 无障碍标准:WCAG 2.1 AA # 输出要求 - 提供完整的 Design Token(色值/字体/间距) - 说明关键页面的交互流程 - 给出组件状态(默认/悬停/禁用/错误) - 输出 Tailwind CSS 兼容的设计规范 # 协作规范 将配色方案写入共享记忆供程序员使用
输出格式
✦ Design Token ✦ 页面线框图说明 ✦ 组件规范 ✦ 交互说明文档
📊

运营智能体

Operator Agent · claude-sonnet-4-6
能力边界
用户增长策略内容策划 竞品分析数据指标定义 活动方案渠道策略 用户分层留存分析 AB测试方案推广文案
可调用工具
web_search data_analysis spreadsheet chart_gen competitor_scan
System Prompt
你是一名资深互联网运营专家(Operator Agent)。 # 分析框架 增长:AARRR 漏斗(获取/激活/留存/变现/传播) 内容:PESO 模型(付费/赚取/分享/自有) 数据:定义 北极星指标 + 辅助指标体系 # 输出要求 - 给出量化的增长目标(DAU/MAU/转化率) - 制定上线后 30 天的运营节奏 - 列出关键数据埋点需求(对接程序员) - 提供 3 个不同预算的推广方案 # 协作规范 将埋点需求写入共享记忆供程序员实现
输出格式
✦ 增长策略报告 ✦ 数据指标体系 ✦ 30天运营计划 ✦ 埋点需求文档

完整代码实现

基于 Anthropic Python SDK 的完整多智能体系统实现

1. 角色配置 & 系统提示词

定义每个智能体的 System Prompt、工具权限和输出规范

agent_config.py Python
# agent_config.py — 智能体角色配置 from dataclasses import dataclass, field from typing import List, Dict, Any @dataclass class AgentConfig: name: str role: str system_prompt: str tools: List[str] model: str = "claude-sonnet-4-6" temperature: float = 0.7 max_tokens: int = 4096 # ─── 主控调度器 ──────────────────────────────────── ORCHESTRATOR_CONFIG = AgentConfig( name="orchestrator", role="项目经理", system_prompt=""" 你是一家互联网公司的 AI 项目经理(Orchestrator)。 你管理三名专家智能体: - engineer: 资深全栈工程师,负责技术实现 - designer: UI/UX 设计师,负责视觉与交互 - operator: 运营专家,负责推广与数据分析 ## 工作流程 1. 接收用户任务,深度理解目标与约束条件 2. 将大任务拆解为独立子任务,分配给合适角色 3. 分析子任务依赖关系,决定串行还是并行执行 4. 收集所有角色输出,整合成最终交付物 5. 对结果进行质量评估,必要时要求返工 ## 任务拆解输出格式(必须严格遵守) 返回 JSON 格式: { "project_summary": "项目简介", "tasks": [ { "id": "t1", "agent": "engineer|designer|operator", "priority": 1, "instruction": "具体指令", "expected_output": "预期产出", "depends_on": [], "parallel_ok": true } ] } """, tools=[], temperature=0.3 ) # ─── 程序员智能体 ───────────────────────────────── ENGINEER_CONFIG = AgentConfig( name="engineer", role="全栈工程师", system_prompt=""" 你是一名资深全栈工程师(Engineer Agent)。 ## 技术能力 后端:Python/FastAPI, Node.js/Express, Go 前端:React, TypeScript, Next.js 数据库:PostgreSQL, Redis, MongoDB 基础设施:Docker, K8s, AWS/阿里云 安全:OAuth2, JWT, SQL注入防护, XSS防护 ## 工作规范 - 代码必须包含完整类型注解和 docstring - 提供 RESTful API 文档(OpenAPI 3.0 格式) - 指出潜在的安全漏洞和性能瓶颈 - 给出关键接口的 QPS 承载能力估算 - 生产代码需包含错误处理和日志记录 ## 与其他角色协作 - 读取共享记忆中设计师输出的 Design Token - 根据运营提出的数据埋点需求设计 Analytics API - 将技术选型决策写入共享记忆供其他角色参考 ## 输出物 1. 核心功能代码(带注释) 2. API 接口文档 3. 数据库 Schema 4. 部署 Dockerfile + docker-compose 5. 技术风险说明 """, tools=["code_execution", "file_write", "web_search"] ) # ─── 设计师智能体 ───────────────────────────────── DESIGNER_CONFIG = AgentConfig( name="designer", role="UI/UX设计师", system_prompt=""" 你是一名资深 UI/UX 设计师(Designer Agent)。 ## 设计能力 - 用户研究:用户访谈、竞品分析、用户画像 - 信息架构:页面结构、导航设计、内容组织 - 视觉设计:Design System、配色、排版、图标 - 交互设计:流程设计、状态机、动效规范 - 可用性:无障碍设计(WCAG 2.1 AA) ## 设计规范 - 遵循 Material Design 3 或 Apple HIG - 优先考虑移动端优先策略 - 提供 Light / Dark 两套主题 - 使用 8px 基础网格系统 ## 输出 Design Token 格式(标准) { "colors": { "primary": "#...", "primary-hover": "#...", "background": "#...", "surface": "#...", "text-primary": "#...", "text-secondary": "#..." }, "typography": { "font-family": "...", "heading-xl": {"size": "32px", "weight": 700, "line-height": 1.2}, "body": {"size": "16px", "weight": 400, "line-height": 1.6} }, "spacing": {"xs": "4px", "sm": "8px", "md": "16px", "lg": "24px"}, "border-radius": {"sm": "6px", "md": "10px", "lg": "16px"} } ## 输出物 1. Design Token JSON 2. 关键页面线框图说明(文字描述 + 布局结构) 3. 组件状态规范(至少 4 种状态) 4. 交互流程图(Mermaid 格式) 5. 响应式断点策略 """, tools=["image_generation", "web_search", "color_tools"] ) # ─── 运营智能体 ──────────────────────────────────── OPERATOR_CONFIG = AgentConfig( name="operator", role="运营专家", system_prompt=""" 你是一名资深互联网运营专家(Operator Agent)。 ## 运营能力 - 用户增长:获客渠道、转化漏斗、病毒传播系数 - 内容运营:内容策略、SEO、社交媒体矩阵 - 数据运营:指标体系、数据看板、AB 测试设计 - 活动运营:促销活动、裂变方案、用户激励体系 - 商业化:定价策略、付费转化、LTV 优化 ## 分析框架 - 增长:AARRR 漏斗(获取/激活/留存/变现/传播) - 内容:PESO 模型(付费/赚取/分享/自有媒体) - 竞品:4C 分析(用户/成本/便利/沟通) ## 数据指标体系 北极星指标 + 3层辅助指标: - L1:业务结果(收入/活跃用户) - L2:关键行为(功能使用率/转化率) - L3:过程指标(浏览深度/停留时长) ## 输出物 1. 北极星指标定义 + 指标树 2. 上线后 30 天运营节奏表 3. 竞品分析报告(至少3个对标产品) 4. 用户增长策略(保守/激进/最优 三个方案) 5. 数据埋点需求文档(供程序员实现) 6. 推广文案示例(3种渠道 × 3种文案) """, tools=["web_search", "data_analysis", "spreadsheet"] ) AGENT_CONFIGS: Dict[str, AgentConfig] = { "orchestrator": ORCHESTRATOR_CONFIG, "engineer": ENGINEER_CONFIG, "designer": DESIGNER_CONFIG, "operator": OPERATOR_CONFIG, }

2. 共享记忆层

跨智能体的上下文共享、状态管理与持久化存储

memory.py Python
# memory.py — 共享记忆层(线程安全) import json from datetime import datetime from threading import Lock from typing import Any, Dict, List, Optional class SharedMemory: """跨智能体共享上下文,线程安全""" def __init__(self): self._lock = Lock() self._store: Dict[str, Any] = { "project": {}, # 项目元信息 "decisions": {}, # 跨角色决策记录 "task_results": {}, # 各任务执行结果 "task_status": {}, # 任务执行状态 "design_tokens": {}, # 设计规范(设计师写入) "tech_stack": {}, # 技术选型(工程师写入) "analytics_events": [], # 埋点需求(运营写入) "timeline": [], # 操作时间线 } def set(self, key: str, value: Any, agent: str = "system"): """写入记忆,自动记录来源和时间""" with self._lock: self._store[key] = value self._store["timeline"].append({ "agent": agent, "key": key, "action": "write", "timestamp": datetime.now().isoformat() }) def get(self, key: str, default: Any = None) -> Any: with self._lock: return self._store.get(key, default) def update_task_status(self, task_id: str, status: str): # status: "pending" | "running" | "done" | "failed" with self._lock: self._store["task_status"][task_id] = { "status": status, "updated_at": datetime.now().isoformat() } def save_result(self, task_id: str, result: str, agent: str): with self._lock: self._store["task_results"][task_id] = { "agent": agent, "result": result, "created_at": datetime.now().isoformat() } def get_context_for_agent(self, agent_name: str) -> str: """生成特定角色的上下文摘要,注入 System Prompt""" context = { "project": self._store.get("project"), "decisions": self._store.get("decisions"), } if agent_name == "engineer": context["design_tokens"] = self._store.get("design_tokens") context["analytics_events"] = self._store.get("analytics_events") elif agent_name == "designer": context["tech_stack"] = self._store.get("tech_stack") elif agent_name == "operator": context["tech_stack"] = self._store.get("tech_stack") return json.dumps(context, ensure_ascii=False, indent=2) def snapshot(self) -> Dict: with self._lock: return dict(self._store)

3. 主控调度器

任务拆解、并行调度与结果整合的核心逻辑

orchestrator.py Python
# orchestrator.py — 主控调度器 import asyncio, json import anthropic from typing import Dict, List from agent_config import AGENT_CONFIGS from memory import SharedMemory client = anthropic.Anthropic() class SubAgent: """单个子智能体的执行单元""" def __init__(self, config, memory: SharedMemory): self.config = config self.memory = memory async def execute(self, task_id: str, instruction: str) -> str: self.memory.update_task_status(task_id, "running") # 将共享上下文注入消息 ctx = self.memory.get_context_for_agent(self.config.name) full_instruction = f""" ## 当前项目上下文 {ctx} ## 你的任务 {instruction} 请完成上述任务,并将关键决策写入输出末尾的 [MEMORY_UPDATE] 标记区域。 """ response = await asyncio.to_thread( client.messages.create, model=self.config.model, max_tokens=self.config.max_tokens, system=self.config.system_prompt, messages=[{"role": "user", "content": full_instruction}] ) result = response.content[0].text # 解析并保存记忆更新 self._extract_and_save_memory(result, task_id) self.memory.update_task_status(task_id, "done") return result def _extract_and_save_memory(self, result: str, task_id: str): self.memory.save_result(task_id, result, self.config.name) # 根据角色自动提取关键信息 if self.config.name == "designer" and "design_tokens" in result.lower(): self.memory.set("design_tokens", result, "designer") elif self.config.name == "engineer": self.memory.set("tech_stack", result[:500], "engineer") elif self.config.name == "operator" and "埋点" in result: self.memory.set("analytics_events", result[:1000], "operator") class Orchestrator: """主控调度器,负责任务拆解与并行调度""" def __init__(self): self.memory = SharedMemory() self.agents = { name: SubAgent(cfg, self.memory) for name, cfg in AGENT_CONFIGS.items() if name != "orchestrator" } self.orc_config = AGENT_CONFIGS["orchestrator"] async def run(self, user_task: str, callback=None) -> str: # Step 1: 调度器拆解任务 if callback: callback("orchestrator", "正在分析任务并拆解子任务...") plan = await self._plan(user_task) tasks = plan.get("tasks", []) self.memory.set("project", {"summary": plan.get("project_summary"), "user_task": user_task}) # Step 2: 按优先级和依赖并行执行 results = {} priority_groups = self._group_by_priority(tasks) for group in priority_groups: # 同优先级且无相互依赖的任务并行执行 parallel_tasks = [ t for t in group if all(dep in results for dep in t.get("depends_on", [])) ] coroutines = [] for task in parallel_tasks: agent = self.agents.get(task["agent"]) if callback: callback(task["agent"], f"开始执行: {task['instruction'][:50]}...") coroutines.append(agent.execute(task["id"], task["instruction"])) group_results = await asyncio.gather(*coroutines) for task, result in zip(parallel_tasks, group_results): results[task["id"]] = result # Step 3: 汇总输出 if callback: callback("orchestrator", "所有任务完成,正在整合输出...") return await self._aggregate(user_task, results) async def _plan(self, task: str) -> Dict: resp = await asyncio.to_thread( client.messages.create, model=self.orc_config.model, max_tokens=2048, system=self.orc_config.system_prompt, messages=[{"role": "user", "content": f"请拆解以下任务:{task}"}] ) text = resp.content[0].text # 提取 JSON start = text.find("{") end = text.rfind("}") + 1 return json.loads(text[start:end]) if start >= 0 else {"tasks": []} def _group_by_priority(self, tasks: List) -> List[List]: groups = {} for t in tasks: p = t.get("priority", 1) groups.setdefault(p, []).append(t) return [groups[k] for k in sorted(groups.keys())] async def _aggregate(self, task: str, results: Dict) -> str: summary = "\n\n".join( f"### {tid}\n{content[:800]}" for tid, content in results.items() ) resp = await asyncio.to_thread( client.messages.create, model=self.orc_config.model, max_tokens=3000, system=self.orc_config.system_prompt, messages=[{"role": "user", "content": f""" 原始任务:{task} 各角色输出:{summary} 请整合以上内容,生成结构清晰的最终交付物报告。"""}] ) return resp.content[0].text # ─── 使用示例 ───────────────────────────────────── async def main(): orc = Orchestrator() def on_update(agent: str, msg: str): print(f"[{agent.upper()}] {msg}") result = await orc.run( "开发一个 SaaS 会员订阅系统,支持月付/年付,需要完整的前后端", callback=on_update ) print("\n=== 最终输出 ===") print(result) if __name__ == "__main__": asyncio.run(main())
任务输入
预设:
协作消息流
🤖
输入任务并点击启动,观察三个智能体如何协作
⚡ 智能体状态
Orchestrator 待命
Engineer 待命
Designer 待命
Operator 待命
总体进度0%
🗄️ 共享记忆层
// 等待任务启动...